Asynchronous Base Executor Class#
- class covalent.executor.base.AsyncBaseExecutor(*args, **kwargs)[source]#
Async base executor class to be used for defining any executor plugin. Subclassing this class will allow you to define your own executor plugin which can be used in covalent.
This is analogous to BaseExecutor except the run() method, together with the optional setup() and teardown() methods, are coroutines.
- log_stdout#
The path to the file to be used for redirecting stdout.
- log_stderr#
The path to the file to be used for redirecting stderr.
- cache_dir#
The location used for cached files in the executor.
- time_limit#
time limit for the task
- retries#
Number of times to retry execution upon failure
Methods:
cancel(task_metadata, job_handle)Method to cancel the job identified uniquely by the job_handle (base class)
from_dict(object_dict)Rehydrate a dictionary representation
Get if the task was requested to be canceled
get_dispatch_context(dispatch_info)Start a context manager that will be used to access the dispatch info for the executor.
Query the database for dispatch version metadata.
poll(task_group_metadata, data)Block until the job has reached a terminal state.
receive(task_group_metadata, data)Return a list of task updates.
run(function, args, kwargs, task_metadata)Abstract method to run a function in the executor in async-aware manner.
send(task_specs, resources, task_group_metadata)Submit a list of task references to the compute backend.
set_job_handle(handle)Save the job handle to database
set_job_status(status)Validates and sets the job state
setup(task_metadata)Executor specific setup method
teardown(task_metadata)Executor specific teardown method
to_dict()Return a JSON-serializable dictionary representation of self
validate_status(status)Overridable filter
write_streams_to_file(stream_strings, …)Write the contents of stdout and stderr to respective files.
- async cancel(task_metadata, job_handle)[source]#
Method to cancel the job identified uniquely by the job_handle (base class)
- Arg(s)
task_metadata: Metadata of the task to be cancelled job_handle: Unique ID of the job assigned by the backend
- Return(s)
False by default
- Return type
bool
- from_dict(object_dict)#
Rehydrate a dictionary representation
- Parameters
object_dict (
dict) – a dictionary representation returned by to_dict- Return type
- Returns
self
Instance attributes will be overwritten.
- async get_cancel_requested()[source]#
Get if the task was requested to be canceled
- Arg(s)
None
- Return(s)
Whether the task has been requested to be cancelled
- Return type
Any
- get_dispatch_context(dispatch_info)#
Start a context manager that will be used to access the dispatch info for the executor.
- Parameters
dispatch_info (
DispatchInfo) – The dispatch info to be used inside current context.- Return type
AbstractContextManager[DispatchInfo]- Returns
A context manager object that handles the dispatch info.
- async get_version_info()[source]#
Query the database for dispatch version metadata.
- Arg:
dispatch_id: Dispatch ID of the lattice
- Returns
python_version, “covalent”: covalent_version}
- Return type
{“python”
- async poll(task_group_metadata, data)[source]#
Block until the job has reached a terminal state.
- Parameters
task_group_metadata (
Dict) – A dictionary of metadata for the task group. Current keys are dispatch_id, node_ids, and task_group_id.data (
Any) – The return value of send().
The return value of poll() will be passed directly into receive().
Raise NotImplementedError to indicate that the compute backend will notify the Covalent server asynchronously of job completion.
- Return type
Any
- async receive(task_group_metadata, data)[source]#
Return a list of task updates.
Each task must have reached a terminal state by the time this is invoked.
- Parameters
task_group_metadata (
Dict) – A dictionary of metadata for the task group. Current keys are dispatch_id, node_ids, and task_group_id.data (
Any) – The return value of poll() or the request body of /jobs/update.
- Return type
List[TaskUpdate]- Returns
Returns a list of task results, each a TaskUpdate dataclass of the form
- {
“dispatch_id”: dispatch_id, “node_id”: node_id, “status”: status, “assets”: {
- ”output”: {
“remote_uri”: output_uri,
}, “stdout”: {
”remote_uri”: stdout_uri,
}, “stderr”: {
”remote_uri”: stderr_uri,
},
},
}
corresponding to the node ids (task_ids) specified in the task_group_metadata. This might be a subset of the node ids in the originally submitted task group as jobs may notify Covalent asynchronously of completed tasks before the entire task group finishes running.
- abstract async run(function, args, kwargs, task_metadata)[source]#
Abstract method to run a function in the executor in async-aware manner.
- Parameters
function (
Callable) – The function to run in the executorargs (
List) – List of positional arguments to be used by the functionkwargs (
Dict) – Dictionary of keyword arguments to be used by the function.task_metadata (
Dict) – Dictionary of metadata for the task. Current keys are dispatch_id and node_id
- Returns
The result of the function execution
- Return type
output
- async send(task_specs, resources, task_group_metadata)[source]#
Submit a list of task references to the compute backend.
- Parameters
task_specs (
List[TaskSpec]) – a list of TaskSpecsresources (
ResourceMap) – a ResourceMap mapping task assets to URIstask_group_metadata (
Dict) – A dictionary of metadata for the task group. Current keys are dispatch_id, node_ids, and task_group_id.
The return value of send() will be passed directly into poll().
- Return type
Any
- async set_job_handle(handle)[source]#
Save the job handle to database
- Arg(s)
handle: JSONable type identifying the job being executed by the backend
- Return(s)
Response from the listener that handles inserting the job handle to database
- Return type
Any
- async set_job_status(status)[source]#
Validates and sets the job state
For use with send/receive API
- Return(s)
Whether the action succeeded
- Return type
bool
- to_dict()#
Return a JSON-serializable dictionary representation of self
- Return type
dict
- async write_streams_to_file(stream_strings, filepaths, dispatch_id, results_dir)[source]#
Write the contents of stdout and stderr to respective files.
- Parameters
stream_strings (
Iterable[str]) – The stream_strings to be written to files.filepaths (
Iterable[str]) – The filepaths to be used for writing the streams.dispatch_id (
str) – The ID of the dispatch which initiated the request.results_dir (
str) – The location of the results directory.
This uses aiofiles to avoid blocking the event loop.
- Return type
None