How to create a custom executor

Executors define the low-level details of how a computation is carried out. They can target different capabilities, e.g., different hardware, a different computation strategy, different logic, or simply different goals.

Executors are plugins; any executor plugins that are found are imported as classes in the covalent.executor namespace. See the how-to on choosing an executor to be used in an electron for details on selecting one.

You can write your own executor to execute Covalent electrons in any way you like, using particular environments, cloud resources, or hardware.

A template to write one can be found here.

The plugin file should contain the following:

  1. _EXECUTOR_PLUGIN_DEFAULTS, if there are any defaults for the executor

  2. executor_plugin_name, the name of the executor, which makes it importable from covalent.executor; it must be the same as the class name

  3. run(), core logic of how the executor will handle the user function, its args and its kwargs

As an example, let’s construct a TimingExecutor to get the CPU time used by the function and measure its code efficiency. We’ll write that result to a file along with its dispatch_id and node_id for comparison.

[ ]:

from covalent.executor import BaseExecutor
from typing import Callable, Dict, List
import time
from pathlib import Path

# Must match the class name below.
executor_plugin_name = "TimingExecutor"

class TimingExecutor(BaseExecutor):

    def __init__(self, timing_filepath, **kwargs):
        self.timing_filepath = str(Path(timing_filepath).resolve())
        # Forward the remaining options to the base class.
        super().__init__(**kwargs)

    def run(self, function: Callable, args: List, kwargs: Dict, task_metadata: Dict):

        # CPU time consumed by this process before the call.
        start = time.process_time()

        result = function(*args, **kwargs)

        time_taken = time.process_time() - start

        # Append one line per task so that runs can be compared later.
        with open(self.timing_filepath, "a") as f:
            f.write(f"Node {task_metadata['node_id']} in dispatch {task_metadata['dispatch_id']} took {time_taken}s of CPU time.\n")

        return result
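
Note that time.process_time() counts CPU time of the current process, so time spent sleeping or waiting on I/O is not included. The sketch below contrasts it with wall-clock time from time.perf_counter(); the measure helper and sleepy_add function are hypothetical names used only for illustration and are not part of Covalent.

```python
import time
from typing import Any, Callable, Tuple


def measure(function: Callable, *args: Any, **kwargs: Any) -> Tuple[Any, float, float]:
    """Return (result, cpu_seconds, wall_seconds) for a single call."""
    cpu_start = time.process_time()
    wall_start = time.perf_counter()
    result = function(*args, **kwargs)
    cpu_seconds = time.process_time() - cpu_start
    wall_seconds = time.perf_counter() - wall_start
    return result, cpu_seconds, wall_seconds


def sleepy_add(x, y):
    time.sleep(0.2)  # waiting, not computing
    return x + y


result, cpu_seconds, wall_seconds = measure(sleepy_add, 1, 2)
print(f"cpu={cpu_seconds:.3f}s wall={wall_seconds:.3f}s")
```

For a function that mostly waits, the CPU figure stays far below the wall-clock figure, which is why CPU time is a reasonable proxy for how much work the code itself does.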

[ ]:
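import covalent as ct

# Pass a file path, not a directory: run() opens it in append mode.
# The file name is illustrative.
timing_executor = TimingExecutor("./cpu_timing.txt")

# Assign the custom executor to each electron.
@ct.electron(executor=timing_executor)
def add(x, y):
    return x + y

@ct.electron(executor=timing_executor)
def multiply(x, y):
    return x * y

@ct.lattice
def workflow(x, y):
    r1 = add(x, y)
    return multiply(y, r1)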

Since Covalent itself is written in an async-aware manner, we suggest writing your custom executors to be async-aware as well, especially if run() contains network-bound or I/O-bound logic. To do so, subclass AsyncBaseExecutor instead of BaseExecutor and define run() with async def run(...) instead of def run(...).

Examples of such executors include the default DaskExecutor, SSHPlugin, and SlurmPlugin.
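
The async variant described above can be sketched as follows. This is a minimal sketch under stated assumptions: the AsyncBaseExecutor class below is a stand-in so that the snippet runs without Covalent installed (a real plugin would subclass Covalent's AsyncBaseExecutor), AsyncEchoExecutor is a hypothetical name, and asyncio.sleep stands in for a network or I/O call.

```python
import asyncio
from typing import Any, Callable, Dict, List


class AsyncBaseExecutor:
    """Stand-in for Covalent's AsyncBaseExecutor (assumption, illustration only)."""

    def __init__(self, **kwargs: Any) -> None:
        self.options = kwargs


executor_plugin_name = "AsyncEchoExecutor"


class AsyncEchoExecutor(AsyncBaseExecutor):
    async def run(self, function: Callable, args: List, kwargs: Dict, task_metadata: Dict) -> Any:
        # Placeholder for network-bound or I/O-bound work, such as submitting
        # a job; awaiting lets other tasks make progress in the meantime.
        await asyncio.sleep(0.1)
        return function(*args, **kwargs)


async def main() -> List[int]:
    executor = AsyncEchoExecutor()
    # The two calls overlap their waiting instead of running back to back.
    return await asyncio.gather(
        executor.run(pow, [2, 3], {}, {"dispatch_id": "demo", "node_id": 0}),
        executor.run(pow, [3, 2], {}, {"dispatch_id": "demo", "node_id": 1}),
    )


results = asyncio.run(main())
```

Because run() is a coroutine, the waiting periods of the two tasks overlap, which is the benefit an async-aware executor is meant to provide when most of the time is spent on the network or on I/O.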