AWS Batch Executor#

Covalent is a Pythonic workflow tool used to execute tasks on advanced computing hardware.
This executor plugin interfaces Covalent with AWS Batch, allowing tasks in a Covalent workflow to be executed as AWS Batch jobs.
The plugin is well suited for compute- and memory-intensive tasks such as training machine learning models, hyperparameter optimization, and deep learning. With this executor, the compute backend is the Amazon EC2 service, with instances optimized for compute- and memory-intensive operations.
1. Installation#
To use this plugin with Covalent, simply install it using pip:
pip install covalent-awsbatch-plugin
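A quick way to confirm the installation (an optional sanity check; it only verifies that the plugin is importable, not that any AWS resources are configured):

import covalent as ct

# If the plugin installed correctly, the executor class is available under ct.executor.
executor = ct.executor.AWSBatchExecutor()  # with no arguments, values come from the Covalent config file
print(type(executor).__name__)  # expected: AWSBatchExecutor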
2. Usage Example#
This is an example of how a workflow can be adapted to utilize the AWS Batch executor. Here we train a simple Support Vector Machine (SVM) model and use an existing AWS Batch compute environment to run the train_svm
electron as a batch job. Note that we use DepsPip to install the task's dependencies when the batch job is created.
from numpy.random import permutation
from sklearn import svm, datasets
import covalent as ct

deps_pip = ct.DepsPip(
    packages=["numpy==1.23.2", "scikit-learn==1.1.2"]
)

executor = ct.executor.AWSBatchExecutor(
    s3_bucket_name = "covalent-batch-qa-job-resources",
    batch_job_definition_name = "covalent-batch-qa-job-definition",
    batch_queue = "covalent-batch-qa-queue",
    batch_execution_role_name = "ecsTaskExecutionRole",
    batch_job_role_name = "covalent-batch-qa-job-role",
    batch_job_log_group_name = "covalent-batch-qa-log-group",
    vcpu = 2,          # Number of vCPUs to allocate
    memory = 3.75,     # Memory in GB to allocate
    time_limit = 300,  # Time limit of job in seconds
)

# Use executor plugin to train our SVM model.
@ct.electron(
    executor=executor,
    deps_pip=deps_pip
)
def train_svm(data, C, gamma):
    X, y = data
    clf = svm.SVC(C=C, gamma=gamma)
    clf.fit(X[90:], y[90:])
    return clf

@ct.electron
def load_data():
    iris = datasets.load_iris()
    perm = permutation(iris.target.size)
    iris.data = iris.data[perm]
    iris.target = iris.target[perm]
    return iris.data, iris.target

@ct.electron
def score_svm(data, clf):
    X_test, y_test = data
    return clf.score(
        X_test[:90],
        y_test[:90]
    )

@ct.lattice
def run_experiment(C=1.0, gamma=0.7):
    data = load_data()
    clf = train_svm(
        data=data,
        C=C,
        gamma=gamma
    )
    score = score_svm(
        data=data,
        clf=clf
    )
    return score

# Dispatch the workflow
dispatch_id = ct.dispatch(run_experiment)(
    C=1.0,
    gamma=0.7
)

# Wait for our result and get result value
result = ct.get_result(dispatch_id=dispatch_id, wait=True).result

print(result)
During execution of the workflow, one can navigate to the UI to see its status. Once the workflow has completed, the above script should also print the score of our model.
0.8666666666666667
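Beyond printing the final value, the Result object returned by ct.get_result can also be inspected programmatically. A small sketch (assuming the Result object exposes a status attribute, which may vary between Covalent versions):

result_object = ct.get_result(dispatch_id=dispatch_id, wait=True)

# Inspect the workflow outcome programmatically.
print(result_object.status)  # e.g. COMPLETED
print(result_object.result)  # 0.8666666666666667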
3. Overview of Configuration#
| Config Key | Is Required | Default | Description |
|---|---|---|---|
| profile | No | default | Named AWS profile used for authentication |
| region | Yes | us-east-1 | AWS region to use for client calls |
| credentials | No | ~/.aws/credentials | The path to the AWS credentials file |
| batch_queue | Yes | covalent-batch-queue | Name of the Batch queue used for job management |
| s3_bucket_name | Yes | covalent-batch-job-resources | Name of an S3 bucket where Covalent artifacts are stored |
| batch_job_definition_name | Yes | covalent-batch-jobs | Name of the Batch job definition for a user, project, or experiment |
| batch_execution_role_name | No | ecsTaskExecutionRole | Name of the IAM role used by the Batch ECS agent (this default role should already exist in AWS) |
| batch_job_role_name | Yes | CovalentBatchJobRole | Name of the IAM role used within the container |
| batch_job_log_group_name | Yes | covalent-batch-job-logs | Name of the CloudWatch log group where container logs are stored |
| vcpu | No | 2 | Number of vCPUs available to a task |
| memory | No | 3.75 | Memory (in GB) available to a task |
| num_gpus | No | 0 | Number of GPUs available to a task |
| retry_attempts | No | 3 | Number of times a job is retried if it fails |
| time_limit | No | 300 | Time limit (in seconds) after which jobs are killed |
| poll_freq | No | 10 | Frequency (in seconds) with which to poll a submitted task |
| cache_dir | No | /tmp/covalent | Cache directory used by this executor for temporary files |
This plugin can be configured in one of two ways:
1. Configuration options can be passed in as constructor keys to the executor class ct.executor.AWSBatchExecutor.
2. By modifying the Covalent configuration file under the section [executors.awsbatch].
The following shows an example of how a user might modify their Covalent configuration file to support this plugin:
[executors.awsbatch]
s3_bucket_name = "covalent-batch-job-resources"
batch_queue = "covalent-batch-queue"
batch_job_definition_name = "covalent-batch-jobs"
batch_execution_role_name = "ecsTaskExecutionRole"
batch_job_role_name = "CovalentBatchJobRole"
batch_job_log_group_name = "covalent-batch-job-logs"
...
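With the configuration file populated as above, the executor can be instantiated without repeating every option; constructor keys override values from the file. A minimal sketch (the vcpu override is purely illustrative):

import covalent as ct

# Options omitted here are read from [executors.awsbatch] in the Covalent config file.
executor = ct.executor.AWSBatchExecutor(
    vcpu=4,  # override a single option; everything else comes from the config file
)

@ct.electron(executor=executor)
def double(x):
    return 2 * x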
4. Required Cloud Resources#
In order to run your workflows with Covalent, there are a few notable AWS resources that need to be provisioned first.
| Resource | Is Required | Config Key | Description |
|---|---|---|---|
| AWS S3 Bucket | Yes | s3_bucket_name | An S3 bucket must be created for Covalent to store essential files that are needed during execution |
| VPC & Subnet | Yes | N/A | A VPC must be associated with the AWS Batch compute environment, along with a public or private subnet (additional resources need to be created for private subnets) |
| AWS Batch Compute Environment | Yes | N/A | An AWS Batch compute environment (EC2) that will provision EC2 instances as needed when jobs are submitted to the associated job queue |
| AWS Batch Queue | Yes | batch_queue | An AWS Batch job queue that will queue tasks for execution in its associated compute environment |
| AWS Batch Job Definition | Yes | batch_job_definition_name | An AWS Batch job definition that will be replaced by a new job definition when the workflow is executed |
| AWS IAM Role (Job Role) | Yes | batch_job_role_name | The IAM role used within the container |
| AWS IAM Role (Execution Role) | No | batch_execution_role_name | The IAM role used by the Batch ECS agent (the default role ecsTaskExecutionRole should already exist) |
| Log Group | Yes | batch_job_log_group_name | An AWS CloudWatch log group where task logs are stored |
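Once these resources have been provisioned (see the pointers below), it can be useful to confirm they exist before dispatching workflows. A boto3 sketch (the resource names are the defaults from the table above and should be replaced with your own):

import boto3

batch = boto3.client("batch")
s3 = boto3.client("s3")
logs = boto3.client("logs")

# The describe calls return empty lists if the resource does not exist.
print(batch.describe_job_queues(jobQueues=["covalent-batch-queue"])["jobQueues"])
print(batch.describe_job_definitions(jobDefinitionName="covalent-batch-jobs", status="ACTIVE")["jobDefinitions"])
print(logs.describe_log_groups(logGroupNamePrefix="covalent-batch-job-logs")["logGroups"])

# head_bucket raises a ClientError if the bucket is absent or inaccessible.
s3.head_bucket(Bucket="covalent-batch-job-resources")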
To create an AWS S3 bucket, refer to the following AWS documentation.
To create a VPC & subnet, refer to the following AWS documentation.
To create an AWS Batch queue, refer to the following AWS documentation; it must be associated with a compute environment configured in EC2 mode.
To create an AWS Batch job definition, refer to the following AWS documentation; the configuration for this can be trivial, as Covalent will update the job definition prior to execution.
To create an AWS IAM role for batch jobs (the job role), one can provision a policy with the following permissions (below), then create a new role and attach the created policy to it. Refer to the following AWS documentation for an example of creating a policy & role in IAM.
AWS Batch IAM Job Policy
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "BatchJobMgmt",
            "Effect": "Allow",
            "Action": [
                "batch:TerminateJob",
                "batch:DescribeJobs",
                "batch:SubmitJob",
                "batch:RegisterJobDefinition"
            ],
            "Resource": "*"
        },
        {
            "Sid": "ECRAuth",
            "Effect": "Allow",
            "Action": [
                "ecr:GetAuthorizationToken"
            ],
            "Resource": "*"
        },
        {
            "Sid": "ECRUpload",
            "Effect": "Allow",
            "Action": [
                "ecr:GetDownloadUrlForLayer",
                "ecr:BatchGetImage",
                "ecr:BatchCheckLayerAvailability",
                "ecr:InitiateLayerUpload",
                "ecr:UploadLayerPart",
                "ecr:CompleteLayerUpload",
                "ecr:PutImage"
            ],
            "Resource": [
                "arn:aws:ecr:<region>:<account>:repository/<ecr_repo_name>"
            ]
        },
        {
            "Sid": "IAMRoles",
            "Effect": "Allow",
            "Action": [
                "iam:GetRole",
                "iam:PassRole"
            ],
            "Resource": [
                "arn:aws:iam::<account>:role/CovalentBatchJobRole",
                "arn:aws:iam::<account>:role/ecsTaskExecutionRole"
            ]
        },
        {
            "Sid": "ObjectStore",
            "Effect": "Allow",
            "Action": [
                "s3:ListBucket",
                "s3:PutObject",
                "s3:GetObject"
            ],
            "Resource": [
                "arn:aws:s3:::<s3_resource_bucket>/*",
                "arn:aws:s3:::<s3_resource_bucket>"
            ]
        },
        {
            "Sid": "LogRead",
            "Effect": "Allow",
            "Action": [
                "logs:GetLogEvents"
            ],
            "Resource": [
                "arn:aws:logs:<region>:<account>:log-group:<cloudwatch_log_group_name>:log-stream:*"
            ]
        }
    ]
}
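The IAM policy and job role can also be provisioned with boto3 rather than the console. A minimal sketch, assuming the policy JSON above has been saved as batch_job_policy.json and that the names CovalentBatchJobPolicy and CovalentBatchJobRole suit your account:

import json
import boto3

iam = boto3.client("iam")

# Create the policy from the JSON document shown above.
with open("batch_job_policy.json") as f:
    policy_document = f.read()

policy = iam.create_policy(
    PolicyName="CovalentBatchJobPolicy",
    PolicyDocument=policy_document,
)

# Batch jobs on EC2 run as ECS tasks, so the job role trusts ecs-tasks.amazonaws.com.
trust_policy = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": {"Service": "ecs-tasks.amazonaws.com"},
            "Action": "sts:AssumeRole",
        }
    ],
}

iam.create_role(
    RoleName="CovalentBatchJobRole",
    AssumeRolePolicyDocument=json.dumps(trust_policy),
)

iam.attach_role_policy(
    RoleName="CovalentBatchJobRole",
    PolicyArn=policy["Policy"]["Arn"],
)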
- class covalent_awsbatch_plugin.awsbatch.AWSBatchExecutor(credentials=None, profile=None, region=None, s3_bucket_name=None, batch_queue=None, batch_execution_role_name=None, batch_job_role_name=None, batch_job_log_group_name=None, vcpu=None, memory=None, num_gpus=None, retry_attempts=None, time_limit=None, poll_freq=None, cache_dir=None)[source]#
AWS Batch executor plugin class.
- Parameters
credentials (Optional[str]) – Full path to AWS credentials file.
profile (Optional[str]) – Name of an AWS profile whose credentials are used.
s3_bucket_name (Optional[str]) – Name of an S3 bucket where objects are stored.
batch_queue (Optional[str]) – Name of the Batch queue used for job management.
batch_execution_role_name (Optional[str]) – Name of the IAM role used by the Batch ECS agent.
batch_job_role_name (Optional[str]) – Name of the IAM role used within the container.
batch_job_log_group_name (Optional[str]) – Name of the CloudWatch log group where container logs are stored.
vcpu (Optional[int]) – Number of vCPUs available to a task.
memory (Optional[float]) – Memory (in GB) available to a task.
num_gpus (Optional[int]) – Number of GPUs available to a task.
retry_attempts (Optional[int]) – Number of times a job is retried if it fails.
time_limit (Optional[int]) – Time limit (in seconds) after which jobs are killed.
poll_freq (Optional[int]) – Frequency with which to poll a submitted task.
cache_dir (Optional[str]) – Cache directory used by this executor for temporary files.
Methods:

| Method | Description |
|---|---|
| boto_session_options() | Returns a dictionary of kwargs to populate a new boto3.Session() instance with proper auth, region, and profile options. |
| cancel(task_metadata, job_handle) | Cancel the batch job. |
| from_dict(object_dict) | Rehydrate a dictionary representation. |
| get_cancel_requested() | Get if the task was requested to be canceled. |
| get_dispatch_context(dispatch_info) | Start a context manager that will be used to access the dispatch info for the executor. |
| get_status(job_id) | Query the status of a previously submitted Batch job. |
| get_version_info() | Query the database for dispatch version metadata. |
| poll(task_group_metadata, data) | Block until the job has reached a terminal state. |
| query_result(task_metadata) | Query and retrieve a completed job's result. |
| receive(task_group_metadata, data) | Return a list of task updates. |
| run(function, args, kwargs, task_metadata) | Abstract method to run a function in the executor in an async-aware manner. |
| run_async_subprocess(cmd) | Invokes an async subprocess to run a command. |
| send(task_specs, resources, task_group_metadata) | Submit a list of task references to the compute backend. |
| set_job_handle(handle) | Save the job handle to the database. |
| set_job_status(status) | Validates and sets the job state. |
| setup(task_metadata) | Executor specific setup method. |
| submit_task(task_metadata, identity) | Invokes the task on the remote backend. |
| teardown(task_metadata) | Executor specific teardown method. |
| to_dict() | Return a JSON-serializable dictionary representation of self. |
| validate_status(status) | Overridable filter. |
| write_streams_to_file(stream_strings, …) | Write the contents of stdout and stderr to respective files. |
- boto_session_options()#
Returns a dictionary of kwargs to populate a new boto3.Session() instance with proper auth, region, and profile options.
- Return type
Dict[str, str]
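For example, a boto3 session configured consistently with the executor could be created as follows (illustrative only; the executor manages its own sessions internally):

import boto3

session = boto3.Session(**executor.boto_session_options())
print(session.region_name)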
- async cancel(task_metadata, job_handle)[source]#
Cancel the batch job.
- Parameters
task_metadata (Dict) – Dictionary with the task’s dispatch_id and node id.
job_handle (str) – Unique job handle assigned to the task by AWS Batch.
- Return type
bool
- Returns
Whether the job was cancelled or not
- from_dict(object_dict)#
Rehydrate a dictionary representation
- Parameters
object_dict (dict) – a dictionary representation returned by to_dict
- Return type
- Returns
self
Instance attributes will be overwritten.
- async get_cancel_requested()#
Get if the task was requested to be canceled
- Arg(s)
None
- Return(s)
Whether the task has been requested to be cancelled
- Return type
Any
- get_dispatch_context(dispatch_info)#
Start a context manager that will be used to access the dispatch info for the executor.
- Parameters
dispatch_info (DispatchInfo) – The dispatch info to be used inside the current context.
- Return type
AbstractContextManager[DispatchInfo]
- Returns
A context manager object that handles the dispatch info.
- async get_status(job_id)[source]#
Query the status of a previously submitted Batch job.
- Parameters
batch – Batch client object.
job_id (str) – Identifier used to identify a Batch job.
- Returns
String describing the task status. exit_code: Exit code, if the task has completed, else -1.
- Return type
status
- async get_version_info()#
Query the database for dispatch version metadata.
- Arg(s)
dispatch_id: Dispatch ID of the lattice
- Returns
{"python": python_version, "covalent": covalent_version}
- async poll(task_group_metadata, data)#
Block until the job has reached a terminal state.
- Parameters
task_group_metadata (Dict) – A dictionary of metadata for the task group. Current keys are dispatch_id, node_ids, and task_group_id.
data (Any) – The return value of send().
The return value of poll() will be passed directly into receive().
Raise NotImplementedError to indicate that the compute backend will notify the Covalent server asynchronously of job completion.
- Return type
Any
- async query_result(task_metadata)[source]#
Query and retrieve a completed job’s result.
- Parameters
task_metadata (Dict) – Dictionary containing the task dispatch_id and node_id
- Return type
Tuple[Any, str, str]
- Returns
result, stdout, stderr
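A hedged usage sketch inside an async context (dispatch_id and node_id are placeholders for the task being queried):

result, stdout, stderr = await executor.query_result(
    {"dispatch_id": dispatch_id, "node_id": node_id}
)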
- async receive(task_group_metadata, data)#
Return a list of task updates.
Each task must have reached a terminal state by the time this is invoked.
- Parameters
task_group_metadata (Dict) – A dictionary of metadata for the task group. Current keys are dispatch_id, node_ids, and task_group_id.
data (Any) – The return value of poll() or the request body of /jobs/update.
- Return type
List[TaskUpdate]
- Returns
Returns a list of task results, each a TaskUpdate dataclass of the form

{
    "dispatch_id": dispatch_id,
    "node_id": node_id,
    "status": status,
    "assets": {
        "output": {
            "remote_uri": output_uri,
        },
        "stdout": {
            "remote_uri": stdout_uri,
        },
        "stderr": {
            "remote_uri": stderr_uri,
        },
    },
}
corresponding to the node ids (task_ids) specified in the task_group_metadata. This might be a subset of the node ids in the originally submitted task group as jobs may notify Covalent asynchronously of completed tasks before the entire task group finishes running.
- async run(function, args, kwargs, task_metadata)[source]#
Abstract method to run a function in the executor in async-aware manner.
- Parameters
function (Callable) – The function to run in the executor
args (List) – List of positional arguments to be used by the function
kwargs (Dict) – Dictionary of keyword arguments to be used by the function.
task_metadata (Dict) – Dictionary of metadata for the task. Current keys are dispatch_id and node_id
- Returns
The result of the function execution
- Return type
output
- async static run_async_subprocess(cmd)#
Invokes an async subprocess to run a command.
- Return type
Tuple
- async send(task_specs, resources, task_group_metadata)#
Submit a list of task references to the compute backend.
- Parameters
task_specs (List[TaskSpec]) – a list of TaskSpecs
resources (ResourceMap) – a ResourceMap mapping task assets to URIs
task_group_metadata (Dict) – A dictionary of metadata for the task group. Current keys are dispatch_id, node_ids, and task_group_id.
The return value of send() will be passed directly into poll().
- Return type
Any
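Taken together, send(), poll(), and receive() form the executor's asynchronous submission pipeline: the return value of send() feeds poll(), and poll()'s return value feeds receive(). A hypothetical driver sketch (Covalent performs this orchestration itself; the variables are placeholders):

# Submit the task group, wait for a terminal state, then collect updates.
data = await executor.send(task_specs, resources, task_group_metadata)
data = await executor.poll(task_group_metadata, data)
updates = await executor.receive(task_group_metadata, data)

for update in updates:
    # Each TaskUpdate carries the node_id, status, and asset URIs described in receive() above.
    print(update.node_id, update.status)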
- async set_job_handle(handle)#
Save the job handle to the database
- Arg(s)
handle: JSONable type identifying the job being executed by the backend
- Return(s)
Response from the listener that handles inserting the job handle into the database
- Return type
Any
- async set_job_status(status)#
Validates and sets the job state
For use with send/receive API
- Return(s)
Whether the action succeeded
- Return type
bool
- async setup(task_metadata)#
Executor specific setup method
- async submit_task(task_metadata, identity)[source]#
Invokes the task on the remote backend.
- Parameters
task_metadata (Dict) – Dictionary of metadata for the task. Current keys are dispatch_id and node_id.
identity (Dict) – Dictionary from the _validate_credentials call, e.g. { "Account": "AWS Account ID", …}
- Returns
Task UUID defined on the remote backend.
- Return type
task_uuid
- async teardown(task_metadata)#
Executor specific teardown method
- to_dict()#
Return a JSON-serializable dictionary representation of self
- Return type
dict
- validate_status(status)#
Overridable filter
- Return type
bool
- async write_streams_to_file(stream_strings, filepaths, dispatch_id, results_dir)#
Write the contents of stdout and stderr to respective files.
- Parameters
stream_strings (Iterable[str]) – The stream_strings to be written to files.
filepaths (Iterable[str]) – The filepaths to be used for writing the streams.
dispatch_id (str) – The ID of the dispatch which initiated the request.
results_dir (str) – The location of the results directory.
This uses aiofiles to avoid blocking the event loop.
- Return type
None