Dask Executor
Executes tasks (electrons) in a Dask cluster. This is the default executor when Covalent is started without the --no-cluster flag.
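For example, with the server's default cluster running, electrons execute on Dask without any executor being specified. The workflow below is a minimal illustrative sketch (the function and workflow names are not from this page):

```python
import covalent as ct

# Assumes the Covalent server is running with its default Dask cluster,
# i.e. it was started without the --no-cluster flag.
@ct.electron
def add(x, y):
    return x + y

@ct.lattice
def workflow(x, y):
    return add(x, y)

# The electron is executed on the default Dask cluster.
dispatch_id = ct.dispatch(workflow)(1, 2)
result = ct.get_result(dispatch_id, wait=True)
```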
Alternatively, to use a separate Dask cluster, start one and print its scheduler address:

```python
from dask.distributed import LocalCluster

cluster = LocalCluster()
print(cluster.scheduler_address)
```

The address will look like tcp://127.0.0.1:55564 when running locally. Note that the Dask cluster does not persist when the process terminates.
This cluster can be used with Covalent by providing the scheduler address:

```python
import covalent as ct

dask_executor = ct.executor.DaskExecutor(
    scheduler_address="tcp://127.0.0.1:55564"
)

@ct.electron(executor=dask_executor)
def my_custom_task(x, y):
    return x + y

...
```
- class covalent.executor.executor_plugins.dask.DaskExecutor(scheduler_address='', log_stdout='stdout.log', log_stderr='stderr.log', conda_env='', cache_dir='', current_env_on_conda_fail=False)
Dask executor class that submits the input function to a running Dask cluster.
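As an illustrative sketch of the constructor, using the keyword arguments shown in the signature above (all values below are placeholders, not recommendations):

```python
import covalent as ct

# Placeholder values mirroring the keyword arguments in the signature above.
executor = ct.executor.DaskExecutor(
    scheduler_address="tcp://127.0.0.1:55564",
    log_stdout="stdout.log",
    log_stderr="stderr.log",
    conda_env="",
    cache_dir="",
    current_env_on_conda_fail=False,
)
```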
Methods:
- from_dict(object_dict): Rehydrate a dictionary representation.
- get_dispatch_context(dispatch_info): Start a context manager that will be used to access the dispatch info for the executor.
- run(function, args, kwargs, task_metadata): Submit the function and inputs to the Dask cluster.
- setup(task_metadata): Executor-specific setup method.
- teardown(task_metadata): Executor-specific teardown method.
- to_dict(): Return a JSON-serializable dictionary representation of self.
- write_streams_to_file(stream_strings, …): Write the contents of stdout and stderr to respective files.
- from_dict(object_dict)
Rehydrate a dictionary representation.
- Parameters
object_dict (dict) – a dictionary representation returned by to_dict
- Returns
self
Instance attributes will be overwritten.
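A hedged round-trip sketch combining to_dict and from_dict; it assumes scheduler_address is stored as a plain instance attribute:

```python
import covalent as ct

source = ct.executor.DaskExecutor(scheduler_address="tcp://127.0.0.1:55564")

# Serialize to a JSON-serializable dictionary, then rehydrate it onto a fresh
# instance; from_dict returns that instance with its attributes overwritten.
state = source.to_dict()
restored = ct.executor.DaskExecutor().from_dict(state)

assert restored.scheduler_address == source.scheduler_address
```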
- get_dispatch_context(dispatch_info)
Start a context manager that will be used to access the dispatch info for the executor.
- Parameters
dispatch_info (DispatchInfo) – The dispatch info to be used inside the current context.
- Return type
AbstractContextManager[DispatchInfo]
- Returns
A context manager object that handles the dispatch info.
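A usage sketch only, reusing dask_executor from the example above; dispatch_info is assumed to be a DispatchInfo object supplied by Covalent's internals rather than constructed by hand:

```python
# Sketch: dispatch_info is assumed to be provided by the dispatcher.
with dask_executor.get_dispatch_context(dispatch_info) as info:
    # Inside this block, `info` is the DispatchInfo for the current dispatch.
    ...
```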
- async run(function, args, kwargs, task_metadata)
Submit the function and inputs to the Dask cluster.
- async setup(task_metadata)
Executor-specific setup method.
- async teardown(task_metadata)
Executor-specific teardown method.
- to_dict()
Return a JSON-serializable dictionary representation of self.
- Return type
dict
- async write_streams_to_file(stream_strings, filepaths, dispatch_id, results_dir)
Write the contents of stdout and stderr to respective files.
- Parameters
stream_strings (Iterable[str]) – The stream strings to be written to files.
filepaths (Iterable[str]) – The filepaths to be used for writing the streams.
dispatch_id (str) – The ID of the dispatch which initiated the request.
results_dir (str) – The location of the results directory.
This uses aiofiles to avoid blocking the event loop.
- Return type
None
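A hedged sketch of calling write_streams_to_file directly, reusing dask_executor from the example above; Covalent's dispatcher normally supplies these arguments itself, and every value below is a placeholder:

```python
import asyncio

# Placeholder values; the dispatcher normally passes the captured streams,
# the dispatch ID, and the results directory.
asyncio.run(
    dask_executor.write_streams_to_file(
        stream_strings=["captured stdout text", "captured stderr text"],
        filepaths=["stdout.log", "stderr.log"],
        dispatch_id="example-dispatch-id",
        results_dir="/tmp/covalent-results",
    )
)
```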