Creating a Custom Executor#
Executors define how a backend resource handles computations. They specify everything about the resource: the hardware and configuration, the computation strategy, logic, and even goals.
Executors are plugins. Any executor plugins found by the dispatcher are imported as classes in the covalent.executor namespace.
Covalent already contains a number of versatile executors. (See Choosing an Executor For a Task for information about choosing an existing executor.)
If an existing executor does not fit your needs, you can write your own, using your choice of environments, hardware, and cloud resources to execute Covalent electrons however you like. A template to write an executor can be found here.
Prerequisites#
Decide the purpose of the executor. You should have a good handle on the following questions:
- What is the purpose of the executor?
- What types of tasks is it designed to run?
- What capabilities does the executor require that aren’t already in an existing executor?
- What hardware or cloud resource will it run on?
- Will it scale? How?
Procedure#
The following example creates a TimingExecutor that computes the CPU time used by the task function to help determine its efficiency. It then writes this result to a file along with the task's dispatch_id and node_id.
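To make "CPU time" concrete, the minimal sketch below compares time.process_time() with wall-clock time for a call that mostly sleeps. The helper name cpu_and_wall is hypothetical and not part of Covalent; it only illustrates why a sleeping or I/O-bound task reports very little CPU time.

```python
import time


def cpu_and_wall(function, *args, **kwargs):
    """Hypothetical helper: return (result, cpu_seconds, wall_seconds) for one call."""
    cpu_start = time.process_time()    # CPU time of this process; excludes sleep
    wall_start = time.perf_counter()   # elapsed wall-clock time
    result = function(*args, **kwargs)
    cpu_seconds = time.process_time() - cpu_start
    wall_seconds = time.perf_counter() - wall_start
    return result, cpu_seconds, wall_seconds


# Sleeping takes wall-clock time but almost no CPU time.
_, cpu_s, wall_s = cpu_and_wall(time.sleep, 0.2)
print(f"CPU: {cpu_s:.4f}s, wall: {wall_s:.4f}s")
```

A timing executor built on process_time() therefore measures computational cost, not how long the caller waited.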
Decide whether to make your executor asynchronous.
Covalent is written to be capable of running asynchronous (async) executors. In general, Covalent recommends writing your custom executors to be async-capable as well, especially if they depend on network communication or have I/O-bound logic inside the run() function.
Some examples of async executors are:
- The default DaskExecutor
- SSHPlugin
- SlurmPlugin
To make your executor async-capable, do the following:
1. Subclass AsyncBaseExecutor instead of BaseExecutor.
2. Define your run() function with `async def run(...)` instead of `def run(...)`.
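The two steps above can be sketched as follows. To keep the sketch runnable without Covalent installed, it uses a hypothetical stand-in base class (StandInAsyncBase) where a real plugin would subclass AsyncBaseExecutor. The asyncio.sleep() call is an assumption that simulates a network round trip.

```python
import asyncio
from typing import Any, Callable, Dict, List


class StandInAsyncBase:
    """Hypothetical stand-in; a real plugin would subclass Covalent's AsyncBaseExecutor."""


class AsyncSketchExecutor(StandInAsyncBase):
    async def run(self, function: Callable, args: List, kwargs: Dict, task_metadata: Dict) -> Any:
        # Simulated I/O wait (for example, contacting a remote backend).
        # While this coroutine waits, the event loop can serve other tasks.
        await asyncio.sleep(0.1)
        return function(*args, **kwargs)


async def main():
    executor = AsyncSketchExecutor()
    metadata = {"dispatch_id": "demo", "node_id": 0}  # made-up values
    # Both run() calls wait concurrently instead of one after the other.
    return await asyncio.gather(
        executor.run(pow, [2, 10], {}, metadata),
        executor.run(sum, [[1, 2, 3]], {}, metadata),
    )


results = asyncio.run(main())
print(results)
```

Because run() is a coroutine, the waits overlap. That overlap is the main benefit for executors dominated by network or I/O latency.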
Import the Covalent BaseExecutor (or AsyncBaseExecutor) and the Python typing library.
[1]:
# timing_plugin.py
from covalent.executor import BaseExecutor
from typing import Callable, Dict, List
import time
from pathlib import Path
Write the plugin class. The class must contain:
- The class name of the executor, shared in executor_plugin_name to make it importable by covalent.executor.
- A run() function that handles the task to be executed. The run() function must take these parameters:
  - A Callable object to contain the task.
  - A list of arguments (args) and a dictionary of keyword arguments (kwargs) to pass to the Callable.
  - A dictionary, task_metadata, to store the dispatch_id and node_id (and possibly other metadata in the future).
- _EXECUTOR_PLUGIN_DEFAULTS, if there are any defaults for the executor.
With all the above in mind, the example TimingExecutor class looks like this:
[2]:
executor_plugin_name = "TimingExecutor"  # Required by covalent.executor


class TimingExecutor(BaseExecutor):
    def __init__(self, timing_filepath, **kwargs):
        # Store an absolute path so the log location is fixed when the executor is created.
        self.timing_filepath = str(Path(timing_filepath).resolve())
        super().__init__(**kwargs)

    def run(self, function: Callable, args: List, kwargs: Dict, task_metadata: Dict):
        # Measure CPU time (not wall-clock time) spent executing the task.
        start = time.process_time()
        result = function(*args, **kwargs)
        time_taken = time.process_time() - start

        # Append one line per task; the with block closes the file automatically.
        with open(self.timing_filepath, "a") as f:
            f.write(
                f"Node {task_metadata['node_id']} in dispatch {task_metadata['dispatch_id']} "
                f"took {time_taken}s of CPU time.\n"
            )

        return result
At this point the executor is ready for use (or at least testing).
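One way to test the timing logic before involving the dispatcher is to call run() directly with made-up metadata. The sketch below copies the run() logic into a hypothetical stand-in class (TimingRunSketch) that does not subclass BaseExecutor, so it runs without Covalent. The dispatch_id and node_id values are invented for the test.

```python
import tempfile
import time
from pathlib import Path
from typing import Callable, Dict, List


class TimingRunSketch:
    """Hypothetical stand-in that mirrors the timing logic; not a Covalent executor."""

    def __init__(self, timing_filepath):
        self.timing_filepath = str(Path(timing_filepath).resolve())

    def run(self, function: Callable, args: List, kwargs: Dict, task_metadata: Dict):
        start = time.process_time()
        result = function(*args, **kwargs)
        time_taken = time.process_time() - start
        with open(self.timing_filepath, "a") as f:
            f.write(
                f"Node {task_metadata['node_id']} in dispatch "
                f"{task_metadata['dispatch_id']} took {time_taken}s of CPU time.\n"
            )
        return result


# Write the log into a temporary directory so the test leaves nothing behind.
with tempfile.TemporaryDirectory() as tmp:
    log_path = Path(tmp) / "cpu_timing.log"
    sketch = TimingRunSketch(log_path)
    fake_metadata = {"dispatch_id": "local-test", "node_id": 0}  # made-up values
    value = sketch.run(sum, [[1, 2, 3]], {}, fake_metadata)
    log_lines = log_path.read_text().splitlines()

print(value)
print(log_lines)
```

A check like this confirms that the task result is passed through unchanged and that exactly one log line is written per call.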
Construct electrons and assign them to the new executor, then execute them in a lattice:
[3]:
import covalent as ct

timing_log = "./cpu_timing.log"
timing_executor = TimingExecutor(timing_log)


# Estimate e from the series 1 + 1/1! + 1/2! + ...
@ct.electron(executor=timing_executor)  # Assign the electron to the new executor
def e_ser(x):
    e_est = 1
    fact = 1
    for i in range(1, x):
        fact *= i
        e_est += 1 / fact
    return e_est


@ct.lattice
def workflow(x):
    return e_ser(x)
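The electron's body is ordinary Python, so the series can be checked without Covalent. The sketch below reuses the same loop under a hypothetical name (e_series) and compares the estimate with math.e. For x = 10 the loop sums the terms 1/0! through 1/9!.

```python
import math


def e_series(x):
    """Plain-Python copy of the electron body: sum of 1/k! for k = 0 .. x-1."""
    e_est = 1
    fact = 1
    for i in range(1, x):
        fact *= i
        e_est += 1 / fact
    return e_est


estimate = e_series(10)
error = abs(math.e - estimate)
print(estimate, error)
```

The truncation error is on the order of 1/10!, roughly 3e-7, which matches the six correct decimal places in the workflow result.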
Run the lattice:
[4]:
dispatch_id = ct.dispatch(workflow)(10)
result = ct.get_result(dispatch_id, wait=True)
print(result)
for line in open(timing_log, 'r'):
    print(line)
Lattice Result
==============
status: COMPLETED
result: 2.7182815255731922
input args: ['10']
input kwargs: {}
error: None
start_time: 2023-01-31 23:07:16.920729
end_time: 2023-01-31 23:07:17.030380
results_dir: /Users/mini-me/agnostiq/covalent/doc/source/how_to/execution/results
dispatch_id: a2119573-5465-4390-869b-5709991da0e1
Node Outputs
------------
e_ser(0): 2.7182815255731922
:parameter:10(1): 10
---------------------------------------------------------------------------
FileNotFoundError Traceback (most recent call last)
Input In [4], in <cell line: 5>()
2 result = ct.get_result(dispatch_id, wait=True)
3 print(result)
----> 5 for line in open(timing_log, 'r'):
6 print(line)
FileNotFoundError: [Errno 2] No such file or directory: './cpu_timing.log'
The FileNotFoundError in this recorded run means that no timing log existed at ./cpu_timing.log when the cell tried to read it. The most likely cause is that the electron ran on the default executor instead of timing_executor, which happens when the electron is decorated with a bare @ct.electron. Passing executor=timing_executor to @ct.electron routes the task through TimingExecutor.run(), which writes the log.
See Also#
Adding Constraints to Tasks and Workflows
Choosing an Executor For a Task