AWS Braket Executor#

Covalent is a Pythonic workflow tool used to execute tasks on advanced computing hardware.
This plugin allows Covalent workflows to execute quantum circuits and quantum-classical hybrid jobs on Amazon Braket.
1. Installation#
To use this plugin with Covalent, simply install it using pip:
pip install covalent-braket-plugin
2. Usage Example#
The following toy example executes a simple quantum circuit on one qubit that prepares a uniform superposition of the standard basis states and then measures the state. We use the PennyLane framework.
import os

import covalent as ct
from covalent_braket_plugin.braket import BraketExecutor

# AWS resources to pass to the executor
credentials = "~/.aws/credentials"
profile = "default"
region = "us-east-1"
s3_bucket_name = "braket_s3_bucket"
ecr_image_uri = "<aws_account_id>.dkr.ecr.us-east-1.amazonaws.com/braket_ecr_repo:stable"
iam_role_name = "covalent-braket-iam-role"

# Instantiate the executor
ex = BraketExecutor(
    profile=profile,
    credentials=credentials,
    region=region,
    s3_bucket_name=s3_bucket_name,
    ecr_image_uri=ecr_image_uri,
    braket_job_execution_role_name=iam_role_name,
    quantum_device="arn:aws:braket:::device/quantum-simulator/amazon/sv1",
    classical_device="ml.m5.large",
    storage=30,
    time_limit=300,
)
# Execute the following circuit:
# |0> - H - Measure
@ct.electron(executor=ex)
def simple_quantum_task(num_qubits: int):
    import pennylane as qml

    # These are passed to the Hybrid Jobs container at runtime
    device_arn = os.environ["AMZN_BRAKET_DEVICE_ARN"]
    s3_bucket = os.environ["AMZN_BRAKET_OUT_S3_BUCKET"]
    s3_task_dir = os.environ["AMZN_BRAKET_TASK_RESULTS_S3_URI"].split(s3_bucket)[1]

    device = qml.device(
        "braket.aws.qubit",
        device_arn=device_arn,
        s3_destination_folder=(s3_bucket, s3_task_dir),
        wires=num_qubits,
    )

    @qml.qnode(device=device)
    def simple_circuit():
        qml.Hadamard(wires=[0])
        return qml.expval(qml.PauliZ(wires=[0]))

    res = simple_circuit().numpy()
    return res
@ct.lattice
def simple_quantum_workflow(num_qubits: int):
    return simple_quantum_task(num_qubits=num_qubits)


dispatch_id = ct.dispatch(simple_quantum_workflow)(1)
result_object = ct.get_result(dispatch_id, wait=True)

# We expect 0 as the result
print("Result:", result_object.result)
While the workflow is running, its status can be monitored in the Covalent UI. Once it completes, the script above should also print the value of the quantum measurement:
>>> Result: 0
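The status can also be polled programmatically rather than through the UI. A minimal sketch, reusing the dispatch_id from the script above:

import covalent as ct

# Fetch the current state of the dispatch without blocking until completion
result = ct.get_result(dispatch_id, wait=False)
print(result.status)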
3. Overview of Configuration#
Config Key | Is Required | Default | Description
---|---|---|---
credentials | No | "~/.aws/credentials" | The path to the AWS credentials file
braket_job_execution_role_name | Yes | "CovalentBraketJobsExecutionRole" | The name of the IAM role that Braket will assume during task execution
profile | No | "default" | Named AWS profile used for authentication
region | Yes | AWS_DEFAULT_REGION environment variable | AWS region to use for client calls to AWS
s3_bucket_name | Yes | "amazon-braket-covalent-job-resources" | The S3 bucket where Covalent will store input and output files for the task
ecr_image_uri | Yes | | The URI of the ECR container image to be run by Braket
quantum_device | No | "arn:aws:braket:::device/quantum-simulator/amazon/sv1" | The ARN of the quantum device to use
classical_device | No | "ml.m5.large" | Instance type for the classical device to use
storage | No | 30 | Storage size in GB for the classical device
time_limit | No | 300 | Max running time in seconds for the Braket job
poll_freq | No | 30 | How often (in seconds) to poll Braket for the job status
cache_dir | No | "/tmp/covalent" | Location for storing temporary files generated by the Covalent server
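To verify which values are in effect, the active configuration can be read programmatically. A minimal sketch, assuming Covalent's top-level get_config helper:

import covalent as ct

# Print the currently configured Braket executor options
print(ct.get_config("executors.braket"))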
This plugin can be configured in one of two ways:

1. By passing configuration options as constructor keys to the executor class ct.executor.BraketExecutor
2. By modifying the Covalent configuration file under the [executors.braket] section
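For the first approach, a minimal sketch passing options as constructor keyword arguments (the device ARN and time limit here are illustrative):

import covalent as ct

# Configure the executor directly via constructor arguments.
# Any options omitted here fall back to the values in the Covalent config file.
ex = ct.executor.BraketExecutor(
    quantum_device="arn:aws:braket:::device/qpu/ionq/ionQdevice",
    time_limit=3600,
)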
For the second approach, the following shows how a user might modify their Covalent configuration file to support this plugin:
[executors.braket]
quantum_device = "arn:aws:braket:::device/qpu/ionq/ionQdevice"
time_limit = 3600
4. Required Cloud Resources#
The Braket executor requires some resources to be provisioned on AWS. Specifically, users need an S3 bucket, an ECR repository, and an IAM role with the appropriate permissions to pass to Braket.
Resource | Is Required | Config Key | Description
---|---|---|---
IAM role | Yes | braket_job_execution_role_name | An IAM role granting permissions to Braket, S3, ECR, and a few other resources
ECR repository | Yes | ecr_image_uri | An ECR repository for storing container images to be run by Braket
S3 bucket | Yes | s3_bucket_name | An S3 bucket for storing task-specific data, such as Braket outputs or function inputs
One can either follow the instructions below to create these resources manually or use the provided Terraform script to provision them automatically.
The AWS documentation on S3 details how to configure an S3 bucket.
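For instance, the bucket can be created with a few lines of boto3. This is a minimal sketch; the bucket name is a placeholder and must be globally unique, and regions other than us-east-1 additionally require a CreateBucketConfiguration:

import boto3

# Create the S3 bucket that Covalent will use for Braket task data
s3 = boto3.client("s3", region_name="us-east-1")
s3.create_bucket(Bucket="my-braket-covalent-bucket")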
The permissions required for the IAM role are documented in the article "Managing access to Amazon Braket". The policy attached to the default role "CovalentBraketJobsExecutionRole" is reproduced below under "Sample IAM policy for Braket's execution role".
In order to use the Braket executor plugin, one must create a private ECR repository containing the container image that will be used to execute Braket jobs with Covalent. One can either create the ECR repository manually or use the Terraform script provided below. We host the image in our public repository at
public.ecr.aws/covalent/covalent-braket-executor:stable
Note
The container image can be uploaded to a private ECR as follows:
docker pull public.ecr.aws/covalent/covalent-braket-executor:stable
Once the image has been obtained, users can tag it with their registry information and upload it to ECR as follows:
aws ecr get-login-password --region <region> | docker login --username AWS --password-stdin <aws_account_id>.dkr.ecr.<region>.amazonaws.com
docker tag public.ecr.aws/covalent/covalent-braket-executor:stable <aws_account_id>.dkr.ecr.<region>.amazonaws.com/<my-repository>:tag
docker push <aws_account_id>.dkr.ecr.<region>.amazonaws.com/<my-repository>:tag
Sample IAM policy for Braket's execution role:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "VisualEditor0",
            "Effect": "Allow",
            "Action": "cloudwatch:PutMetricData",
            "Resource": "*",
            "Condition": {
                "StringEquals": { "cloudwatch:namespace": "/aws/braket" }
            }
        },
        {
            "Sid": "VisualEditor1",
            "Effect": "Allow",
            "Action": [
                "logs:CreateLogStream", "logs:DescribeLogStreams", "ecr:GetDownloadUrlForLayer",
                "ecr:BatchGetImage", "logs:StartQuery", "logs:GetLogEvents",
                "logs:CreateLogGroup", "logs:PutLogEvents", "ecr:BatchCheckLayerAvailability"
            ],
            "Resource": [
                "arn:aws:ecr::348041629502:repository/",
                "arn:aws:logs:::log-group:/aws/braket*"
            ]
        },
        {
            "Sid": "VisualEditor2",
            "Effect": "Allow",
            "Action": "iam:PassRole",
            "Resource": "arn:aws:iam::348041629502:role/CovalentBraketJobsExecutionRole",
            "Condition": {
                "StringLike": { "iam:PassedToService": "braket.amazonaws.com" }
            }
        },
        {
            "Sid": "VisualEditor3",
            "Effect": "Allow",
            "Action": [
                "braket:SearchDevices", "s3:CreateBucket", "ecr:BatchDeleteImage",
                "ecr:BatchGetRepositoryScanningConfiguration", "ecr:DeleteRepository", "ecr:TagResource",
                "ecr:BatchCheckLayerAvailability", "ecr:GetLifecyclePolicy", "braket:CreateJob",
                "ecr:DescribeImageScanFindings", "braket:GetJob", "ecr:CreateRepository",
                "ecr:PutImageScanningConfiguration", "ecr:GetDownloadUrlForLayer", "ecr:DescribePullThroughCacheRules",
                "ecr:GetAuthorizationToken", "ecr:DeleteLifecyclePolicy", "braket:ListTagsForResource",
                "ecr:PutImage", "s3:PutObject", "s3:GetObject", "braket:GetDevice",
                "ecr:UntagResource", "ecr:BatchGetImage", "ecr:DescribeImages",
                "braket:CancelQuantumTask", "ecr:StartLifecyclePolicyPreview", "braket:CancelJob",
                "ecr:InitiateLayerUpload", "ecr:PutImageTagMutability", "ecr:StartImageScan",
                "ecr:DescribeImageReplicationStatus", "ecr:ListTagsForResource", "s3:ListBucket",
                "ecr:UploadLayerPart", "ecr:CreatePullThroughCacheRule", "ecr:ListImages",
                "ecr:GetRegistryScanningConfiguration", "braket:TagResource", "ecr:CompleteLayerUpload",
                "ecr:DescribeRepositories", "ecr:ReplicateImage", "ecr:GetRegistryPolicy",
                "ecr:PutLifecyclePolicy", "s3:PutBucketPublicAccessBlock", "ecr:GetLifecyclePolicyPreview",
                "ecr:DescribeRegistry", "braket:SearchJobs", "braket:CreateQuantumTask",
                "iam:ListRoles", "ecr:PutRegistryScanningConfiguration", "ecr:DeletePullThroughCacheRule",
                "braket:UntagResource", "ecr:BatchImportUpstreamImage", "braket:GetQuantumTask",
                "s3:PutBucketPolicy", "braket:SearchQuantumTasks", "ecr:GetRepositoryPolicy",
                "ecr:PutReplicationConfiguration"
            ],
            "Resource": "*"
        },
        {
            "Sid": "VisualEditor4",
            "Effect": "Allow",
            "Action": "logs:GetQueryResults",
            "Resource": "arn:aws:logs:::log-group:*"
        },
        {
            "Sid": "VisualEditor5",
            "Effect": "Allow",
            "Action": "logs:StopQuery",
            "Resource": "arn:aws:logs:::log-group:/aws/braket*"
        }
    ]
}
Users can use the following Terraform snippet as a starting point to spin up the required resources:
provider "aws" {}
data "aws_caller_identity" "current" {}
resource "aws_s3_bucket" "braket_bucket" {
bucket = "my-s3-bucket-name"
force_destroy = true
}
resource "aws_ecr_repository" "braket_ecr_repo" {
name = "amazon-braket-base-executor-repo"
image_tag_mutability = "MUTABLE"
force_delete = true
image_scanning_configuration {
scan_on_push = false
}
provisioner "local-exec" {
command = "docker pull public.ecr.aws/covalent/covalent-braket-executor:stable && aws ecr get-login-password --region <region> | docker login --username AWS --password-stdin ${data.aws_caller_identity.current.account_id}.dkr.ecr.${var.aws_region}.amazonaws.com && docker tag public.ecr.aws/covalent/covalent-braket-executor:stable ${aws_ecr_repository.braket_ecr_repo.repository_url}:stable && docker push ${aws_ecr_repository.braket_ecr_repo.repository_url}:stable"
}
}
resource "aws_iam_role" "braket_iam_role" {
name = "amazon-braket-execution-role"
assume_role_policy = jsonencode({
Version = "2012-10-17"
Statement = [
{
Action = "sts:AssumeRole"
Effect = "Allow"
Sid = ""
Principal = {
Service = "braket.amazonaws.com"
}
},
]
})
managed_policy_arns = ["arn:aws:iam::aws:policy/AmazonBraketFullAccess"]
}
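Once the resources exist, their names can be wired into the executor. A hypothetical sketch using the names from the Terraform snippet above (the ECR image URI placeholders must be filled in with your account ID and region):

from covalent_braket_plugin.braket import BraketExecutor

# Point the executor at the Terraform-provisioned resources
ex = BraketExecutor(
    s3_bucket_name="my-s3-bucket-name",
    braket_job_execution_role_name="amazon-braket-execution-role",
    ecr_image_uri="<aws_account_id>.dkr.ecr.<region>.amazonaws.com/amazon-braket-base-executor-repo:stable",
)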
- class covalent_braket_plugin.braket.BraketExecutor(ecr_image_uri=None, s3_bucket_name=None, braket_job_execution_role_name=None, classical_device=None, storage=None, time_limit=None, poll_freq=None, quantum_device=None, profile=None, credentials=None, cache_dir=None, region=None, **kwargs)[source]#
AWS Braket Hybrid Jobs executor plugin class.
Methods:

- boto_session_options(): Returns a dictionary of kwargs to populate a new boto3.Session() instance with proper auth, region, and profile options.
- cancel(): Abstract method that sends a cancellation request to the remote backend.
- from_dict(object_dict): Rehydrate a dictionary representation.
- get_cancel_requested(): Get if the task was requested to be canceled.
- get_dispatch_context(dispatch_info): Start a context manager that will be used to access the dispatch info for the executor.
- get_status(braket, job_arn): Query the status of a previously submitted Braket hybrid job.
- get_version_info(): Query the database for dispatch version metadata.
- poll(task_group_metadata, data): Block until the job has reached a terminal state.
- query_result(query_metadata): Abstract method that retrieves the pickled result from the remote cache.
- receive(task_group_metadata, data): Return a list of task updates.
- run(function, args, kwargs, task_metadata): Abstract method to run a function in the executor in an async-aware manner.
- run_async_subprocess(cmd): Invokes an async subprocess to run a command.
- send(task_specs, resources, task_group_metadata): Submit a list of task references to the compute backend.
- set_job_handle(handle): Save the job handle to the database.
- set_job_status(status): Validates and sets the job state.
- setup(task_metadata): Executor-specific setup method.
- submit_task(submit_metadata): Abstract method that invokes the task on the remote backend.
- teardown(task_metadata): Executor-specific teardown method.
- to_dict(): Return a JSON-serializable dictionary representation of self.
- validate_status(status): Overridable filter.
- write_streams_to_file(stream_strings, …): Write the contents of stdout and stderr to respective files.
- boto_session_options()#
Returns a dictionary of kwargs to populate a new boto3.Session() instance with proper auth, region, and profile options.
- Return type
Dict[str, str]
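For example, a hypothetical sketch building a session from these options (the profile and region values are illustrative):

import boto3
from covalent_braket_plugin.braket import BraketExecutor

ex = BraketExecutor(profile="default", region="us-east-1")
# The returned kwargs map directly onto boto3.Session()
session = boto3.session.Session(**ex.boto_session_options())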
- async cancel()[source]#
Abstract method that sends a cancellation request to the remote backend.
- Return type
bool
- from_dict(object_dict)#
Rehydrate a dictionary representation
- Parameters
object_dict (dict) – a dictionary representation returned by to_dict
- Returns
self
Instance attributes will be overwritten.
- async get_cancel_requested()#
Get if the task was requested to be canceled
- Arg(s)
None
- Return(s)
Whether the task has been requested to be cancelled
- Return type
Any
- get_dispatch_context(dispatch_info)#
Start a context manager that will be used to access the dispatch info for the executor.
- Parameters
dispatch_info (DispatchInfo) – The dispatch info to be used inside current context.
- Return type
AbstractContextManager[DispatchInfo]
- Returns
A context manager object that handles the dispatch info.
- async get_status(braket, job_arn)[source]#
Query the status of a previously submitted Braket hybrid job.
- Parameters
braket – Braket client object.
job_arn (str) – ARN used to identify a Braket hybrid job.
- Returns
String describing the job status.
- Return type
status
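A hypothetical usage sketch; the job ARN is a placeholder and the executor falls back to values from the Covalent config:

import asyncio
import boto3
from covalent_braket_plugin.braket import BraketExecutor

ex = BraketExecutor()
braket = boto3.client("braket", region_name="us-east-1")
# get_status is a coroutine, so drive it with asyncio.run outside a workflow
print(asyncio.run(ex.get_status(braket, "arn:aws:braket:us-east-1:111122223333:job/my-job")))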
- async get_version_info()#
Query the database for dispatch version metadata.
- Arg(s)
dispatch_id: Dispatch ID of the lattice
- Returns
{"python": python_version, "covalent": covalent_version}
- async poll(task_group_metadata, data)#
Block until the job has reached a terminal state.
- Parameters
task_group_metadata (Dict) – A dictionary of metadata for the task group. Current keys are dispatch_id, node_ids, and task_group_id.
data (Any) – The return value of send().
The return value of poll() will be passed directly into receive().
Raise NotImplementedError to indicate that the compute backend will notify the Covalent server asynchronously of job completion.
- Return type
Any
- async query_result(query_metadata)[source]#
Abstract method that retrieves the pickled result from the remote cache.
- Return type
Any
- async receive(task_group_metadata, data)#
Return a list of task updates.
Each task must have reached a terminal state by the time this is invoked.
- Parameters
task_group_metadata (Dict) – A dictionary of metadata for the task group. Current keys are dispatch_id, node_ids, and task_group_id.
data (Any) – The return value of poll() or the request body of /jobs/update.
- Return type
List[TaskUpdate]
- Returns
Returns a list of task results, each a TaskUpdate dataclass of the form
{
    "dispatch_id": dispatch_id,
    "node_id": node_id,
    "status": status,
    "assets": {
        "output": {
            "remote_uri": output_uri,
        },
        "stdout": {
            "remote_uri": stdout_uri,
        },
        "stderr": {
            "remote_uri": stderr_uri,
        },
    },
}
corresponding to the node ids (task_ids) specified in the task_group_metadata. This might be a subset of the node ids in the originally submitted task group as jobs may notify Covalent asynchronously of completed tasks before the entire task group finishes running.
- async run(function, args, kwargs, task_metadata)[source]#
Abstract method to run a function in the executor in an async-aware manner.
- Parameters
function (Callable) – The function to run in the executor
args (List) – List of positional arguments to be used by the function
kwargs (Dict) – Dictionary of keyword arguments to be used by the function.
task_metadata (Dict) – Dictionary of metadata for the task. Current keys are dispatch_id and node_id
- Returns
The result of the function execution
- Return type
output
- async static run_async_subprocess(cmd)#
Invokes an async subprocess to run a command.
- Return type
Tuple
- async send(task_specs, resources, task_group_metadata)#
Submit a list of task references to the compute backend.
- Parameters
task_specs (List[TaskSpec]) – a list of TaskSpecs
resources (ResourceMap) – a ResourceMap mapping task assets to URIs
task_group_metadata (Dict) – A dictionary of metadata for the task group. Current keys are dispatch_id, node_ids, and task_group_id.
The return value of send() will be passed directly into poll().
- Return type
Any
- async set_job_handle(handle)#
Save the job handle to the database
- Arg(s)
handle: JSONable type identifying the job being executed by the backend
- Return(s)
Response from the listener that handles inserting the job handle into the database
- Return type
Any
- async set_job_status(status)#
Validates and sets the job state
For use with send/receive API
- Return(s)
Whether the action succeeded
- Return type
bool
- async setup(task_metadata)#
Executor-specific setup method
- async submit_task(submit_metadata)[source]#
Abstract method that invokes the task on the remote backend.
- Parameters
submit_metadata – Dictionary of metadata for the task. Current keys are dispatch_id and node_id.
- Returns
Task UUID defined on the remote backend.
- Return type
task_uuid
- async teardown(task_metadata)#
Executor-specific teardown method
- to_dict()#
Return a JSON-serializable dictionary representation of self
- Return type
dict
- validate_status(status)#
Overridable filter
- Return type
bool
- async write_streams_to_file(stream_strings, filepaths, dispatch_id, results_dir)#
Write the contents of stdout and stderr to respective files.
- Parameters
stream_strings (Iterable[str]) – The stream_strings to be written to files.
filepaths (Iterable[str]) – The filepaths to be used for writing the streams.
dispatch_id (str) – The ID of the dispatch which initiated the request.
results_dir (str) – The location of the results directory.
This uses aiofiles to avoid blocking the event loop.
- Return type
None