Slurm Executor#
This executor plugin interfaces Covalent with HPC systems managed by Slurm. For workflows to be deployable, users must have SSH access to the Slurm login node, writable storage space on the remote filesystem, and permissions to submit jobs to Slurm.
Installation#
To use this plugin with Covalent, install it using pip:
pip install covalent-slurm-plugin
On the remote system, the Python version in the environment you plan to use must match the version used when dispatching the calculations. Additionally, the remote system's Python environment must have the base covalent package installed (e.g. pip install covalent).
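Since the local and remote Python versions must match, a quick sanity check before dispatching can save a failed job. A minimal sketch for the local side (the remote version can be checked with, e.g., ssh user@login.cluster.org "python3 --version"):

```python
import sys

# Version of the local (dispatching) Python environment; the remote
# environment's python3 --version output should report the same version.
local_version = f"{sys.version_info.major}.{sys.version_info.minor}"
print(f"Local Python: {local_version}")
```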
Usage#
Basics#
The following shows an example of a Covalent configuration that is modified to support Slurm:
[executors.slurm]
username = "user"
address = "login.cluster.org"
ssh_key_file = "/home/user/.ssh/id_rsa"
remote_workdir = "/scratch/user"
cache_dir = "/tmp/covalent"
[executors.slurm.options]
nodes = 1
ntasks = 4
cpus-per-task = 8
constraint = "gpu"
gpus = 4
qos = "regular"
[executors.slurm.srun_options]
cpu_bind = "cores"
gpus = 4
gpu-bind = "single:1"
The first stanza describes default connection parameters for a user who can connect to the Slurm login node using, for example:
ssh -i /home/user/.ssh/id_rsa user@login.cluster.org
The second and third stanzas describe default parameters for #SBATCH directives and default parameters passed directly to srun, respectively.
This example generates a script containing the following preamble:
#!/bin/bash
#SBATCH --nodes=1
#SBATCH --ntasks=4
#SBATCH --cpus-per-task=8
#SBATCH --constraint=gpu
#SBATCH --gpus=4
#SBATCH --qos=regular
and subsequent workflow submission with:
srun --cpu_bind=cores --gpus=4 --gpu-bind=single:1
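The translation from the options table to the preamble is mechanical: each key/value pair becomes one #SBATCH directive. A rough illustration of that mapping (not the plugin's actual implementation):

```python
def sbatch_preamble(options):
    """Render a dict of Slurm options as #SBATCH directives (illustrative only)."""
    lines = ["#!/bin/bash"]
    for key, value in options.items():
        # Single-letter keys become short flags; longer keys become long options.
        flag = f"-{key}" if len(key) == 1 else f"--{key}"
        lines.append(f"#SBATCH {flag}={value}")
    return "\n".join(lines)

print(sbatch_preamble({"nodes": 1, "ntasks": 4, "cpus-per-task": 8, "qos": "regular"}))
```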
To use the configuration settings, an electron’s executor must be specified with a string argument, in this case:
import covalent as ct

@ct.electron(executor="slurm")
def my_task(x, y):
    return x + y
Pre- and Postrun Commands#
Alternatively, passing a SlurmExecutor instance enables custom behavior scoped to specific tasks. Here, the executor's prerun_commands and postrun_commands parameters can be used to list shell commands to be executed before and after submitting the workflow. These may include any additional srun commands apart from workflow submission. Commands can also be nested inside the submission call to srun by using the srun_append parameter.
More complex jobs can be crafted by using these optional parameters. For example, the instance below runs a job that accesses CPU and GPU resources on a single node, while profiling GPU usage via nsys and issuing complementary commands that pause/resume the central hardware counter.
executor = ct.executor.SlurmExecutor(
    remote_workdir="/scratch/user/experiment1",
    options={
        "qos": "regular",
        "time": "01:30:00",
        "nodes": 1,
        "constraint": "gpu",
    },
    prerun_commands=[
        "module load package/1.2.3",
        "srun --ntasks-per-node 1 dcgmi profile --pause",
    ],
    srun_options={
        "n": 4,
        "c": 8,
        "cpu-bind": "cores",
        "G": 4,
        "gpu-bind": "single:1",
    },
    srun_append="nsys profile --stats=true -t cuda --gpu-metrics-device=all",
    postrun_commands=[
        "srun --ntasks-per-node 1 dcgmi profile --resume",
    ],
)
@ct.electron(executor=executor)
def my_custom_task(x, y):
    return x + y
Here the corresponding submit script contains the following commands:
module load package/1.2.3
srun --ntasks-per-node 1 dcgmi profile --pause
srun -n 4 -c 8 --cpu-bind=cores -G 4 --gpu-bind=single:1 \
nsys profile --stats=true -t cuda --gpu-metrics-device=all \
python /scratch/user/experiment1/workflow_script.py
srun --ntasks-per-node 1 dcgmi profile --resume
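The srun invocation follows the same convention as the preamble: single-letter keys in srun_options become short flags ("-n 4"), longer keys become long options ("--cpu-bind=cores"), and srun_append is placed between the options and the Python payload. A simplified sketch of that assembly (a hypothetical helper, not the plugin's code):

```python
def build_srun_command(srun_options, srun_append, payload):
    """Assemble an srun command line from options, an appended command, and a payload."""
    parts = ["srun"]
    for key, value in srun_options.items():
        # Short flags take a space-separated value; long options use "=".
        parts.append(f"-{key} {value}" if len(key) == 1 else f"--{key}={value}")
    if srun_append:
        parts.append(srun_append)
    parts.append(payload)
    return " ".join(parts)

cmd = build_srun_command(
    {"n": 4, "c": 8, "cpu-bind": "cores", "G": 4, "gpu-bind": "single:1"},
    "nsys profile --stats=true -t cuda --gpu-metrics-device=all",
    "python /scratch/user/experiment1/workflow_script.py",
)
print(cmd)
```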
sshproxy#
Some users may need two-factor authentication (2FA) to connect to a cluster. This plugin supports one form of 2FA using the sshproxy service developed by NERSC. When this plugin is configured to support sshproxy, the user's SSH key and certificate are refreshed automatically by Covalent if either does not exist or has expired. We assume that the user has already configured 2FA, used the sshproxy service on the command line without issue, and added the executable to their PATH. Note that this plugin assumes the script is called sshproxy, not sshproxy.sh. Further note that using sshproxy within Covalent is not required; a user can still run it manually and provide ssh_key_file and cert_file in the plugin constructor.
To enable sshproxy in this plugin, add the following block to your Covalent configuration while the server is stopped:
[executors.slurm.sshproxy]
hosts = [ "perlmutter-p1.nersc.gov" ]
password = "<password>"
secret = "<mfa_secret>"
For details on how to modify your Covalent configuration, refer to the Covalent documentation.
Then, reinstall this plugin using pip install covalent-slurm-plugin[sshproxy] in order to pull in the oathtool package, which generates one-time passwords.
The hosts parameter is a list of hostnames for which the sshproxy service will be used. If the address provided in the plugin constructor is not present in this list, sshproxy will not be used. The password is the user's password, not including the 6-digit OTP. The secret is the 2FA secret provided when a user registers a new device on Iris. Rather than scanning the QR code into an authenticator app, inspect the Oath Seed URL for a string labeled secret=..., typically consisting of numbers and capital letters. Users can validate that correct OTP codes are being generated by running oathtool <secret> and entering the returned 6-digit number in the "Test" option on the Iris 2FA page. Note that these values are stored in plaintext in the Covalent configuration file. If a user suspects their credentials have been stolen or compromised, they should contact their systems administrator immediately to report the incident and request deactivation.
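The 6-digit codes oathtool produces are standard RFC 6238 TOTP values, so the secret can also be validated in pure Python. A self-contained sketch using only the standard library (assuming the secret is base32-encoded, SHA-1, 30-second steps, as is typical):

```python
import base64
import hashlib
import hmac
import struct
import time

def totp(secret_b32, for_time=None, digits=6, step=30):
    """Compute an RFC 6238 TOTP code (HMAC-SHA1) from a base32 secret."""
    key = base64.b32decode(secret_b32.upper())
    counter = int((time.time() if for_time is None else for_time) // step)
    mac = hmac.new(key, struct.pack(">Q", counter), hashlib.sha1).digest()
    # Dynamic truncation per RFC 4226, then reduce to the requested digit count.
    offset = mac[-1] & 0x0F
    code = (struct.unpack(">I", mac[offset:offset + 4])[0] & 0x7FFFFFFF) % (10 ** digits)
    return str(code).zfill(digits)

# RFC 6238 test vector: ASCII secret "12345678901234567890" (base32 below) at t=59s
print(totp("GEZDGNBVGY3TQOJQGEZDGNBVGY3TQOJQ", for_time=59))
```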
- class covalent_slurm_plugin.SlurmExecutor(username=None, address=None, ssh_key_file=None, cert_file=None, remote_workdir=None, slurm_path=None, conda_env=None, cache_dir=None, options=None, sshproxy=None, srun_options=None, srun_append=None, prerun_commands=None, postrun_commands=None, poll_freq=None, cleanup=None, **kwargs)[source]#
Slurm executor plugin class.
- Parameters
  - username (Optional[str]) – Username used to authenticate over SSH.
  - address (Optional[str]) – Remote address or hostname of the Slurm login node.
  - ssh_key_file (Optional[str]) – Private RSA key used to authenticate over SSH.
  - cert_file (Optional[str]) – Certificate file used to authenticate over SSH, if required.
  - remote_workdir (Optional[str]) – Working directory on the remote cluster.
  - slurm_path (Optional[str]) – Path to the Slurm commands if they are not found automatically.
  - conda_env (Optional[str]) – Name of the conda environment in which to run the function.
  - cache_dir (Optional[str]) – Cache directory used by this executor for temporary files.
  - options (Optional[Dict]) – Dictionary of parameters used to build a Slurm submit script.
  - sshproxy (Optional[Dict]) – Dictionary of parameters for the sshproxy service (see the sshproxy section above).
  - srun_options (Optional[Dict]) – Dictionary of parameters passed to srun inside the submit script.
  - srun_append (Optional[str]) – Command nested into the srun call.
  - prerun_commands (Optional[List[str]]) – List of shell commands to run before submitting with srun.
  - postrun_commands (Optional[List[str]]) – List of shell commands to run after submitting with srun.
  - poll_freq (Optional[int]) – Frequency with which to poll a submitted job.
  - cleanup (Optional[bool]) – Whether or not to perform cleanup on the remote machine.
Methods:
- cancel(task_metadata, job_handle): Method to cancel the job identified uniquely by the job_handle (base class)
- from_dict(object_dict): Rehydrate a dictionary representation
- get_cancel_requested(): Get if the task was requested to be canceled
- get_dispatch_context(dispatch_info): Start a context manager that will be used to access the dispatch info for the executor.
- get_status(info_dict, conn): Query the status of a job previously submitted to Slurm.
- get_version_info(): Query the database for dispatch version metadata.
- perform_cleanup(conn, remote_func_filename, …): Function to perform cleanup on remote machine
- poll(task_group_metadata, data): Block until the job has reached a terminal state.
- receive(task_group_metadata, data): Return a list of task updates.
- run(function, args, kwargs, task_metadata): Abstract method to run a function in the executor in async-aware manner.
- send(task_specs, resources, task_group_metadata): Submit a list of task references to the compute backend.
- set_job_handle(handle): Save the job handle to database
- set_job_status(status): Validates and sets the job state
- setup(task_metadata): Executor specific setup method
- teardown(task_metadata): Executor specific teardown method
- to_dict(): Return a JSON-serializable dictionary representation of self
- validate_status(status): Overridable filter
- write_streams_to_file(stream_strings, …): Write the contents of stdout and stderr to respective files.
- async cancel(task_metadata, job_handle)#
Method to cancel the job identified uniquely by the job_handle (base class)
- Arg(s)
  task_metadata: Metadata of the task to be cancelled
  job_handle: Unique ID of the job assigned by the backend
- Return(s)
False by default
- Return type
bool
- from_dict(object_dict)#
Rehydrate a dictionary representation
- Parameters
  object_dict (dict) – a dictionary representation returned by to_dict
- Returns
  self
Instance attributes will be overwritten.
- async get_cancel_requested()#
Get if the task was requested to be canceled
- Arg(s)
None
- Return(s)
Whether the task has been requested to be cancelled
- Return type
Any
- get_dispatch_context(dispatch_info)#
Start a context manager that will be used to access the dispatch info for the executor.
- Parameters
  dispatch_info (DispatchInfo) – The dispatch info to be used inside the current context.
- Return type
  AbstractContextManager[DispatchInfo]
- Returns
  A context manager object that handles the dispatch info.
- async get_status(info_dict, conn)[source]#
Query the status of a job previously submitted to Slurm.
- Parameters
  info_dict (dict) – A dictionary containing all parameters needed to query the status of the execution. The required key is "job_id", which maps to the Slurm job ID.
- Returns
  String describing the job status.
- Return type
  status
- async get_version_info()#
Query the database for dispatch version metadata.
- Arg(s)
  dispatch_id: Dispatch ID of the lattice
- Returns
  {"python": python_version, "covalent": covalent_version}
- async perform_cleanup(conn, remote_func_filename, remote_slurm_filename, remote_py_filename, remote_result_filename, remote_stdout_filename, remote_stderr_filename)[source]#
Function to perform cleanup on remote machine
- Parameters
  - remote_func_filename (str) – Function file on remote machine
  - remote_slurm_filename (str) – Slurm script file on remote machine
  - remote_py_filename (str) – Python script file on remote machine
  - remote_result_filename (str) – Result file on remote machine
  - remote_stdout_filename (str) – Standard out file on remote machine
  - remote_stderr_filename (str) – Standard error file on remote machine
- Return type
None
- Returns
None
- async poll(task_group_metadata, data)#
Block until the job has reached a terminal state.
- Parameters
  - task_group_metadata (Dict) – A dictionary of metadata for the task group. Current keys are dispatch_id, node_ids, and task_group_id.
  - data (Any) – The return value of send().
The return value of poll() will be passed directly into receive().
Raise NotImplementedError to indicate that the compute backend will notify the Covalent server asynchronously of job completion.
- Return type
Any
- async receive(task_group_metadata, data)#
Return a list of task updates.
Each task must have reached a terminal state by the time this is invoked.
- Parameters
  - task_group_metadata (Dict) – A dictionary of metadata for the task group. Current keys are dispatch_id, node_ids, and task_group_id.
  - data (Any) – The return value of poll() or the request body of /jobs/update.
- Return type
  List[TaskUpdate]
- Returns
  A list of task results, each a TaskUpdate dataclass of the form

  {
      "dispatch_id": dispatch_id,
      "node_id": node_id,
      "status": status,
      "assets": {
          "output": {
              "remote_uri": output_uri,
          },
          "stdout": {
              "remote_uri": stdout_uri,
          },
          "stderr": {
              "remote_uri": stderr_uri,
          },
      },
  }

  corresponding to the node ids (task_ids) specified in the task_group_metadata. This might be a subset of the node ids in the originally submitted task group, as jobs may notify Covalent asynchronously of completed tasks before the entire task group finishes running.
- async run(function, args, kwargs, task_metadata)[source]#
Abstract method to run a function in the executor in async-aware manner.
- Parameters
  - function (Callable) – The function to run in the executor
  - args (List) – List of positional arguments to be used by the function
  - kwargs (Dict) – Dictionary of keyword arguments to be used by the function
  - task_metadata (Dict) – Dictionary of metadata for the task. Current keys are dispatch_id and node_id
- Returns
The result of the function execution
- Return type
output
- async send(task_specs, resources, task_group_metadata)#
Submit a list of task references to the compute backend.
- Parameters
  - task_specs (List[TaskSpec]) – a list of TaskSpecs
  - resources (ResourceMap) – a ResourceMap mapping task assets to URIs
  - task_group_metadata (Dict) – A dictionary of metadata for the task group. Current keys are dispatch_id, node_ids, and task_group_id.
The return value of send() will be passed directly into poll().
- Return type
Any
- async set_job_handle(handle)#
Save the job handle to database
- Arg(s)
handle: JSONable type identifying the job being executed by the backend
- Return(s)
Response from the listener that handles inserting the job handle to database
- Return type
Any
- async set_job_status(status)#
Validates and sets the job state
For use with send/receive API
- Return(s)
Whether the action succeeded
- Return type
bool
- async setup(task_metadata)#
Executor specific setup method
- to_dict()#
Return a JSON-serializable dictionary representation of self
- Return type
dict
- validate_status(status)#
Overridable filter
- Return type
bool
- async write_streams_to_file(stream_strings, filepaths, dispatch_id, results_dir)#
Write the contents of stdout and stderr to respective files.
- Parameters
  - stream_strings (Iterable[str]) – The stream_strings to be written to files.
  - filepaths (Iterable[str]) – The filepaths to be used for writing the streams.
  - dispatch_id (str) – The ID of the dispatch which initiated the request.
  - results_dir (str) – The location of the results directory.
This uses aiofiles to avoid blocking the event loop.
- Return type
None