Covalent API#

The following API documentation describes how to use Covalent.

Electron#

@covalent.electron(_func=None, *, backend=None, executor=None, files=[], deps_bash=None, deps_pip=None, call_before=[], call_after=[])#

Electron decorator to be applied to a function. Returns a wrapper function with the same functionality as _func.

Parameters

_func (Optional[Callable]) – function to be decorated

Keyword Arguments
  • backend – DEPRECATED: Same as executor.

  • executor – Alternative executor object to be used for electron execution. If not passed, the Dask executor is used by default.

  • deps_bash – An optional DepsBash object specifying a list of shell commands to run before _func

  • deps_pip – An optional DepsPip object specifying a list of PyPI packages to install before running _func

  • call_before – An optional list of DepsCall objects specifying python functions to invoke before the electron

  • call_after – An optional list of DepsCall objects specifying python functions to invoke after the electron

  • files – An optional list of FileTransfer objects which copy files to/from remote or local filesystems.

Returns

Electron object inside which the decorated function exists.

Return type

Electron
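
A minimal sketch of the decorator in use, assuming the conventional import covalent as ct alias; the executor short name and the dependency lists are illustrative:

import covalent as ct

# Bare decorator: the default (Dask) executor is used.
@ct.electron
def add(x, y):
    return x + y

# With explicit metadata: an executor plus pip and bash dependencies.
@ct.electron(
    executor="local",  # short name of an installed executor plugin
    deps_pip=ct.DepsPip(packages=["numpy"]),
    deps_bash=ct.DepsBash(["echo preparing environment"]),
)
def multiply(x, y):
    import numpy as np
    return float(np.multiply(x, y))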

class covalent._workflow.electron.Electron(function, node_id=None, metadata=None, task_group_id=None, packing_tasks=False)[source]#

An electron (or task) object is a modular component of a workflow and is returned by the electron decorator.

function#

Function to be executed.

node_id#

Node id of the electron.

metadata#

Metadata to be used for the function execution.

kwargs#

Keyword arguments if any.

task_group_id#

The group to which the task will be assigned when it is bound to a graph node. If unset, the group ID defaults to the node ID.

packing_tasks#

Flag to indicate whether task packing is enabled.

Methods:

add_collection_node_to_graph(graph, prefix)

Adds the node to the lattice’s transport graph when a collection of electrons is passed as an argument to another electron.

connect_node_with_others(node_id, …)

Adds a node along with connecting edges for all the arguments to the electron.

get_metadata(name)

Get value of the metadata of given name.

get_op_function(operand_1, operand_2, op)

Function to handle binary operations with electrons as operands.

set_metadata(name, value)

Function to add/edit metadata of given name and value to electron’s metadata.

wait_for(electrons)

Waits for the given electrons to complete before executing this one.

Attributes:

as_transportable_dict

Get transportable electron object and metadata.

add_collection_node_to_graph(graph, prefix)[source]#

Adds the node to the lattice’s transport graph when a collection of electrons is passed as an argument to another electron.

Parameters
  • graph (_TransportGraph) – Transport graph of the lattice

  • prefix (str) – Prefix of the node

Returns

Node id of the added node

Return type

node_id

property as_transportable_dict: Dict#

Get transportable electron object and metadata.

Return type

Dict

connect_node_with_others(node_id, param_name, param_value, param_type, arg_index, transport_graph)[source]#

Adds a node along with connecting edges for all the arguments to the electron.

Parameters
  • node_id (int) – Node number of the electron

  • param_name (str) – Name of the parameter

  • param_value (Union[Any, ForwardRef]) – Value of the parameter

  • param_type (str) – Type of parameter, positional or keyword

  • transport_graph (_TransportGraph) – Transport graph of the lattice

Returns

None

get_metadata(name)[source]#

Get value of the metadata of given name.

Parameters

name (str) – Name of the metadata whose value is needed.

Returns

Value of the metadata of given name.

Return type

value

Raises

KeyError – If metadata of given name is not present.

get_op_function(operand_1, operand_2, op)[source]#

Function to handle binary operations with electrons as operands. This does not execute the operation; instead it creates another electron whose execution is deferred according to the default electron configuration/metadata.

This also makes sure that if these operations are being performed outside of a lattice, then they are performed as is.

Parameters
  • operand_1 (Union[Any, Electron]) – First operand of the binary operation.

  • operand_2 (Union[Any, Electron]) – Second operand of the binary operation.

  • op (str) – Operator to be used in the binary operation.

Returns

Electron object corresponding to the operation execution.

Behaves as a normal function call if outside a lattice.

Return type

electron

set_metadata(name, value)[source]#

Function to add/edit metadata of given name and value to electron’s metadata.

Parameters
  • name (str) – Name of the metadata to be added/edited.

  • value (Any) – Value of the metadata to be added/edited.

Return type

None

Returns

None

wait_for(electrons)[source]#

Waits for the given electrons to complete before executing this one. Adds the necessary edges between this electron and those electrons without explicitly connecting their inputs/outputs.

Useful when execution of this electron relies on a side effect from another one.

Parameters

electrons (Union[Electron, Iterable[Electron]]) – Electron(s) that must complete execution before this one starts

Returns

Electron
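
A minimal sketch of wait_for, assuming both electrons are invoked inside a lattice; the file path is illustrative. The explicit edge enforces ordering even though no data flows between the two tasks:

import covalent as ct

@ct.electron
def write_file():
    # Side effect only; nothing useful is returned downstream.
    with open("/tmp/data.txt", "w") as f:
        f.write("payload")

@ct.electron
def read_file():
    with open("/tmp/data.txt") as f:
        return f.read()

@ct.lattice
def workflow():
    setup = write_file()
    data = read_file()
    # read_file takes no input from write_file, so connect them explicitly.
    data.wait_for(setup)
    return data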


Lattice#

@covalent.lattice(_func=None, *, backend=None, executor=None, workflow_executor=None, deps_bash=None, deps_pip=None, call_before=[], call_after=[], triggers=None)#

Lattice decorator to be applied to a function. Returns a new Lattice object.

Parameters

_func (Optional[Callable]) – function to be decorated

Keyword Arguments
  • backend – DEPRECATED: Same as executor.

  • executor – Alternative executor object to be used in the execution of each node. If not passed, the local executor is used by default.

  • workflow_executor – Executor for postprocessing the workflow. Defaults to the built-in Dask executor or the local executor, depending on whether Covalent is started with the --no-cluster option.

  • deps_bash – An optional DepsBash object specifying a list of shell commands to run before _func

  • deps_pip – An optional DepsPip object specifying a list of PyPI packages to install before running _func

  • call_before – An optional list of DepsCall objects specifying python functions to invoke before the electron

  • call_after – An optional list of DepsCall objects specifying python functions to invoke after the electron

  • triggers – Any triggers that need to be attached to this lattice, default is None

Returns

Lattice object inside which the decorated function exists.

Return type

Lattice
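
A minimal end-to-end sketch, assuming a running Covalent server; ct.dispatch submits the workflow and ct.get_result retrieves the Result object:

import covalent as ct

@ct.electron
def square(x):
    return x * x

@ct.lattice
def workflow(n):
    # Each call to square adds one electron (node) to the transport graph.
    return [square(i) for i in range(n)]

dispatch_id = ct.dispatch(workflow)(3)
result = ct.get_result(dispatch_id, wait=True)
print(result.result)  # [0, 1, 4]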

class covalent._workflow.lattice.Lattice(workflow_function, transport_graph=None)[source]#

A lattice workflow object that holds the workflow graph and is returned by the lattice decorator.

workflow_function#

The workflow function that is decorated by lattice decorator.

transport_graph#

The transport graph that forms the basis for how the workflow is executed.

metadata#

Dictionary of metadata of the lattice.

post_processing#

Boolean to indicate if the lattice is in post processing mode or not.

kwargs#

Keyword arguments passed to the workflow function.

electron_outputs#

Dictionary of electron outputs received after workflow execution.

Methods:

build_graph(*args, **kwargs)

Builds the transport graph for the lattice by executing the workflow function, which triggers the calls of all underlying electrons; these are then added to the transport graph for later execution.

dispatch(*args, **kwargs)

DEPRECATED: Function to dispatch workflows.

dispatch_sync(*args, **kwargs)

DEPRECATED: Function to dispatch workflows synchronously by waiting for the result too.

draw(*args, **kwargs)

Generate the lattice graph and display it in the UI, taking into account the passed arguments.

get_metadata(name)

Get value of the metadata of given name.

set_metadata(name, value)

Function to add/edit metadata of given name and value to lattice’s metadata.

build_graph(*args, **kwargs)[source]#

Builds the transport graph for the lattice by executing the workflow function, which triggers the calls of all underlying electrons; these are then added to the transport graph for later execution.

Also redirects any print statements inside the lattice function to null and ignores any exceptions caused while executing the function.

Note that the graph will not be built after an exception has occurred.

Parameters
  • *args – Positional arguments to be passed to the workflow function.

  • **kwargs – Keyword arguments to be passed to the workflow function.

Return type

None

Returns

None

dispatch(*args, **kwargs)[source]#

DEPRECATED: Function to dispatch workflows.

Parameters
  • *args – Positional arguments for the workflow

  • **kwargs – Keyword arguments for the workflow

Return type

str

Returns

Dispatch id assigned to job

dispatch_sync(*args, **kwargs)[source]#

DEPRECATED: Function to dispatch workflows synchronously by waiting for the result too.

Parameters
  • *args – Positional arguments for the workflow

  • **kwargs – Keyword arguments for the workflow

Return type

Result

Returns

Result of workflow execution

draw(*args, **kwargs)[source]#

Generate the lattice graph and display it in the UI, taking into account the passed arguments.

Parameters
  • *args – Positional arguments to be passed to build the graph.

  • **kwargs – Keyword arguments to be passed to build the graph.

Return type

None

Returns

None

get_metadata(name)[source]#

Get value of the metadata of given name.

Parameters

name (str) – Name of the metadata whose value is needed.

Returns

Value of the metadata of given name.

Return type

value

Raises

KeyError – If metadata of given name is not present.

set_metadata(name, value)[source]#

Function to add/edit metadata of given name and value to lattice’s metadata.

Parameters
  • name (str) – Name of the metadata to be added/edited.

  • value (Any) – Value of the metadata to be added/edited.

Return type

None

Returns

None


Local Executor#

Executing tasks (electrons) directly on the local machine

class covalent.executor.executor_plugins.local.LocalExecutor(*args, **kwargs)[source]#

Local executor class that directly invokes the input function.
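
A minimal sketch of selecting this executor for a single electron, assuming the class is exposed as ct.executor.LocalExecutor:

import covalent as ct

local = ct.executor.LocalExecutor()

@ct.electron(executor=local)
def echo(x):
    # Runs directly in the local Python process, bypassing the Dask cluster.
    return x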

Methods:

cancel(task_metadata, job_handle)

Method to cancel the job identified uniquely by the job_handle (base class)

execute(function, args, kwargs, dispatch_id, …)

Execute the function with the given arguments.

from_dict(object_dict)

Rehydrate a dictionary representation

get_cancel_requested()

Check if the task was requested to be cancelled by the user

get_dispatch_context(dispatch_info)

Start a context manager that will be used to access the dispatch info for the executor.

run(function, args, kwargs, task_metadata)

Execute the function locally

set_job_handle(handle)

Save the job_id/handle returned by the backend executing the task

setup(task_metadata)

Placeholder to run any executor specific tasks

teardown(task_metadata)

Placeholder to run any executor specific cleanup/teardown actions

to_dict()

Return a JSON-serializable dictionary representation of self

write_streams_to_file(stream_strings, …)

Write the contents of stdout and stderr to respective files.

cancel(task_metadata, job_handle)#

Method to cancel the job identified uniquely by the job_handle (base class)

Arg(s)

  • task_metadata: Metadata of the task to be cancelled

  • job_handle: Unique ID of the job assigned by the backend

Return(s)

False by default

Return type

Literal[False]

execute(function, args, kwargs, dispatch_id, results_dir, node_id=-1)#

Execute the function with the given arguments.

This calls the executor-specific run() method.

Parameters
  • function (Callable) – The input python function which will be executed and whose result is ultimately returned by this function.

  • args (List) – List of positional arguments to be used by the function.

  • kwargs (Dict) – Dictionary of keyword arguments to be used by the function.

  • dispatch_id (str) – The unique identifier of the external lattice process which is calling this function.

  • results_dir (str) – The location of the results directory.

  • node_id (int) – ID of the node in the transport graph which is using this executor.

Returns

The result of the function execution.

Return type

output

from_dict(object_dict)#

Rehydrate a dictionary representation

Parameters

object_dict (dict) – a dictionary representation returned by to_dict

Return type

BaseExecutor

Returns

self

Instance attributes will be overwritten.

get_cancel_requested()#

Check if the task was requested to be cancelled by the user

Arg(s)

None

Return(s)

True/False whether task cancellation was requested

Return type

bool

get_dispatch_context(dispatch_info)#

Start a context manager that will be used to access the dispatch info for the executor.

Parameters

dispatch_info (DispatchInfo) – The dispatch info to be used inside current context.

Return type

AbstractContextManager[DispatchInfo]

Returns

A context manager object that handles the dispatch info.

run(function, args, kwargs, task_metadata)[source]#

Execute the function locally

Arg(s)

  • function: Function to be executed

  • args: Arguments passed to the function

  • kwargs: Keyword arguments passed to the function

  • task_metadata: Metadata of the task to be executed

Return(s)

Task output

Return type

Any

set_job_handle(handle)#

Save the job_id/handle returned by the backend executing the task

Arg(s)

handle: Any JSONable type identifying the task being executed by the backend

Return(s)

Response from saving the job handle to the database

Return type

Any

setup(task_metadata)#

Placeholder to run any executor specific tasks

Return type

Any

teardown(task_metadata)#

Placeholder to run any executor specific cleanup/teardown actions

Return type

Any

to_dict()#

Return a JSON-serializable dictionary representation of self

Return type

dict

write_streams_to_file(stream_strings, filepaths, dispatch_id, results_dir)#

Write the contents of stdout and stderr to respective files.

Parameters
  • stream_strings (Iterable[str]) – The stream_strings to be written to files.

  • filepaths (Iterable[str]) – The filepaths to be used for writing the streams.

  • dispatch_id (str) – The ID of the dispatch which initiated the request.

  • results_dir (str) – The location of the results directory.

Return type

None


File Transfer#

File transfer from a source to a destination, local or remote, performed before or after electron execution. Instances are provided to the files keyword argument of an electron decorator.

class covalent._file_transfer.file.File(filepath=None, is_remote=False, is_dir=False, include_folder=False)[source]#

File class to store the components of a provided URI, including the scheme (s3://, file://, etc.), determine whether the file is remote, and act as a facade to facilitate filesystem operations.

filepath#

File path corresponding to the file.

is_remote#

Flag determining whether the file is remote (override). By default, resolved automatically from the file scheme.

is_dir#

Flag determining whether the file is a directory (override). By default, determined by whether the file URI contains a trailing slash.

include_folder#

Flag that determines whether the folder itself should be included in the file transfer; if False, only the contents of the folder are transferred.

class covalent._file_transfer.folder.Folder(filepath=None, is_remote=False, is_dir=True, include_folder=False)[source]#

Folder class to store the components of a provided URI, including the scheme (s3://, file://, etc.), determine whether the file is remote, and act as a facade to facilitate filesystem operations. Folder is a child of the File class that sets the is_dir flag to True.

include_folder#

Flag that determines whether the folder itself should be included in the file transfer; if False, only the contents of the folder are transferred.

class covalent._file_transfer.file_transfer.FileTransfer(from_file=None, to_file=None, order=<Order.BEFORE: 'before'>, strategy=None)[source]#

FileTransfer object class that takes two File objects or filepaths (from, to) and a File Transfer Strategy to perform remote or local file transfer operations.

from_file#

Filepath or File object corresponding to the source file.

to_file#

Filepath or File object corresponding to the destination file.

order#

Order (enum) to execute the file transfer before (Order.BEFORE) or after (Order.AFTER) electron execution.

strategy#

Optional File Transfer Strategy to perform file operations - default will be resolved from provided file schemes.

covalent._file_transfer.file_transfer.TransferFromRemote(from_filepath, to_filepath=None, strategy=None, order=<Order.BEFORE: 'before'>)[source]#

Factory for creating a FileTransfer instance where from_filepath is implicitly created as a remote File Object, and the order (Order.BEFORE) is set so that this file transfer will occur prior to electron execution.

Parameters
  • from_filepath (str) – File path corresponding to remote file (source).

  • to_filepath (Optional[str]) – File path corresponding to local file (destination)

  • strategy (Optional[FileTransferStrategy]) – Optional File Transfer Strategy to perform file operations - default will be resolved from provided file schemes.

  • order (Order) – Order (enum) to execute the file transfer before (Order.BEFORE) or after (Order.AFTER) electron execution - default is BEFORE

Return type

FileTransfer

Returns

FileTransfer instance with implicit Order.BEFORE enum set and from (source) file marked as remote

covalent._file_transfer.file_transfer.TransferToRemote(to_filepath, from_filepath=None, strategy=None, order=<Order.AFTER: 'after'>)[source]#

Factory for creating a FileTransfer instance where to_filepath is implicitly created as a remote File Object, and the order (Order.AFTER) is set so that this file transfer will occur post electron execution.

Parameters
  • to_filepath (str) – File path corresponding to remote file (destination)

  • from_filepath (Optional[str]) – File path corresponding to local file (source).

  • strategy (Optional[FileTransferStrategy]) – Optional File Transfer Strategy to perform file operations - default will be resolved from provided file schemes.

  • order (Order) – Order (enum) to execute the file transfer before (Order.BEFORE) or after (Order.AFTER) electron execution - default is AFTER

Return type

FileTransfer

Returns

FileTransfer instance with implicit Order.AFTER enum set and to (destination) file marked as remote
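
A sketch combining both factories; the URIs are illustrative, and the transfer strategy is resolved from each scheme. When the electron declares a files parameter, Covalent injects the resolved (from, to) filepath pairs at runtime (an assumption worth verifying against the file transfer how-to guides):

import covalent as ct

# Download before execution; upload after execution.
ft_in = ct.fs.TransferFromRemote("https://example.com/input.csv", "/tmp/input.csv")
ft_out = ct.fs.TransferToRemote("s3://my-bucket/output.csv", "/tmp/output.csv")

@ct.electron(files=[ft_in, ft_out])
def process(files=[]):
    # files holds the resolved (from, to) filepath pairs, one per transfer.
    with open("/tmp/input.csv") as f:
        data = f.read()
    with open("/tmp/output.csv", "w") as f:
        f.write(data.upper())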


File Transfer Strategies#

A set of classes with a shared interface to perform copy, download, and upload operations given two (source & destination) File objects that support various protocols.

class covalent._file_transfer.strategies.transfer_strategy_base.FileTransferStrategy[source]#

Base FileTransferStrategy class that defines the interface for file transfer strategies exposing common methods for performing copy, download, and upload operations.

class covalent._file_transfer.strategies.rsync_strategy.Rsync(user='', host='', private_key_path=None)[source]#

Implements the base FileTransferStrategy class, using rsync to move files to and from remote or local filesystems. Rsync over SSH is used if one of the provided files is marked as remote.

user#

(optional) User to specify for the remote host when using rsync over SSH

host#

(optional) Host to connect to when using rsync over SSH

private_key_path#

(optional) Filepath of the SSH private key to use when using rsync over SSH
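
A minimal sketch; user, host, and paths are illustrative, and the strategy is assumed to be exposed as ct.fs_strategies.Rsync:

import covalent as ct

strategy = ct.fs_strategies.Rsync(
    user="ubuntu",
    host="remote.example.com",
    private_key_path="/home/me/.ssh/id_rsa",
)

# Pull a file from the remote host over rsync/ssh before the electron runs.
ft = ct.fs.TransferFromRemote("/home/ubuntu/data.csv", "/tmp/data.csv", strategy=strategy)

@ct.electron(files=[ft])
def load(files=[]):
    with open("/tmp/data.csv") as f:
        return f.read()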


Triggers#

Execute a workflow triggered by a specific type of event

Classes:

BaseTrigger([lattice_dispatch_id, …])

Base class to be subclassed by any custom defined trigger.

DirTrigger(dir_path, event_names[, …])

Directory or File based trigger which watches for events in said file/dir and performs a trigger action whenever they happen.

TimeTrigger(time_gap[, lattice_dispatch_id, …])

Performs a trigger action every time_gap seconds.

class covalent.triggers.BaseTrigger(lattice_dispatch_id=None, dispatcher_addr=None, triggers_server_addr=None)[source]#

Bases: object

Base class to be subclassed by any custom defined trigger. Implements all the necessary methods used for interacting with dispatches, including getting their statuses and performing a redispatch of them whenever the trigger gets triggered.

Parameters
  • lattice_dispatch_id (Optional[str]) – Dispatch ID of the workflow which has to be redispatched in case this trigger gets triggered

  • dispatcher_addr (Optional[str]) – Address of dispatcher server used to retrieve info about or redispatch any dispatches

  • triggers_server_addr (Optional[str]) – Address of the Triggers server (if there is any) to register this trigger to, uses the dispatcher’s address by default

self.lattice_dispatch_id#

Dispatch ID of the workflow which has to be redispatched in case this trigger gets triggered

self.dispatcher_addr#

Address of dispatcher server used to retrieve info about or redispatch any dispatches

self.triggers_server_addr#

Address of the Triggers server (if there is any) to register this trigger to, uses the dispatcher’s address by default

self.new_dispatch_ids#

List of all the newly created dispatch ids from performing redispatch

self.observe_blocks#

Boolean to indicate whether the self.observe method is a blocking call

self.event_loop#

Event loop to be used if directly calling dispatcher’s functions instead of the REST APIs

self.use_internal_funcs#

Boolean indicating whether to use dispatcher’s functions directly instead of through API calls

self.stop_flag#

Flag to handle the stopping mechanism in a thread-safe manner in case self.observe() is a blocking call (e.g., see TimeTrigger)

Methods:

observe()

Start observing for any change which can be used to trigger this trigger.

register()

Register this trigger to the Triggers server and start observing.

stop()

Stop observing for changes.

to_dict()

Return a dictionary representation of this trigger which can later be used to regenerate it.

trigger()

Fire this trigger and perform a redispatch of the connected dispatch id’s workflow.

abstract observe()[source]#

Start observing for any change which can be used to trigger this trigger. To be implemented by the subclass.

register()[source]#

Register this trigger to the Triggers server and start observing.

Return type

None

abstract stop()[source]#

Stop observing for changes. To be implemented by the subclass.

to_dict()[source]#

Return a dictionary representation of this trigger which can later be used to regenerate it.

Returns

Dictionary representation of this trigger

Return type

tr_dict

trigger()[source]#

Fire this trigger and perform a redispatch of the connected dispatch id’s workflow. Should be called from within self.observe() whenever a trigger action is desired.

Raises

RuntimeError – In case no dispatch id is connected to this trigger

Return type

None

class covalent.triggers.DirTrigger(dir_path, event_names, batch_size=1, lattice_dispatch_id=None, dispatcher_addr=None, triggers_server_addr=None, recursive=False)[source]#

Bases: covalent.triggers.base.BaseTrigger

Directory or File based trigger which watches for events in said file/dir and performs a trigger action whenever they happen.

Parameters
  • dir_path – Path to the file/dir which is to be observed for events

  • event_names – List of event names on which to perform the trigger action. Possible options can be a subset of: [“created”, “deleted”, “modified”, “moved”, “closed”].

  • batch_size (int) – The number of changes to wait for before performing the trigger action, default is 1.

  • recursive (bool) – Whether to recursively watch the directory, default is False.

self.dir_path#

Path to the file/dir which is to be observed for events

self.event_names#

List of event names on which to perform the trigger action. Possible options can be a subset of: [“created”, “deleted”, “modified”, “moved”, “closed”]

self.batch_size#

The number of events to wait for before performing the trigger action, default is 1.

self.recursive#

Whether to recursively watch the directory, default is False.

self.n_changes#

Number of events since last trigger action. Whenever self.n_changes == self.batch_size a trigger action happens.

Methods:

attach_methods_to_handler()

Dynamically attaches and overrides the “on_*” methods to the handler depending on which ones are requested by the user.

observe()

Start observing the file/dir for any possible events among the ones mentioned in self.event_names.

stop()

Stop observing the file or directory for changes.

attach_methods_to_handler()[source]#

Dynamically attaches and overrides the “on_*” methods to the handler depending on which ones are requested by the user.

Parameters

event_names – List of event names upon which to perform a trigger action

Return type

None

observe()[source]#

Start observing the file/dir for any possible events among the ones mentioned in self.event_names. Currently only supports running within the Covalent/Triggers server.

Return type

None

stop()[source]#

Stop observing the file or directory for changes.

Return type

None
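
A minimal sketch that attaches the trigger at definition time through the lattice decorator’s triggers argument; the watched path and event names are illustrative:

import covalent as ct
from covalent.triggers import DirTrigger

trigger = DirTrigger(dir_path="/tmp/watched", event_names=["created", "modified"])

@ct.electron
def task():
    return "ran"

@ct.lattice(triggers=trigger)
def workflow():
    return task()

# After an initial ct.dispatch(workflow)(), every matching filesystem event
# redispatches the workflow.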

class covalent.triggers.TimeTrigger(time_gap, lattice_dispatch_id=None, dispatcher_addr=None, triggers_server_addr=None)[source]#

Bases: covalent.triggers.base.BaseTrigger

Performs a trigger action every time_gap seconds.

Parameters

time_gap (int) – Number of seconds to wait before performing a trigger action

self.time_gap#

Number of seconds to wait before performing a trigger action

self.stop_flag#

Thread safe flag used to check whether the stop condition has been met

Methods:

observe()

Keep performing the trigger action every self.time_gap seconds until stop condition has been met.

stop()

Stop the running self.observe() method by setting the self.stop_flag flag.

observe()[source]#

Keep performing the trigger action every self.time_gap seconds until stop condition has been met.

Return type

None

stop()[source]#

Stop the running self.observe() method by setting the self.stop_flag flag.

Return type

None
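
A minimal sketch that re-runs a workflow on a fixed interval; the 60-second gap is illustrative:

import covalent as ct
from covalent.triggers import TimeTrigger

@ct.electron
def heartbeat():
    return "alive"

@ct.lattice(triggers=TimeTrigger(time_gap=60))
def monitor():
    return heartbeat()

# Once dispatched, the workflow is redispatched every 60 seconds until the
# trigger is stopped.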


Dask Executor#

Executing tasks (electrons) in a Dask cluster

class covalent.executor.executor_plugins.dask.DaskExecutor(scheduler_address='', log_stdout='stdout.log', log_stderr='stderr.log', conda_env='', cache_dir='', current_env_on_conda_fail=False)[source]#

Dask executor class that submits the input function to a running Dask cluster.
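
A minimal sketch of pointing an electron at an existing Dask scheduler; the address is illustrative, and without an explicit executor Covalent submits to its own built-in cluster:

import covalent as ct

dask_exec = ct.executor.DaskExecutor(scheduler_address="tcp://127.0.0.1:8786")

@ct.electron(executor=dask_exec)
def heavy(x):
    # Submitted to the Dask cluster behind the given scheduler address.
    return x ** 2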

Methods:

cancel(task_metadata, job_handle)

Cancel the task currently being executed by the Dask executor

from_dict(object_dict)

Rehydrate a dictionary representation

get_cancel_requested()

Check whether the task was requested to be cancelled

get_dispatch_context(dispatch_info)

Start a context manager that will be used to access the dispatch info for the executor.

run(function, args, kwargs, task_metadata)

Submit the function and inputs to the Dask cluster

set_job_handle(handle)

Save the job handle to the database

setup(task_metadata)

Executor specific setup method

teardown(task_metadata)

Executor specific teardown method

to_dict()

Return a JSON-serializable dictionary representation of self

write_streams_to_file(stream_strings, …)

Write the contents of stdout and stderr to respective files.

async cancel(task_metadata, job_handle)[source]#

Cancel the task currently being executed by the Dask executor

Arg(s)

  • task_metadata: Metadata associated with the task

  • job_handle: Key assigned to the job by Dask

Return(s)

True by default

Return type

Literal[True]

from_dict(object_dict)#

Rehydrate a dictionary representation

Parameters

object_dict (dict) – a dictionary representation returned by to_dict

Return type

BaseExecutor

Returns

self

Instance attributes will be overwritten.

async get_cancel_requested()#

Check whether the task was requested to be cancelled

Arg(s)

None

Return(s)

Whether the task has been requested to be cancelled

Return type

Any

get_dispatch_context(dispatch_info)#

Start a context manager that will be used to access the dispatch info for the executor.

Parameters

dispatch_info (DispatchInfo) – The dispatch info to be used inside current context.

Return type

AbstractContextManager[DispatchInfo]

Returns

A context manager object that handles the dispatch info.

async run(function, args, kwargs, task_metadata)[source]#

Submit the function and inputs to the Dask cluster

async set_job_handle(handle)#

Save the job handle to the database

Arg(s)

handle: JSONable type identifying the job being executed by the backend

Return(s)

Response from the listener that handles inserting the job handle into the database

Return type

Any

async setup(task_metadata)#

Executor specific setup method

async teardown(task_metadata)#

Executor specific teardown method

to_dict()#

Return a JSON-serializable dictionary representation of self

Return type

dict

async write_streams_to_file(stream_strings, filepaths, dispatch_id, results_dir)#

Write the contents of stdout and stderr to respective files.

Parameters
  • stream_strings (Iterable[str]) – The stream_strings to be written to files.

  • filepaths (Iterable[str]) – The filepaths to be used for writing the streams.

  • dispatch_id (str) – The ID of the dispatch which initiated the request.

  • results_dir (str) – The location of the results directory.

This uses aiofiles to avoid blocking the event loop.

Return type

None


Dependencies#

Generic dependencies for an electron

class covalent._workflow.deps.Deps(apply_fn=None, apply_args=[], apply_kwargs={}, *, retval_keyword='')[source]#

Generic dependency class used in specifying any kind of dependency for an electron.

apply_fn#

function to be executed in the backend environment

apply_args#

list of arguments to be applied in the backend environment

apply_kwargs#

dictionary of keyword arguments to be applied in the backend environment

Methods:

apply()

Encapsulates the exact function and args/kwargs to be executed in the backend environment.

apply()[source]#

Encapsulates the exact function and args/kwargs to be executed in the backend environment.

Parameters

None

Return type

Tuple[TransportableObject, TransportableObject, TransportableObject, str]

Returns

A tuple of transportable objects containing the function and optional args/kwargs


Pip Dependencies#

PyPI packages to be installed before executing an electron

class covalent._workflow.depspip.DepsPip(packages=[], reqs_path='')#

PyPI packages to be installed before executing an electron

A specification of Pip packages to be installed

packages#

A list of PyPI packages to install

reqs_path#

Path to requirements.txt (overrides packages)

These packages are installed in an electron’s execution environment just before the electron is run.
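
A minimal sketch; the package pin is illustrative. Per the signature above, either a package list or a requirements file path (reqs_path) can be given:

import covalent as ct

deps = ct.DepsPip(packages=["numpy==1.23.0"])

@ct.electron(deps_pip=deps)
def analyze(x):
    # numpy is installed in the execution environment before this runs.
    import numpy as np
    return float(np.sqrt(x))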

Methods:

from_dict(object_dict)

Rehydrate a dictionary representation

to_dict()

Return a JSON-serializable dictionary representation of self

from_dict(object_dict)[source]#

Rehydrate a dictionary representation

Parameters

object_dict – a dictionary representation returned by to_dict

Return type

DepsPip

Returns

self

Instance attributes will be overwritten.

to_dict()[source]#

Return a JSON-serializable dictionary representation of self

Return type

dict


Bash Dependencies#

Shell commands to run before an electron

class covalent._workflow.depsbash.DepsBash(commands=[])#

Shell commands to run before an electron

Deps class to encapsulate Bash dependencies for an electron.

The specified commands will be executed as subprocesses in the same environment as the electron.

commands#

A list of bash commands to execute before the electron runs.
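
A minimal sketch; the commands are illustrative and run as subprocesses in the electron’s environment before the function executes:

import covalent as ct

@ct.electron(deps_bash=ct.DepsBash(["mkdir -p /tmp/scratch", "echo ready"]))
def task():
    import os
    # The directory was created by the bash dependency.
    return os.path.isdir("/tmp/scratch")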

Methods:

from_dict(object_dict)

Rehydrate a dictionary representation

to_dict()

Return a JSON-serializable dictionary representation of self

from_dict(object_dict)[source]#

Rehydrate a dictionary representation

Parameters

object_dict – a dictionary representation returned by to_dict

Return type

DepsBash

Returns

self

Instance attributes will be overwritten.

to_dict()[source]#

Return a JSON-serializable dictionary representation of self

Return type

dict


Call Dependencies#

Functions, shell commands, PyPI packages, and other types of dependencies to be called in an electron’s execution environment

class covalent._workflow.depscall.DepsCall(func=None, args=[], kwargs={}, *, retval_keyword='', override_reserved_retval_keys=False)#

Functions, shell commands, PyPI packages, and other types of dependencies to be called in an electron’s execution environment

Deps class to encapsulate python functions to be called in the same execution environment as the electron.

func#

A callable

args#

args list

kwargs#

kwargs dict

retval_keyword#

An optional string referencing the return value of func.

If retval_keyword is specified, the return value of func will be passed during workflow execution as an argument to the electron corresponding to the parameter of the same name.

Notes

Electron parameters to be injected during execution must have default parameter values.

It is the user’s responsibility to ensure that retval_keyword is actually a parameter of the electron. Unexpected behavior may occur otherwise.
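
A minimal sketch of retval_keyword injection; fetch_token is a hypothetical helper, and the token parameter carries a default value as the notes above require:

import covalent as ct

def fetch_token():
    # Hypothetical setup function, executed in the electron's environment.
    return "s3cr3t"

@ct.electron(call_before=[ct.DepsCall(fetch_token, retval_keyword="token")])
def call_api(url, token=""):
    # token is filled with fetch_token()'s return value at execution time.
    return (url, token)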

Methods:

from_dict(object_dict)

Rehydrate a dictionary representation

to_dict()

Return a JSON-serializable dictionary representation of self

from_dict(object_dict)[source]#

Rehydrate a dictionary representation

Parameters

object_dict – a dictionary representation returned by to_dict

Return type

DepsCall

Returns

self

Instance attributes will be overwritten.

to_dict()[source]#

Return a JSON-serializable dictionary representation of self

Return type

dict


Dispatcher#

Dispatching jobs to the server and stopping triggered dispatches
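
On the client side, ct.dispatch submits a lattice to the server and returns a dispatch ID. A minimal sketch, assuming a running server; ct.stop_triggers is assumed to be the top-level helper for halting triggered dispatches:

import covalent as ct

@ct.electron
def task():
    return 1

@ct.lattice
def workflow():
    return task()

dispatch_id = ct.dispatch(workflow)()

# For a lattice with attached triggers, stop further redispatches by ID.
ct.stop_triggers([dispatch_id])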


Result#

Collecting and managing results

class covalent._results_manager.result.Result(lattice, dispatch_id='')[source]#

Result class to store and perform operations on the result obtained from a dispatch.

lattice#

“Lattice” object which was dispatched.

results_dir#

Directory where the result will be stored. It’ll be in the format of “<results_dir>/<dispatch_id>/”.

dispatch_id#

Dispatch id assigned to this dispatch.

root_dispatch_id#

Dispatch id of the root lattice in a hierarchy of sublattice workflows.

status#

Status of the result. It will be one of the following:
  • Result.NEW_OBJ: When it is a new result object.

  • Result.COMPLETED: When processing of all the nodes has completed successfully.

  • Result.RUNNING: When some node executions are in progress.

  • Result.FAILED: When one or more node executions have failed.

  • Result.CANCELLED: When the dispatch was cancelled.

result#

Final result of the dispatch, i.e., whatever the “Lattice” function returned.

inputs#

Inputs sent to the “Lattice” function for dispatching.

error#

Error due to which the execution failed.
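
A minimal sketch of retrieving and inspecting a Result, assuming a running server; with wait=True, ct.get_result blocks until the dispatch completes:

import covalent as ct

@ct.electron
def double(x):
    return 2 * x

@ct.lattice
def workflow(x):
    return double(x)

dispatch_id = ct.dispatch(workflow)(21)
res = ct.get_result(dispatch_id, wait=True)

print(res.status)                  # e.g. COMPLETED
print(res.result)                  # 42
print(res.get_all_node_outputs())  # per-node outputs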

Functions:

  • save_result: Save the result object to the passed results directory or to self.results_dir by default.

  • get_all_node_outputs: Return all the outputs of all the node executions.

Attributes:

dispatch_id

Dispatch id of current dispatch.

encoded_result

Encoded final result of current dispatch

end_time

End time of processing the dispatch.

error

Error due to which the dispatch failed.

inputs

Inputs sent to the “Lattice” function for dispatching.

lattice

“Lattice” object which was dispatched.

result

Final result of current dispatch.

results_dir

Results directory used to save this result object.

root_dispatch_id

Dispatch id of the root dispatch

start_time

Start time of processing the dispatch.

status

Status of current dispatch.

Methods:

get_all_node_outputs()

Return output of every node execution.

get_all_node_results()

Get all the node results.

get_node_result(node_id)

Return the result of a particular node.

post_process()

Post-processing method.

property dispatch_id: str#

Dispatch id of current dispatch.

Return type

str

property encoded_result: covalent.TransportableObject#

Encoded final result of current dispatch

Return type

TransportableObject

property end_time: datetime.datetime#

End time of processing the dispatch.

Return type

datetime

property error: str#

Error due to which the dispatch failed.

Return type

str

get_all_node_outputs()[source]#

Return output of every node execution.

Parameters

None

Returns

A dictionary containing the output of every node execution.

Return type

node_outputs

get_all_node_results()[source]#

Get all the node results.

Parameters

None

Returns

A list of dictionaries containing the result of every node execution.

Return type

node_results

get_node_result(node_id)[source]#

Return the result of a particular node.

Parameters

node_id (int) – The node id.

Returns

The result of the node, containing the following in a dictionary format:
  • node_id: The node id.

  • node_name: The name of the node.

  • start_time: The start time of the node execution.

  • end_time: The end time of the node execution.

  • status: The status of the node execution.

  • output: The output of the node, unless an error occurred, in which case None.

  • error: The error of the node if one occurred, else None.

  • sublattice_result: The result of the sublattice if any.

  • stdout: The stdout of the node execution.

  • stderr: The stderr of the node execution.

Return type

node_result

property inputs: dict#

Inputs sent to the “Lattice” function for dispatching.

Return type

dict

property lattice: covalent._workflow.lattice.Lattice#

“Lattice” object which was dispatched.

Return type

Lattice

post_process()[source]#

Post-processing method. This method was introduced to enable manual client-side postprocessing in case automatic post-processing by the server fails (e.g. insufficient dask worker memory)

Returns

Post-processed result output

Return type

Any

property result: Union[int, float, list, dict]#

Final result of current dispatch.

Return type

Union[int, float, list, dict]

property results_dir: str#

Results directory used to save this result object.

Return type

str

property root_dispatch_id: str#

Dispatch id of the root dispatch

Return type

str

property start_time: datetime.datetime#

Start time of processing the dispatch.

Return type

datetime

property status: covalent._shared_files.util_classes.Status#

Status of current dispatch.

Return type

Status


Covalent CLI Tool#

This Command Line Interface (CLI) tool is used to manage the Covalent server.

covalent#

Covalent CLI tool used to manage the servers.

covalent [OPTIONS] COMMAND [ARGS]...

Options

-v, --version#

Display version information.

start#

Start the Covalent server.

covalent start [OPTIONS]

Options

-d, --develop#

Start the server in developer mode.

-p, --port <port>#

Server port number.

Default

48008

-m, --mem-per-worker <mem_per_worker>#

Memory limit per worker in GB. Provide strings like 1gb/1GB, or 0 for no limit.

-n, --workers <workers>#

Number of workers to start Covalent with.

-t, --threads-per-worker <threads_per_worker>#

Number of CPU threads per worker.

--ignore-migrations#

Start the server without requiring migrations

Default

False

--no-cluster#

Start the server without Dask

Default

False

--no-triggers#

Start Covalent server without the Triggers server

Default

False

--triggers-only#

Start only the Triggers server

Default

False

stop#

Stop the Covalent server.

covalent stop [OPTIONS]

restart#

Restart the server.

covalent restart [OPTIONS]

Options

-p, --port <port>#

Restart Covalent server on a different port.

-d, --develop#

Start the server in developer mode.

status#

Query the status of the Covalent server.

covalent status [OPTIONS]

purge#

Purge Covalent from this system. This command is for developers.

covalent purge [OPTIONS]

Options

-H, --hard#

Perform a hard purge, deleting the DB as well. [default: False]

-y, --yes#

Approve without showing the warning. [default: False]

logs#

Show Covalent server logs.

covalent logs [OPTIONS]

db#

Group of utility commands to manage the dispatcher database

covalent db [OPTIONS] COMMAND [ARGS]...

alembic#

Expose alembic CLI to be used via covalent CLI

covalent db alembic [OPTIONS] [ALEMBIC_ARGS]...

Arguments

ALEMBIC_ARGS#

Optional argument(s)

migrate#

Run DB migrations programmatically

covalent db migrate [OPTIONS]

migrate-legacy-result-object#

Migrate a legacy pickled Result object to the DataStore

Example: covalent migrate-legacy-result-object result.pkl

covalent migrate-legacy-result-object [OPTIONS] RESULT_PICKLE_PATH

Arguments

RESULT_PICKLE_PATH#

Required argument

cluster#

Inspect and manage the Dask cluster’s configuration.

covalent cluster [OPTIONS]

Options

--status#

Show Dask cluster status

--info#

Retrieve Dask cluster info

--address#

Fetch connection information of the cluster scheduler/workers

--size#

Return number of active workers in the cluster

--restart#

Restart the cluster

--scale <scale>#

Scale cluster by adding/removing workers to match nworkers

Default

2

--logs#

Show Dask cluster logs