Slurm Executor#

This executor plugin interfaces Covalent with HPC systems managed by Slurm. For workflows to be deployable, users must have SSH access to the Slurm login node, writable storage space on the remote filesystem, and permissions to submit jobs to Slurm.

Installation#

To use this plugin with Covalent, simply install it using pip:

pip install covalent-slurm-plugin

On the remote system, the Python version of the environment you plan to use must match the Python version used to dispatch the workflow. Additionally, the remote Python environment must have the base covalent package installed (e.g. pip install covalent).
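
If you manage the remote environment with conda, the executor's conda_env parameter (see the SlurmExecutor parameters below) can point tasks at that environment. The following is a minimal sketch, not an official recipe: the connection details and the environment name "covalent-py311" are placeholders, and the environment is assumed to already contain a matching Python version with covalent installed.

import covalent as ct

# All values below are placeholders; "covalent-py311" stands in for a
# remote conda environment with a matching Python version and the base
# covalent package installed.
executor = ct.executor.SlurmExecutor(
    username="user",
    address="login.cluster.org",
    ssh_key_file="/home/user/.ssh/id_rsa",
    remote_workdir="/scratch/user",
    conda_env="covalent-py311",
)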

Usage#

Basics#

The following shows an example of a Covalent configuration that is modified to support Slurm:

[executors.slurm]
username = "user"
address = "login.cluster.org"
ssh_key_file = "/home/user/.ssh/id_rsa"
remote_workdir = "/scratch/user"
cache_dir = "/tmp/covalent"

[executors.slurm.options]
nodes = 1
ntasks = 4
cpus-per-task = 8
constraint = "gpu"
gpus = 4
qos = "regular"

[executors.slurm.srun_options]
cpu_bind = "cores"
gpus = 4
gpu-bind = "single:1"

The first stanza describes default connection parameters for a user who can connect to the Slurm login node using, for example:

ssh -i /home/user/.ssh/id_rsa user@login.cluster.org

The second and third stanzas describe default parameters for #SBATCH directives and default parameters passed directly to srun, respectively.

This example generates a script containing the following preamble:

#!/bin/bash
#SBATCH --nodes=1
#SBATCH --ntasks=4
#SBATCH --cpus-per-task=8
#SBATCH --constraint=gpu
#SBATCH --gpus=4
#SBATCH --qos=regular

and submits the workflow using:

srun --cpu_bind=cores --gpus=4 --gpu-bind=single:1

To use these configuration settings, specify the electron's executor as the string "slurm":

import covalent as ct

@ct.electron(executor="slurm")
def my_task(x, y):
    return x + y
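
Dispatching then proceeds as with any other Covalent workflow. The following minimal sketch uses the standard ct.lattice, ct.dispatch, and ct.get_result APIs with the task defined above; the input values are arbitrary.

@ct.lattice
def workflow(x, y):
    return my_task(x, y)

# Dispatch to the Covalent server; my_task runs on the Slurm cluster.
dispatch_id = ct.dispatch(workflow)(1, 2)
result = ct.get_result(dispatch_id, wait=True)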

Pre- and Postrun Commands#

Alternatively, passing a SlurmExecutor instance enables custom behavior scoped to specific tasks. Here, the executor’s prerun_commands and postrun_commands parameters can be used to list shell commands to be executed before and after submitting the workflow. These may include any additional srun commands apart from workflow submission. Commands can also be nested inside the submission call to srun by using the srun_append parameter.

More complex jobs can be crafted by using these optional parameters. For example, the instance below runs a job that accesses CPU and GPU resources on a single node, while profiling GPU usage via nsys and issuing complementary dcgmi commands that pause and resume hardware counter monitoring.

executor = ct.executor.SlurmExecutor(
    remote_workdir="/scratch/user/experiment1",
    options={
        "qos": "regular",
        "time": "01:30:00",
        "nodes": 1,
        "constraint": "gpu",
    },
    prerun_commands=[
        "module load package/1.2.3",
        "srun --ntasks-per-node 1 dcgmi profile --pause"
    ],
    srun_options={
        "n": 4,
        "c": 8,
        "cpu-bind": "cores",
        "G": 4,
        "gpu-bind": "single:1"
    },
    srun_append="nsys profile --stats=true -t cuda --gpu-metrics-device=all",
    postrun_commands=[
        "srun --ntasks-per-node 1 dcgmi profile --resume",
    ]
)

@ct.electron(executor=executor)
def my_custom_task(x, y):
    return x + y

Here the corresponding submit script contains the following commands:

module load package/1.2.3
srun --ntasks-per-node 1 dcgmi profile --pause

srun -n 4 -c 8 --cpu-bind=cores -G 4 --gpu-bind=single:1 \
nsys profile --stats=true -t cuda --gpu-metrics-device=all \
python /scratch/user/experiment1/workflow_script.py

srun --ntasks-per-node 1 dcgmi profile --resume

sshproxy#

Some users may need two-factor authentication (2FA) to connect to a cluster. This plugin supports one form of 2FA using the sshproxy service developed by NERSC. When this plugin is configured to support sshproxy, the user's SSH key and certificate are refreshed automatically by Covalent whenever either one is missing or expired. We assume that the user has already configured 2FA, used the sshproxy service on the command line without issue, and added the executable to their PATH. Note that this plugin assumes the script is called sshproxy, not sshproxy.sh. Further note that using sshproxy within Covalent is not required; a user can still run it manually and provide ssh_key_file and cert_file in the plugin constructor.
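
For example, here is a minimal sketch of that manual approach; the key and certificate paths are placeholders for the files produced by running sshproxy yourself.

import covalent as ct

# Placeholder paths: substitute the key and certificate that sshproxy
# generated for your account.
executor = ct.executor.SlurmExecutor(
    username="user",
    address="perlmutter-p1.nersc.gov",
    ssh_key_file="/home/user/.ssh/nersc",
    cert_file="/home/user/.ssh/nersc-cert.pub",
    remote_workdir="/scratch/user",
)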

In order to enable sshproxy in this plugin, add the following block to your Covalent configuration while the server is stopped:

[executors.slurm.sshproxy]
hosts = [ "perlmutter-p1.nersc.gov" ]
password = "<password>"
secret = "<mfa_secret>"

For details on how to modify your Covalent configuration, refer to the Covalent documentation.

Then reinstall this plugin using pip install covalent-slurm-plugin[sshproxy] to pull in the oathtool package, which is used to generate one-time passwords.

The hosts parameter is a list of hostnames for which the sshproxy service will be used. If the address provided in the plugin constructor is not present in this list, sshproxy will not be used. The password is the user's password, not including the 6-digit OTP. The secret is the 2FA secret provided when a user registers a new device on Iris. Rather than scanning the QR code into an authenticator app, inspect the Oath Seed URL for a string labeled secret=..., typically consisting of numbers and capital letters.

Users can validate that correct OTP codes are being generated by running oathtool <secret> and entering the returned 6-digit number in the "Test" option on the Iris 2FA page. Note that these values are stored in plaintext in the Covalent configuration file. If you suspect your credentials have been stolen or compromised, contact your systems administrator immediately to report the incident and request deactivation.
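
These settings can also be scoped to a single executor instance rather than the global configuration. The sketch below assumes the constructor's sshproxy argument accepts the same keys as the [executors.slurm.sshproxy] block above; the credential values are placeholders.

import covalent as ct

# Sketch only: the sshproxy dictionary mirrors the configuration block
# above, and "<password>"/"<mfa_secret>" are placeholders.
executor = ct.executor.SlurmExecutor(
    username="user",
    address="perlmutter-p1.nersc.gov",
    remote_workdir="/scratch/user",
    sshproxy={
        "hosts": ["perlmutter-p1.nersc.gov"],
        "password": "<password>",
        "secret": "<mfa_secret>",
    },
)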

class covalent_slurm_plugin.SlurmExecutor(username=None, address=None, ssh_key_file=None, cert_file=None, remote_workdir=None, slurm_path=None, conda_env=None, cache_dir=None, options=None, sshproxy=None, srun_options=None, srun_append=None, prerun_commands=None, postrun_commands=None, poll_freq=None, cleanup=None, **kwargs)[source]#

Slurm executor plugin class.

Parameters
  • username (Optional[str]) – Username used to authenticate over SSH.

  • address (Optional[str]) – Remote address or hostname of the Slurm login node.

  • ssh_key_file (Optional[str]) – Private RSA key used to authenticate over SSH.

  • cert_file (Optional[str]) – Certificate file used to authenticate over SSH, if required.

  • remote_workdir (Optional[str]) – Working directory on the remote cluster.

  • slurm_path (Optional[str]) – Path to the slurm commands if they are not found automatically.

  • conda_env (Optional[str]) – Name of the conda environment in which to run the function.

  • cache_dir (Optional[str]) – Cache directory used by this executor for temporary files.

  • options (Optional[Dict]) – Dictionary of parameters used to build a Slurm submit script.

  • srun_options (Optional[Dict]) – Dictionary of parameters passed to srun inside the submit script.

  • srun_append (Optional[str]) – Command nested into the srun call.

  • prerun_commands (Optional[List[str]]) – List of shell commands to run before submitting with srun.

  • postrun_commands (Optional[List[str]]) – List of shell commands to run after submitting with srun.

  • poll_freq (Optional[int]) – Frequency with which to poll a submitted job.

  • cleanup (Optional[bool]) – Whether or not to perform cleanup on the remote machine.
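
As an illustration, the sketch below combines several of these parameters; all values are placeholders, and poll_freq is assumed to be expressed in seconds.

import covalent as ct

# Illustrative values only.
executor = ct.executor.SlurmExecutor(
    username="user",
    address="login.cluster.org",
    ssh_key_file="/home/user/.ssh/id_rsa",
    remote_workdir="/scratch/user",
    conda_env="covalent",
    options={"nodes": 1, "ntasks": 4, "qos": "regular"},
    poll_freq=30,   # assumed to be in seconds
    cleanup=True,   # remove temporary files on the remote machine when finished
)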

Methods:

cancel(task_metadata, job_handle)
    Method to cancel the job identified uniquely by the job_handle (base class)

from_dict(object_dict)
    Rehydrate a dictionary representation

get_cancel_requested()
    Get if the task was requested to be canceled

get_dispatch_context(dispatch_info)
    Start a context manager that will be used to access the dispatch info for the executor.

get_status(info_dict, conn)
    Query the status of a job previously submitted to Slurm.

get_version_info()
    Query the database for dispatch version metadata.

perform_cleanup(conn, remote_func_filename, …)
    Function to perform cleanup on remote machine

poll(task_group_metadata, data)
    Block until the job has reached a terminal state.

receive(task_group_metadata, data)
    Return a list of task updates.

run(function, args, kwargs, task_metadata)
    Abstract method to run a function in the executor in async-aware manner.

send(task_specs, resources, task_group_metadata)
    Submit a list of task references to the compute backend.

set_job_handle(handle)
    Save the job handle to database

set_job_status(status)
    Validates and sets the job state

setup(task_metadata)
    Executor specific setup method

teardown(task_metadata)
    Executor specific teardown method

to_dict()
    Return a JSON-serializable dictionary representation of self

validate_status(status)
    Overridable filter

write_streams_to_file(stream_strings, …)
    Write the contents of stdout and stderr to respective files.

async cancel(task_metadata, job_handle)#

Method to cancel the job identified uniquely by the job_handle (base class)

Arg(s)

task_metadata: Metadata of the task to be cancelled
job_handle: Unique ID of the job assigned by the backend

Return(s)

False by default

Return type

bool

from_dict(object_dict)#

Rehydrate a dictionary representation

Parameters

object_dict (dict) – a dictionary representation returned by to_dict

Return type

BaseExecutor

Returns

self

Instance attributes will be overwritten.

async get_cancel_requested()#

Get if the task was requested to be canceled

Arg(s)

None

Return(s)

Whether the task has been requested to be cancelled

Return type

Any

get_dispatch_context(dispatch_info)#

Start a context manager that will be used to access the dispatch info for the executor.

Parameters

dispatch_info (DispatchInfo) – The dispatch info to be used inside current context.

Return type

AbstractContextManager[DispatchInfo]

Returns

A context manager object that handles the dispatch info.

async get_status(info_dict, conn)[source]#

Query the status of a job previously submitted to Slurm.

Parameters

info_dict (dict) – A dictionary containing all parameters needed to query the status of the execution. The required key is "job_id", which maps to the Slurm job ID.

Returns

String describing the job status.

Return type

status

async get_version_info()#

Query the database for dispatch version metadata.

Arg:

dispatch_id: Dispatch ID of the lattice

Returns

{"python": python_version, "covalent": covalent_version}

async perform_cleanup(conn, remote_func_filename, remote_slurm_filename, remote_py_filename, remote_result_filename, remote_stdout_filename, remote_stderr_filename)[source]#

Function to perform cleanup on remote machine

Parameters
  • remote_func_filename (str) – Function file on remote machine

  • remote_slurm_filename (str) – Slurm script file on remote machine

  • remote_py_filename (str) – Python script file on remote machine

  • remote_result_filename (str) – Result file on remote machine

  • remote_stdout_filename (str) – Standard out file on remote machine

  • remote_stderr_filename (str) – Standard error file on remote machine

Return type

None

Returns

None

async poll(task_group_metadata, data)#

Block until the job has reached a terminal state.

Parameters
  • task_group_metadata (Dict) – A dictionary of metadata for the task group. Current keys are dispatch_id, node_ids, and task_group_id.

  • data (Any) – The return value of send().

The return value of poll() will be passed directly into receive().

Raise NotImplementedError to indicate that the compute backend will notify the Covalent server asynchronously of job completion.

Return type

Any

async receive(task_group_metadata, data)#

Return a list of task updates.

Each task must have reached a terminal state by the time this is invoked.

Parameters
  • task_group_metadata (Dict) – A dictionary of metadata for the task group. Current keys are dispatch_id, node_ids, and task_group_id.

  • data (Any) – The return value of poll() or the request body of /jobs/update.

Return type

List[TaskUpdate]

Returns

Returns a list of task results, each a TaskUpdate dataclass of the form

{
    "dispatch_id": dispatch_id,
    "node_id": node_id,
    "status": status,
    "assets": {
        "output": {
            "remote_uri": output_uri,
        },
        "stdout": {
            "remote_uri": stdout_uri,
        },
        "stderr": {
            "remote_uri": stderr_uri,
        },
    },
}

corresponding to the node ids (task_ids) specified in the task_group_metadata. This might be a subset of the node ids in the originally submitted task group as jobs may notify Covalent asynchronously of completed tasks before the entire task group finishes running.

async run(function, args, kwargs, task_metadata)[source]#

Abstract method to run a function in the executor in async-aware manner.

Parameters
  • function (Callable) – The function to run in the executor

  • args (List) – List of positional arguments to be used by the function

  • kwargs (Dict) – Dictionary of keyword arguments to be used by the function.

  • task_metadata (Dict) – Dictionary of metadata for the task. Current keys are dispatch_id and node_id

Returns

The result of the function execution

Return type

output

async send(task_specs, resources, task_group_metadata)#

Submit a list of task references to the compute backend.

Parameters
  • task_specs (List[TaskSpec]) – a list of TaskSpecs

  • resources (ResourceMap) – a ResourceMap mapping task assets to URIs

  • task_group_metadata (Dict) – A dictionary of metadata for the task group. Current keys are dispatch_id, node_ids, and task_group_id.

The return value of send() will be passed directly into poll().

Return type

Any

async set_job_handle(handle)#

Save the job handle to database

Arg(s)

handle: JSONable type identifying the job being executed by the backend

Return(s)

Response from the listener that handles inserting the job handle to database

Return type

Any

async set_job_status(status)#

Validates and sets the job state

For use with send/receive API

Return(s)

Whether the action succeeded

Return type

bool

async setup(task_metadata)#

Executor specific setup method

async teardown(task_metadata)[source]#

Executor specific teardown method

to_dict()#

Return a JSON-serializable dictionary representation of self

Return type

dict

validate_status(status)#

Overridable filter

Return type

bool

async write_streams_to_file(stream_strings, filepaths, dispatch_id, results_dir)#

Write the contents of stdout and stderr to respective files.

Parameters
  • stream_strings (Iterable[str]) – The stream_strings to be written to files.

  • filepaths (Iterable[str]) – The filepaths to be used for writing the streams.

  • dispatch_id (str) – The ID of the dispatch which initiated the request.

  • results_dir (str) – The location of the results directory.

This uses aiofiles to avoid blocking the event loop.

Return type

None