Creating a Custom Executor
Executors define how a backend resource handles computations. They specify everything about the resource: the hardware and configuration, the computation strategy, logic, and even goals.
Executors are plugins. Any executor plugins found by the dispatcher are imported as classes in the covalent.executor namespace.
Covalent already contains a number of versatile executors. (See Choosing an Executor For a Task for information about choosing an existing executor.)
If an existing executor does not fit your needs, you can write your own, using your choice of environments, hardware, and cloud resources to execute Covalent electrons however you like. A template to write an executor can be found here.
Prerequisites
Decide the purpose of the executor. You should have a good handle on the following questions:
- What is the purpose of the executor?
- What types of tasks is it designed to run?
- What capabilities does the executor require that aren’t already in an existing executor?
- What hardware or cloud resource will it run on?
- Will it scale? How?
Procedure
The following example creates a TimingExecutor that computes the CPU time used by the function to help determine its efficiency. It then writes this result to a file along with its dispatch_id and node_id.
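CPU time is not the same as elapsed wall-clock time. The minimal sketch below uses only the Python standard library, not Covalent, to show the difference. The helper name `measure` is hypothetical and introduced only for this illustration.

```python
import time


def measure(function, *args, **kwargs):
    """Return (result, cpu_seconds, wall_seconds) for a single call."""
    cpu_start = time.process_time()   # CPU time consumed by this process
    wall_start = time.perf_counter()  # elapsed wall-clock time
    result = function(*args, **kwargs)
    cpu_used = time.process_time() - cpu_start
    wall_used = time.perf_counter() - wall_start
    return result, cpu_used, wall_used


# Sleeping takes wall-clock time but consumes almost no CPU time.
_, sleep_cpu, sleep_wall = measure(time.sleep, 0.2)
print(f"sleep: cpu={sleep_cpu:.4f}s wall={sleep_wall:.4f}s")

# A pure computation consumes CPU time for most of its duration.
total, busy_cpu, busy_wall = measure(sum, range(2_000_000))
print(f"sum:   cpu={busy_cpu:.4f}s wall={busy_wall:.4f}s")
```

An executor that times tasks with `time.process_time()` therefore reports how much computation a task performed, not how long the caller waited for it.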
Decide whether to make your executor asynchronous.
Covalent is capable of running asynchronous (async) executors. In general, Covalent suggests that you write your custom executors to be async-capable as well, especially if they depend on network communication or have I/O-bound logic inside the run() function.
Some examples of async executors are:
- The default DaskExecutor
- SSHPlugin
- SlurmPlugin
To make your executor async-capable, do the following:
i. Subclass `AsyncBaseExecutor` instead of `BaseExecutor`
ii. Define your `run()` function with:
`async def run(...)`
instead of
`def run(...)`
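To illustrate why an async `run()` helps with I/O-bound work, here is a minimal sketch that uses only `asyncio`. `SketchAsyncExecutor` is a hypothetical stand-in written for this example. It does not subclass Covalent's `AsyncBaseExecutor`, and the `asyncio.sleep` call merely simulates a network round trip.

```python
import asyncio
import time
from typing import Any, Callable, Dict, List


class SketchAsyncExecutor:
    """Stand-in for illustration only; a real plugin would subclass AsyncBaseExecutor."""

    async def run(self, function: Callable, args: List, kwargs: Dict, task_metadata: Dict) -> Any:
        # Simulated I/O wait; awaiting hands control back to the event loop
        # so other tasks can make progress in the meantime.
        await asyncio.sleep(0.1)
        return function(*args, **kwargs)


async def main() -> List[Any]:
    executor = SketchAsyncExecutor()
    tasks = [
        executor.run(pow, [2, n], {}, {"dispatch_id": "demo", "node_id": n})
        for n in range(5)
    ]
    # All five simulated waits overlap instead of running back to back.
    return await asyncio.gather(*tasks)


start = time.perf_counter()
results = asyncio.run(main())
elapsed = time.perf_counter() - start
print(results, f"{elapsed:.2f}s")
```

Because the waits overlap, the five tasks finish in roughly the time of one wait. A synchronous `run()` with a blocking sleep would take about five times as long.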
Import the Covalent BaseExecutor (or AsyncBaseExecutor) and the Python typing library.
Write the plugin class. The plugin file must contain:
- executor_plugin_name, the name of the executor, which makes it importable by covalent.executors. It must be the same as the class name.
- _EXECUTOR_PLUGIN_DEFAULTS, which holds the default values for the plugin parameters, if the executor has any defaults.
- A run() function that handles the task to be executed. The run() function must take these parameters:
  - A Callable object to contain the task.
  - A list of arguments (args) and a dictionary of keyword arguments (kwargs) to pass to the Callable.
  - A dictionary, task_metadata, to store the dispatch_id and node_id (and possibly other metadata in the future).
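The requirement that `executor_plugin_name` match the class name makes sense if you picture a loader that looks the class up by that string. The sketch below is an assumption about how such a lookup could work, written with the standard library only. It is not Covalent's actual loader, and `load_plugin_class`, `DemoExecutor`, and `demo_plugin.py` are hypothetical names.

```python
import importlib.util
import tempfile
from pathlib import Path

# A tiny plugin file, written to disk only for this demonstration.
PLUGIN_SOURCE = '''
executor_plugin_name = "DemoExecutor"
_EXECUTOR_PLUGIN_DEFAULTS = {"greeting": "hello"}


class DemoExecutor:
    def __init__(self, greeting="hello"):
        self.greeting = greeting
'''


def load_plugin_class(plugin_path: Path):
    """Import a plugin file and return the class named by executor_plugin_name."""
    spec = importlib.util.spec_from_file_location(plugin_path.stem, plugin_path)
    module = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(module)
    # This lookup only succeeds when the string matches the class name exactly.
    return getattr(module, module.executor_plugin_name)


with tempfile.TemporaryDirectory() as plugin_dir:
    plugin_file = Path(plugin_dir) / "demo_plugin.py"
    plugin_file.write_text(PLUGIN_SOURCE)
    plugin_class = load_plugin_class(plugin_file)

print(plugin_class.__name__)
```

If the string and the class name differed, the `getattr` call would raise an `AttributeError`, which is one plausible reason for the naming rule.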
With all the above in mind, the example TimingExecutor class is shown in the snippet further down. The example needs no meaningful defaults, so its _EXECUTOR_PLUGIN_DEFAULTS only sets timing_filepath to an empty string. Ideally, any collection of custom executors should reside in a separate directory, so create a new directory for the plugin:
In a terminal window:
mkdir custom_executors
Then copy the following snippet into a file named timing_plugin.py in that directory:
"""Timing executor plugin for Covalent."""

import time
from pathlib import Path
from typing import Any, Callable, Dict, List

from covalent.executor.base import BaseExecutor

executor_plugin_name = "TimingExecutor"  # Required by covalent.executors

# Set default values for executor plugin parameters here
_EXECUTOR_PLUGIN_DEFAULTS = {
    "timing_filepath": "",
}


class TimingExecutor(BaseExecutor):
    """Executor that times the execution time."""

    def __init__(self, timing_filepath: str = "", **kwargs):
        """Init function.

        Args:
            timing_filepath: Filepath where the timing information will be written.
        """
        self.timing_filepath = str(Path(timing_filepath).resolve())
        super().__init__(**kwargs)

    def run(self, function: Callable, args: List, kwargs: Dict, task_metadata: Dict) -> Any:
        """Measures the time taken to execute a function.

        Args:
            function: Function to be executed.
            args: Arguments to be passed to the function.
            kwargs: Keyword arguments to be passed to the function.
            task_metadata: Metadata about the task. Expects node_id and dispatch_id.

        Returns:
            The result of the function.
        """
        start = time.process_time()
        result = function(*args, **kwargs)
        time_taken = time.process_time() - start

        with open(self.timing_filepath, "w") as f:
            f.write(
                f"Node {task_metadata['node_id']} in dispatch {task_metadata['dispatch_id']} took {time_taken}s of CPU time.\n"
            )

        return result
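One design detail worth noting: the executor opens its log file in `"w"` mode, so each task overwrites whatever the previous task wrote. The sketch below reproduces only the file-writing step, without Covalent, to compare `"w"` with `"a"` (append). `write_timing` is a hypothetical helper, and the metadata values are made up for the demonstration.

```python
import tempfile
from pathlib import Path


def write_timing(path, task_metadata, time_taken, mode):
    """Write one timing line using the same message format as the executor."""
    with open(path, mode) as f:
        f.write(
            f"Node {task_metadata['node_id']} in dispatch "
            f"{task_metadata['dispatch_id']} took {time_taken}s of CPU time.\n"
        )


with tempfile.TemporaryDirectory() as tmp:
    overwrite_log = Path(tmp) / "overwrite.log"
    append_log = Path(tmp) / "append.log"

    # Simulate three tasks reporting to the same log file.
    for node_id in range(3):
        metadata = {"dispatch_id": "demo", "node_id": node_id}
        write_timing(overwrite_log, metadata, 0.001, "w")
        write_timing(append_log, metadata, 0.001, "a")

    overwrite_lines = overwrite_log.read_text().splitlines()
    append_lines = append_log.read_text().splitlines()

print(len(overwrite_lines), len(append_lines))
```

With a single electron, as in this guide, `"w"` is sufficient. If several electrons share one executor instance, append mode may be the better fit, since it keeps one line per task.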
Now, assuming the custom_executors directory is in the same directory as this notebook, tell Covalent where to find the plugin:
In a terminal window:
export COVALENT_EXECUTOR_DIR=$PWD/custom_executors
covalent restart
At this point the executor is ready for use.
Construct electrons and assign them to the new executor, then execute them in a lattice:
import sys
from pathlib import Path

import covalent as ct

# Make the plugin file importable from this notebook.
sys.path.append(str(Path("./custom_executors").resolve()))
from timing_plugin import TimingExecutor  # the timing_plugin.py file created above

timing_log = "/tmp/cpu_timing.log"
timing_executor = TimingExecutor(timing_log)


# Calculate e based on a series
@ct.electron(executor=timing_executor)
def e_ser(x):
    e_est = 1
    fact = 1
    for i in range(1, x):
        fact *= i
        e_est += 1 / fact
    return e_est


@ct.lattice(executor=timing_executor)
def workflow(x):
    return e_ser(x)
Run the lattice:
dispatch_id = ct.dispatch(workflow)(10)
result = ct.get_result(dispatch_id, wait=True)
print(result)

# Print the timing line written by the executor.
with open(timing_log, "r") as f:
    for line in f:
        print(line, end="")
Lattice Result
==============
status: COMPLETED
result: 2.7182815255731922
input args: ['10']
input kwargs: {}
error: None
start_time: 2023-04-14 11:19:44.967362
end_time: 2023-04-14 11:19:45.108294
results_dir: /home/user/.local/share/covalent/data
dispatch_id: 35b9dda6-c4b8-4ec2-b820-67fb3ab606dc
Node Outputs
------------
e_ser(0): 2.7182815255731922
:parameter:10(1): 10
:postprocess:(2): 2.7182815255731922
Node 0 in dispatch 35b9dda6-c4b8-4ec2-b820-67fb3ab606dc took 0.00019649999999948875s of CPU time.
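The returned value can be sanity-checked outside Covalent. The sketch below repeats the `e_ser` function from this guide as a plain Python function, without the electron decorator, and compares it with `math.e` for a few arbitrarily chosen term counts.

```python
import math


def e_ser(x):
    """Estimate e by summing 1/k! for k = 0 .. x - 1."""
    e_est = 1
    fact = 1
    for i in range(1, x):
        fact *= i
        e_est += 1 / fact
    return e_est


# The error shrinks quickly as more terms of the series are included.
for terms in (5, 10, 15):
    estimate = e_ser(terms)
    print(f"terms={terms:2d} estimate={estimate:.15f} error={abs(estimate - math.e):.2e}")
```

Running the function directly like this is also a quick way to debug task logic before dispatching it through an executor.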
See Also
Adding Constraints to Tasks and Workflows