Creating a Custom Executor#

Executors define how a backend resource handles computations. An executor specifies everything about the resource: its hardware and configuration, its computation strategy and logic, and even its goals.

Executors are plugins. Any executor plugin that the dispatcher finds is imported as a class in the covalent.executor namespace.

Covalent already contains a number of versatile executors. (See Choosing an Executor For a Task for information about choosing an existing executor.)

If an existing executor does not fit your needs, you can write your own, using your choice of environments, hardware, and cloud resources to execute Covalent electrons however you like. A template for writing an executor is listed under See Also (Executor Template).

Prerequisites#

Decide the purpose of the executor. You should have a good handle on the following questions:

- What is the purpose of the executor?
- What types of tasks is it designed to run?
- What capabilities does the executor require that aren’t already in an existing executor?
- What hardware or cloud resource will it run on?
- Will it scale? How?

Procedure#

The following example creates a TimingExecutor that computes the CPU time used by the function to help determine its efficiency. It then writes this result to a file along with its dispatch_id and node_id.
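As a side note on what "CPU time" means here, the following minimal sketch (standard library only, independent of Covalent) contrasts `time.process_time()`, which the example executor uses, with wall-clock time from `time.perf_counter()`. The helper names `busy_loop` and `measure` are hypothetical and exist only for this illustration.

```python
import time


def busy_loop(n: int) -> int:
    """Burn CPU cycles so that CPU time and wall time both advance."""
    total = 0
    for i in range(n):
        total += i * i
    return total


def measure(func, *args):
    """Return (cpu_seconds, wall_seconds) for a single call of func."""
    cpu_start = time.process_time()
    wall_start = time.perf_counter()
    func(*args)
    cpu_used = time.process_time() - cpu_start
    wall_used = time.perf_counter() - wall_start
    return cpu_used, wall_used


# Sleeping consumes wall-clock time but almost no CPU time.
sleep_cpu, sleep_wall = measure(time.sleep, 0.2)
# A compute-bound loop consumes both.
busy_cpu, busy_wall = measure(busy_loop, 200_000)

print(f"sleep: cpu={sleep_cpu:.4f}s wall={sleep_wall:.4f}s")
print(f"busy:  cpu={busy_cpu:.4f}s wall={busy_wall:.4f}s")
```

A task that mostly waits (on the network or on disk) will therefore report a small CPU time even if it takes a long time to finish, which is worth keeping in mind when reading the timing log.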

  1. Decide whether to make your executor asynchronous.

Covalent is capable of running asynchronous (async) executors. In general, Covalent suggests that you write your custom executors to be async-capable as well, especially if they depend on network communication or have I/O-bound logic inside the run() function.

Some examples of async executors are:

- The default DaskExecutor
- SSHPlugin
- SlurmPlugin

To make your executor async-capable, do the following:

i. Subclass `AsyncBaseExecutor` instead of `BaseExecutor`
ii. Define your `run()` function with:

    `async def run(...)`

    instead of

    `def run(...)`
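
The two changes above can be sketched as follows. This is only an illustration under stated assumptions: the local `AsyncBaseExecutor` class is a stand-in so that the snippet runs without Covalent installed (in a real plugin you would import the base class from Covalent instead), and `AsyncEchoExecutor` is a hypothetical name.

```python
import asyncio
from typing import Any, Callable, Dict, List


class AsyncBaseExecutor:
    """Stand-in for Covalent's async base class (assumption, for illustration only)."""


class AsyncEchoExecutor(AsyncBaseExecutor):
    """Hypothetical executor whose run() is a coroutine."""

    async def run(self, function: Callable, args: List, kwargs: Dict, task_metadata: Dict) -> Any:
        # Placeholder for awaiting network or file I/O, e.g. submitting a remote job.
        await asyncio.sleep(0)
        # The task itself is still called synchronously in this sketch.
        return function(*args, **kwargs)


async def main() -> List[Any]:
    executor = AsyncEchoExecutor()
    # Two run() calls are scheduled on the same event loop.
    return await asyncio.gather(
        executor.run(pow, [2, 3], {}, {"dispatch_id": "demo", "node_id": 0}),
        executor.run(pow, [2, 4], {}, {"dispatch_id": "demo", "node_id": 1}),
    )


results = asyncio.run(main())
print(results)
```

Only the awaited parts of run() yield control to other tasks, so the benefit comes from awaiting the I/O-bound steps rather than from the `async` keyword alone.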
  1. Import the Covalent BaseExecutor (or AsyncBaseExecutor) and Python typing libraries.

  1. Write the plugin class. The plugin module must contain:

  • executor_plugin_name, the name under which the executor is made importable from covalent.executor. It must be the same as the class name.

  • _EXECUTOR_PLUGIN_DEFAULTS, a dictionary of default values for the plugin parameters, if the executor has any.

  • A run() function that handles the task to be executed. The run() function must take these parameters:

    • A Callable object that contains the task.

    • A list of arguments (args) and a dictionary of keyword arguments (kwargs) to pass to the Callable.

    • A dictionary, task_metadata, that stores the dispatch_id and node_id (and possibly other metadata in the future).

With all the above in mind, we can write the example TimingExecutor class.

The executor has a single parameter, timing_filepath, so _EXECUTOR_PLUGIN_DEFAULTS holds one entry for it. Ideally, any collection of custom executors should reside in a separate directory, so let’s create a new directory for our plugin:

In a terminal window:

mkdir custom_executors

Then copy the following snippet into a file named timing_plugin.py in that directory:

[1]:
"""Timing executor plugin for Covalent."""

import time
from pathlib import Path
from typing import Any, Callable, Dict, List

from covalent.executor.base import BaseExecutor

executor_plugin_name = "TimingExecutor"  # Required by covalent.executor; must match the class name

_EXECUTOR_PLUGIN_DEFAULTS = {
    "timing_filepath": ""
}  # Set default values for executor plugin parameters here


class TimingExecutor(BaseExecutor):
    """Executor that measures the CPU time a task takes."""

    def __init__(self, timing_filepath: str = "", **kwargs):
        """Init function.

        Args:
            timing_filepath: Filepath where the timing information will be written.

        """
        self.timing_filepath = str(Path(timing_filepath).resolve())
        super().__init__(**kwargs)

    def run(self, function: Callable, args: List, kwargs: Dict, task_metadata: Dict) -> Any:
        """Measures the time taken to execute a function.

        Args:
            function: Function to be executed.
            args: Arguments to be passed to the function.
            kwargs: Keyword arguments to be passed to the function.
            task_metadata: Metadata about the task. Expects node_id and dispatch_id.

        Returns:
            The result of the function.

        """
        start = time.process_time()

        result = function(*args, **kwargs)

        time_taken = time.process_time() - start

        with open(self.timing_filepath, "w") as f:
            f.write(
                f"Node {task_metadata['node_id']} in dispatch {task_metadata['dispatch_id']} took {time_taken}s of CPU time.\n"
            )

        return result
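
Before pointing Covalent at the plugin, it can help to confirm that the file follows the naming rule described earlier (executor_plugin_name equal to the class name). The sketch below is a hypothetical standalone check built on Python's `ast` module; it is not how Covalent itself loads plugins, and `check_plugin_source` is a name invented for this example. It parses the source text without importing it, so it works even where Covalent is not installed.

```python
import ast
from typing import List


def check_plugin_source(source: str) -> List[str]:
    """Return a list of problems found in an executor plugin's source text."""
    tree = ast.parse(source)
    plugin_name = None
    class_names = set()

    # Only module-level statements matter for this check.
    for node in tree.body:
        if isinstance(node, ast.ClassDef):
            class_names.add(node.name)
        elif isinstance(node, ast.Assign):
            for target in node.targets:
                is_name_var = isinstance(target, ast.Name) and target.id == "executor_plugin_name"
                is_str_value = isinstance(node.value, ast.Constant) and isinstance(node.value.value, str)
                if is_name_var and is_str_value:
                    plugin_name = node.value.value

    problems = []
    if plugin_name is None:
        problems.append("executor_plugin_name is not set to a string at module level")
    elif plugin_name not in class_names:
        problems.append(f"no class named {plugin_name!r} is defined in the module")
    return problems


good_source = 'executor_plugin_name = "TimingExecutor"\nclass TimingExecutor:\n    pass\n'
bad_source = 'executor_plugin_name = "TimingExecutor"\nclass Timer:\n    pass\n'

print(check_plugin_source(good_source))
print(check_plugin_source(bad_source))
```

To check the real file, you could pass it the text of timing_plugin.py, for example with `Path("custom_executors/timing_plugin.py").read_text()`.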

Now, assuming the custom_executors directory is in the same directory as this notebook, let’s tell Covalent where to find the plugin:

In a terminal window:

export COVALENT_EXECUTOR_DIR=$PWD/custom_executors
covalent restart

At this point the executor is ready for use.

  1. Construct electrons and assign them to the new executor, then execute them in a lattice:

[2]:
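import sys
from pathlib import Path

import covalent as ct

# Make the plugin module importable directly from the custom_executors directory
sys.path.append(str(Path("./custom_executors").resolve()))
from timing_plugin import TimingExecutor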


timing_log = "/tmp/cpu_timing.log"
timing_executor = TimingExecutor(timing_log)

# Calculate e based on a series
@ct.electron(executor=timing_executor)
def e_ser(x):
    e_est = 1
    fact = 1
    for i in range(1, x):
        fact *= i
        e_est += 1 / fact
    return e_est


@ct.lattice(executor=timing_executor)
def workflow(x):
    return e_ser(x)
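
The series used in e_ser can be checked on its own, without Covalent. The sketch below repeats the same loop as a plain function (named `e_ser_plain` here purely for illustration) and compares the partial sum with `math.e`; for x = 10 the loop adds the terms 1/1! through 1/9! to the initial 1.

```python
import math


def e_ser_plain(x: int) -> float:
    """Same series as e_ser, without the electron decorator."""
    e_est = 1.0
    fact = 1
    for i in range(1, x):
        fact *= i          # fact == i!
        e_est += 1 / fact  # add the term 1/i!
    return e_est


estimate = e_ser_plain(10)
error = abs(math.e - estimate)
print(f"estimate={estimate}, error={error:.2e}")
```

The remaining error is dominated by the first omitted term, 1/10!, which is why ten terms already agree with e to about six decimal places.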

  1. Run the lattice:

[3]:
dispatch_id = ct.dispatch(workflow)(10)
result = ct.get_result(dispatch_id, wait=True)
print(result)

# Read back the timing log written by the executor
with open(timing_log, "r") as f:
    for line in f:
        print(line)


Lattice Result
==============
status: COMPLETED
result: 2.7182815255731922
input args: ['10']
input kwargs: {}
error: None

start_time: 2023-04-14 11:19:44.967362
end_time: 2023-04-14 11:19:45.108294

results_dir: /home/user/.local/share/covalent/data
dispatch_id: 35b9dda6-c4b8-4ec2-b820-67fb3ab606dc

Node Outputs
------------
e_ser(0): 2.7182815255731922
:parameter:10(1): 10
:postprocess:(2): 2.7182815255731922

Node 0 in dispatch 35b9dda6-c4b8-4ec2-b820-67fb3ab606dc took 0.00019649999999948875s of CPU time.

See Also#

Adding Constraints to Tasks and Workflows

Choosing an Executor For a Task

Executor Template (GitHub)

DaskExecutor (GitHub)

SSHPlugin (GitHub)

SlurmPlugin (GitHub)