Dask Executor
Executes tasks (electrons) in a Dask cluster. This is the default executor when Covalent is started without the --no-cluster flag. A local Dask cluster can be started as follows:
from dask.distributed import LocalCluster
cluster = LocalCluster()
print(cluster.scheduler_address)
When running locally, the address will look like tcp://127.0.0.1:55564. Note that the Dask cluster does not persist after the process that created it terminates.
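Before handing an address to an executor, it can help to check that it has the expected scheme://host:port shape. The following is a minimal sketch using only the standard library; `split_scheduler_address` is a hypothetical helper for illustration and is not part of Covalent or Dask.

```python
from urllib.parse import urlsplit


def split_scheduler_address(address: str) -> tuple[str, str, int]:
    """Split an address such as 'tcp://127.0.0.1:55564' into (scheme, host, port).

    Hypothetical helper for illustration; it is not part of Covalent or Dask.
    """
    parts = urlsplit(address)
    if not parts.scheme or parts.hostname is None or parts.port is None:
        raise ValueError(f"not a scheme://host:port address: {address!r}")
    return parts.scheme, parts.hostname, parts.port


scheme, host, port = split_scheduler_address("tcp://127.0.0.1:55564")
print(scheme, host, port)  # tcp 127.0.0.1 55564
```

The check only validates the shape of the string; it does not confirm that a scheduler is actually listening at that address.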
This cluster can be used with Covalent by providing the scheduler address:
import covalent as ct

dask_executor = ct.executor.DaskExecutor(
    scheduler_address="tcp://127.0.0.1:55564"
)

@ct.electron(executor=dask_executor)
def my_custom_task(x, y):
    return x + y

...
- class covalent.executor.executor_plugins.dask.DaskExecutor(scheduler_address='', log_stdout='stdout.log', log_stderr='stderr.log', conda_env='', cache_dir='', current_env_on_conda_fail=False, workdir='', create_unique_workdir=None)
Dask executor class that submits the input function to a running Dask cluster.
Methods:
cancel(task_metadata, job_handle) – Cancel the task currently being executed by the Dask executor
from_dict(object_dict) – Rehydrate a dictionary representation
get_cancel_requested() – Get whether the task was requested to be canceled
get_dispatch_context(dispatch_info) – Start a context manager that will be used to access the dispatch info for the executor.
run(function, args, kwargs, task_metadata) – Submit the function and inputs to the Dask cluster
set_job_handle(handle) – Save the job handle to the database
setup(task_metadata) – Executor-specific setup method
teardown(task_metadata) – Executor-specific teardown method
to_dict() – Return a JSON-serializable dictionary representation of self
write_streams_to_file(stream_strings, …) – Write the contents of stdout and stderr to respective files.
- async cancel(task_metadata, job_handle)
Cancel the task currently being executed by the Dask executor
- Arg(s)
task_metadata: Metadata associated with the task
job_handle: Key assigned to the job by Dask
- Return(s)
True by default
- Return type
Literal[True]
- from_dict(object_dict)
Rehydrate a dictionary representation
- Parameters
object_dict (dict) – a dictionary representation returned by to_dict
- Returns
self. Instance attributes will be overwritten.
- async get_cancel_requested()
Get whether the task was requested to be canceled
- Arg(s)
None
- Return(s)
Whether the task has been requested to be canceled
- Return type
Any
- get_dispatch_context(dispatch_info)
Start a context manager that will be used to access the dispatch info for the executor.
- Parameters
dispatch_info (DispatchInfo) – The dispatch info to be used inside the current context.
- Return type
AbstractContextManager[DispatchInfo]
- Returns
A context manager object that handles the dispatch info.
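The general pattern of exposing dispatch info through a context manager can be sketched with the standard library. This is an illustration under stated assumptions, not Covalent's implementation: the `DispatchInfo` below is a stand-in that models only an ID, and `dispatch_context` and `current_dispatch_id` are hypothetical names.

```python
from contextlib import contextmanager
from contextvars import ContextVar
from typing import Iterator, NamedTuple, Optional


class DispatchInfo(NamedTuple):
    """Stand-in for Covalent's DispatchInfo; only the ID is modeled here."""
    dispatch_id: str


# Holds the dispatch info that is active in the current context, if any.
_current_dispatch: ContextVar[Optional[DispatchInfo]] = ContextVar(
    "current_dispatch", default=None
)


@contextmanager
def dispatch_context(info: DispatchInfo) -> Iterator[DispatchInfo]:
    """Make `info` visible inside the with-block, then restore the old value."""
    token = _current_dispatch.set(info)
    try:
        yield info
    finally:
        _current_dispatch.reset(token)


def current_dispatch_id() -> Optional[str]:
    """Return the active dispatch ID, or None outside any dispatch context."""
    info = _current_dispatch.get()
    return info.dispatch_id if info else None


with dispatch_context(DispatchInfo("example-dispatch")):
    inside = current_dispatch_id()
outside = current_dispatch_id()
print(inside, outside)  # example-dispatch None
```

Resetting the variable in a `finally` clause means the previous value is restored even if the code inside the with-block raises.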
- async run(function, args, kwargs, task_metadata)
Submit the function and inputs to the Dask cluster
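The submit-and-await shape of an asynchronous run method can be illustrated without a cluster. In this sketch a `ThreadPoolExecutor` is only a local stand-in for Dask, and `run_in_pool` is a hypothetical function; with Dask, the comparable call for sending a function to a cluster is `Client.submit`. How `DaskExecutor.run` is implemented internally is not shown in this section.

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor


def add(x, y):
    return x + y


async def run_in_pool(pool, function, args, kwargs):
    """Submit function(*args, **kwargs) to the pool and await its result.

    The thread pool is only a local stand-in for a Dask cluster.
    """
    future = pool.submit(function, *args, **kwargs)
    # wrap_future lets the event loop await a concurrent.futures.Future.
    return await asyncio.wrap_future(future)


async def main():
    with ThreadPoolExecutor(max_workers=1) as pool:
        return await run_in_pool(pool, add, [1, 2], {})


result = asyncio.run(main())
print(result)  # 3
```

Awaiting the wrapped future keeps the event loop free to service other tasks while the submitted function runs elsewhere.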
- async set_job_handle(handle)
Save the job handle to the database
- Arg(s)
handle: JSONable type identifying the job being executed by the backend
- Return(s)
Response from the listener that handles inserting the job handle into the database
- Return type
Any
- async setup(task_metadata)
Executor-specific setup method
- async teardown(task_metadata)
Executor-specific teardown method
- to_dict()
Return a JSON-serializable dictionary representation of self
- Return type
dict
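The round trip between to_dict and from_dict can be sketched with a small stand-in class. `SketchExecutor` and the `"attributes"` key are assumptions made for illustration; the exact dictionary layout used by Covalent is not described in this section.

```python
import json


class SketchExecutor:
    """Hypothetical executor used only to illustrate to_dict / from_dict."""

    def __init__(self, scheduler_address="", log_stdout="stdout.log"):
        self.scheduler_address = scheduler_address
        self.log_stdout = log_stdout

    def to_dict(self):
        # Copy the instance attributes into a plain, JSON-serializable dict.
        return {"attributes": dict(self.__dict__)}

    def from_dict(self, object_dict):
        # Overwrite the instance attributes in place and return self.
        self.__dict__.update(object_dict["attributes"])
        return self


original = SketchExecutor(scheduler_address="tcp://127.0.0.1:55564")
payload = json.dumps(original.to_dict())  # the dict survives JSON encoding

restored = SketchExecutor().from_dict(json.loads(payload))
print(restored.scheduler_address)  # tcp://127.0.0.1:55564
```

As in the documented from_dict, the sketch mutates the instance it is called on rather than constructing a new one.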
- async write_streams_to_file(stream_strings, filepaths, dispatch_id, results_dir)
Write the contents of stdout and stderr to respective files. This uses aiofiles to avoid blocking the event loop.
- Parameters
stream_strings (Iterable[str]) – The stream_strings to be written to files.
filepaths (Iterable[str]) – The filepaths to be used for writing the streams.
dispatch_id (str) – The ID of the dispatch which initiated the request.
results_dir (str) – The location of the results directory.
- Return type
None
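The idea of writing captured streams to files without blocking the event loop can be sketched as follows. This is a simplified illustration, not the library's code: `write_streams` is a hypothetical function, it omits the dispatch_id and results_dir parameters, and it assumes `asyncio.to_thread` in place of aiofiles so that only the standard library is required.

```python
import asyncio
import tempfile
from pathlib import Path


async def write_streams(stream_strings, filepaths):
    """Write each string to its matching path without blocking the event loop.

    Assumption: asyncio.to_thread replaces aiofiles here so the sketch needs
    only the standard library.
    """
    for text, path in zip(stream_strings, filepaths):
        await asyncio.to_thread(Path(path).write_text, text)


with tempfile.TemporaryDirectory() as tmp:
    out_path = Path(tmp) / "stdout.log"
    err_path = Path(tmp) / "stderr.log"
    asyncio.run(write_streams(["hello\n", "warning\n"], [out_path, err_path]))
    written = (out_path.read_text(), err_path.read_text())

print(written)  # ('hello\n', 'warning\n')
```

Each write runs in a worker thread, so other coroutines on the same loop can continue while the file system call is in progress.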