Dispatch Infrastructure

Dispatcher

Dispatching jobs to the server and stopping triggered dispatches

Triggers

Execute a workflow triggered by a specific type of event

Classes:

BaseTrigger([lattice_dispatch_id, …])

Base class to be subclassed by any custom defined trigger.

DatabaseTrigger(db_path, table_name[, …])

Database trigger that polls a database for changes and triggers workflows based on record changes.

DirTrigger(dir_path, event_names[, …])

File- or directory-based trigger that watches the given file or directory for events and performs a trigger action whenever they occur.

SQLiteTrigger(db_path, table_name[, …])

SQLite-based trigger that polls an SQLite database for changes and triggers workflows based on them.

TimeTrigger(time_gap[, lattice_dispatch_id, …])

Performs a trigger action every time_gap seconds.

class covalent.triggers.BaseTrigger(lattice_dispatch_id=None, dispatcher_addr=None, triggers_server_addr=None)

Bases: object

Base class to be subclassed by any custom-defined trigger. Implements all the methods needed to interact with dispatches, including retrieving their statuses and redispatching them whenever the trigger fires.

Parameters
  • lattice_dispatch_id (Optional[str]) – Dispatch ID of the workflow to redispatch when this trigger fires

  • dispatcher_addr (Optional[str]) – Address of the dispatcher server used to retrieve information about dispatches or to redispatch them

  • triggers_server_addr (Optional[str]) – Address of the Triggers server (if any) with which to register this trigger; defaults to the dispatcher’s address

self.lattice_dispatch_id

Dispatch ID of the workflow to redispatch when this trigger fires

self.dispatcher_addr

Address of the dispatcher server used to retrieve information about dispatches or to redispatch them

self.triggers_server_addr

Address of the Triggers server (if any) with which to register this trigger; defaults to the dispatcher’s address

self.new_dispatch_ids

List of all the new dispatch IDs created by redispatching

self.observe_blocks

Boolean indicating whether the self.observe method is a blocking call

self.event_loop

Event loop to use when calling the dispatcher’s functions directly instead of through the REST APIs

self.use_internal_funcs

Boolean indicating whether to call the dispatcher’s functions directly instead of through API calls

self.stop_flag

Flag used to stop the trigger in a thread-safe manner when self.observe() is a blocking call (e.g. see TimeTrigger)

Methods:

observe()

Start observing for any change which can be used to trigger this trigger.

register()

Register this trigger to the Triggers server and start observing.

stop()

Stop observing for changes.

to_dict()

Return a dictionary representation of this trigger which can later be used to regenerate it.

trigger()

Fire this trigger and redispatch the workflow of the connected dispatch ID.

abstract observe()

Start observing for any change which can be used to trigger this trigger. To be implemented by the subclass.

register()

Register this trigger to the Triggers server and start observing.

Return type

None

abstract stop()

Stop observing for changes. To be implemented by the subclass.

to_dict()

Return a dictionary representation of this trigger which can later be used to regenerate it.

Returns

Dictionary representation of this trigger

Return type

dict

trigger()

Fire this trigger and redispatch the workflow of the connected dispatch ID. Should be called within self.observe() whenever a trigger action is desired.

Raises

RuntimeError – If no dispatch ID is connected to this trigger

Return type

None
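The contract above (subclasses implement observe() and stop(), and call trigger() from inside observe()) can be sketched without a running Covalent server. In the self-contained sketch below, MiniBaseTrigger and CounterTrigger are hypothetical stand-ins. Where the real BaseTrigger.trigger() performs a redispatch, this trigger() only records a random ID. In real code you would subclass covalent.triggers.BaseTrigger instead.

```python
import threading
import uuid
from abc import ABC, abstractmethod
from typing import List, Optional


class MiniBaseTrigger(ABC):
    """Hypothetical stand-in mirroring the documented BaseTrigger contract."""

    def __init__(self, lattice_dispatch_id: Optional[str] = None) -> None:
        self.lattice_dispatch_id = lattice_dispatch_id
        self.new_dispatch_ids: List[str] = []
        self.stop_flag = threading.Event()

    @abstractmethod
    def observe(self) -> None:
        """Watch for events and call self.trigger() when one occurs."""

    @abstractmethod
    def stop(self) -> None:
        """Stop observing."""

    def trigger(self) -> None:
        # As documented: no connected dispatch ID means RuntimeError.
        if self.lattice_dispatch_id is None:
            raise RuntimeError("No dispatch id is connected to this trigger")
        # Placeholder for a real redispatch: just record a fresh ID.
        self.new_dispatch_ids.append(str(uuid.uuid4()))


class CounterTrigger(MiniBaseTrigger):
    """Hypothetical trigger that fires n_fires times unless stopped first."""

    def __init__(self, n_fires: int, **kwargs) -> None:
        super().__init__(**kwargs)
        self.n_fires = n_fires

    def observe(self) -> None:
        fired = 0
        while not self.stop_flag.is_set() and fired < self.n_fires:
            self.trigger()
            fired += 1

    def stop(self) -> None:
        self.stop_flag.set()


t = CounterTrigger(n_fires=3, lattice_dispatch_id="abc")
t.observe()
print(len(t.new_dispatch_ids))  # 3
```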

class covalent.triggers.DatabaseTrigger(db_path, table_name, poll_interval=1, where_clauses=None, trigger_after_n=1, lattice_dispatch_id=None, dispatcher_addr=None, triggers_server_addr=None)

Bases: covalent.triggers.base.BaseTrigger

Database trigger that polls a database for changes and triggers workflows based on record changes.

Parameters
  • db_path (str) – Connection string for the database

  • table_name (str) – Name of the table to observe

  • poll_interval (int) – Time in seconds to wait before reading the database again

  • where_clauses (Optional[List[str]]) – List of “WHERE” conditions, e.g. [“id > 2”, “status = 'pending'”], to check when polling the database

  • trigger_after_n (int) – Number of times the event must occur before the workflow is triggered, e.g. a value of 2 means the workflow is triggered once the event has occurred twice.

  • lattice_dispatch_id (Optional[str]) – Lattice dispatch id of the workflow to be triggered

  • dispatcher_addr (Optional[str]) – Address of the dispatcher server

  • triggers_server_addr (Optional[str]) – Address of the triggers server

self.db_path

Connection string for the database

self.table_name

Name of the table to observe

self.poll_interval

Time in seconds to wait before reading the database again

self.where_clauses

List of “WHERE” conditions, e.g. [“id > 2”, “status = 'pending'”], to check when polling the database

self.trigger_after_n

Number of times the event must occur before the workflow is triggered, e.g. a value of 2 means the workflow is triggered once the event has occurred twice.

self.stop_flag

Thread-safe flag used to check whether the stop condition has been met

Methods:

observe()

Keep performing the trigger action as long as the where conditions are met, or until stop has been called

stop()

Stop the running self.observe() method by setting the self.stop_flag flag.

observe()

Keep performing the trigger action as long as the where conditions are met, or until stop has been called

Return type

None

stop()

Stop the running self.observe() method by setting the self.stop_flag flag.

Return type

None

class covalent.triggers.DirTrigger(dir_path, event_names, batch_size=1, recursive=False, lattice_dispatch_id=None, dispatcher_addr=None, triggers_server_addr=None)

Bases: covalent.triggers.base.BaseTrigger

File- or directory-based trigger that watches the given file or directory for events and performs a trigger action whenever they occur.

Parameters
  • dir_path – Path to the file/dir which is to be observed for events

  • event_names – List of event names on which to perform the trigger action. Must be a subset of: [“created”, “deleted”, “modified”, “moved”, “closed”].

  • batch_size (int) – The number of events to wait for before performing the trigger action, default is 1.

  • recursive (bool) – Whether to recursively watch the directory, default is False.

self.dir_path

Path to the file or directory to observe for events

self.event_names

List of event names on which to perform the trigger action. Must be a subset of: [“created”, “deleted”, “modified”, “moved”, “closed”].

self.batch_size

The number of events to wait for before performing the trigger action, default is 1.

self.recursive

Whether to recursively watch the directory, default is False.

self.n_changes

Number of events since last trigger action. Whenever self.n_changes == self.batch_size a trigger action happens.

Methods:

attach_methods_to_handler()

Dynamically attaches and overrides the “on_*” methods to the handler depending on which ones are requested by the user.

observe()

Start observing the file/dir for any possible events among the ones mentioned in self.event_names.

stop()

Stop observing the file or directory for changes.

attach_methods_to_handler()

Dynamically attaches and overrides the “on_*” methods to the handler depending on which ones are requested by the user.

Parameters

event_names – List of event names upon which to perform a trigger action

Return type

None

observe()

Start observing the file/dir for any of the events listed in self.event_names. Currently only supported when running within the Covalent/Triggers server.

Return type

None

stop()

Stop observing the file or directory for changes.

Return type

None
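The batching rule stated for self.n_changes (a trigger action happens whenever self.n_changes == self.batch_size) can be sketched independently of the file-system watcher. This is an assumption-laden illustration, not Covalent's implementation: BatchingHandler, handle() and on_trigger are hypothetical names. Events are simulated as plain strings rather than delivered by a real watcher. The reset of the counter to 0 after each trigger action is also assumed.

```python
from typing import Callable, Iterable, List


class BatchingHandler:
    """Hypothetical sketch of DirTrigger-style batching of file-system events."""

    def __init__(self, event_names: Iterable[str], batch_size: int,
                 on_trigger: Callable[[], None]) -> None:
        self.event_names = set(event_names)
        self.batch_size = batch_size
        self.n_changes = 0  # events seen since the last trigger action
        self.on_trigger = on_trigger

    def handle(self, event_name: str) -> None:
        # Ignore event types the user did not ask to watch.
        if event_name not in self.event_names:
            return
        self.n_changes += 1
        # Fire once a full batch has accumulated, then start counting again.
        if self.n_changes == self.batch_size:
            self.on_trigger()
            self.n_changes = 0


fired: List[int] = []
handler = BatchingHandler(["created", "modified"], batch_size=2,
                          on_trigger=lambda: fired.append(1))
for name in ["created", "deleted", "modified", "created", "moved", "modified"]:
    handler.handle(name)
print(len(fired))  # 2
```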

class covalent.triggers.SQLiteTrigger(db_path, table_name, poll_interval=1, where_clauses=None, trigger_after_n=1, lattice_dispatch_id=None, dispatcher_addr=None, triggers_server_addr=None)

Bases: covalent.triggers.base.BaseTrigger

SQLite-based trigger that polls an SQLite database for changes and triggers workflows based on them.

Parameters
  • db_path (str) – Absolute path to the database file

  • table_name (str) – Name of the table to observe

  • poll_interval (int) – Time in seconds to wait before reading the database again

  • where_clauses (Optional[List[str]]) – List of “WHERE” conditions, e.g. [“id > 2”, “status = 'pending'”], to check when polling the database

  • trigger_after_n (int) – Number of times the event must occur before the workflow is triggered, e.g. a value of 2 means the workflow is triggered once the event has occurred twice.

  • lattice_dispatch_id (Optional[str]) – Lattice dispatch id of the workflow to be triggered

  • dispatcher_addr (Optional[str]) – Address of the dispatcher server

  • triggers_server_addr (Optional[str]) – Address of the triggers server

self.db_path

Absolute path to the database file

self.table_name

Name of the table to observe

self.poll_interval

Time in seconds to wait before reading the database again

self.where_clauses

List of “WHERE” conditions, e.g. [“id > 2”, “status = 'pending'”], to check when polling the database

self.trigger_after_n

Number of times the event must occur before the workflow is triggered, e.g. a value of 2 means the workflow is triggered once the event has occurred twice.

self.stop_flag

Thread-safe flag used to check whether the stop condition has been met

Methods:

observe()

Keep performing the trigger action as long as the where conditions are met, or until stop has been called

stop()

Stop the running self.observe() method by setting the self.stop_flag flag.

observe()

Keep performing the trigger action as long as the where conditions are met, or until stop has been called

Return type

None

stop()

Stop the running self.observe() method by setting the self.stop_flag flag.

Return type

None
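The polling behaviour described for SQLiteTrigger can be sketched with Python's built-in sqlite3 module. The trigger reads the table every poll_interval seconds, applies the WHERE conditions, and fires after trigger_after_n events. This is not Covalent's implementation: build_poll_query, poll, on_trigger and the test-only max_polls cap are hypothetical. Two behaviours are assumptions: that the where_clauses are combined with AND, and that one poll returning at least one matching row counts as one event. String values inside a condition need SQL quoting, as in status = 'pending'.

```python
import sqlite3
import threading
from typing import Callable, List, Optional


def build_poll_query(table_name: str, where_clauses: Optional[List[str]] = None) -> str:
    # Assumption: all WHERE conditions must hold, so they are joined with AND.
    query = f"SELECT * FROM {table_name}"
    if where_clauses:
        query += " WHERE " + " AND ".join(where_clauses)
    return query


def poll(con: sqlite3.Connection, table_name: str, where_clauses: Optional[List[str]],
         trigger_after_n: int, on_trigger: Callable[[], None], stop_flag: threading.Event,
         poll_interval: float = 1.0, max_polls: Optional[int] = None) -> None:
    query = build_poll_query(table_name, where_clauses)
    n_events = 0
    polls = 0
    while not stop_flag.is_set():
        # Assumption: a poll that returns at least one matching row is one event.
        if con.execute(query).fetchall():
            n_events += 1
            if n_events == trigger_after_n:
                on_trigger()
                n_events = 0
        polls += 1
        if max_polls is not None and polls >= max_polls:
            break  # test-only cap so the example terminates
        # Interruptible sleep: returns early if another thread sets stop_flag.
        stop_flag.wait(poll_interval)


con = sqlite3.connect(":memory:")
con.execute("CREATE TABLE jobs (id INTEGER, status TEXT)")
con.executemany("INSERT INTO jobs VALUES (?, ?)", [(1, "done"), (3, "pending")])
fired: List[int] = []
poll(con, "jobs", ["id > 2", "status = 'pending'"], trigger_after_n=2,
     on_trigger=lambda: fired.append(1), stop_flag=threading.Event(),
     poll_interval=0.0, max_polls=5)
print(len(fired))  # 2
```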

class covalent.triggers.TimeTrigger(time_gap, lattice_dispatch_id=None, dispatcher_addr=None, triggers_server_addr=None)

Bases: covalent.triggers.base.BaseTrigger

Performs a trigger action every time_gap seconds.

Parameters

time_gap (int) – Number of seconds to wait before performing a trigger action

self.time_gap

Number of seconds to wait before performing a trigger action

self.stop_flag

Thread-safe flag used to check whether the stop condition has been met

Methods:

observe()

Keep performing the trigger action every self.time_gap seconds until the stop condition has been met.

stop()

Stop the running self.observe() method by setting the self.stop_flag flag.

observe()

Keep performing the trigger action every self.time_gap seconds until the stop condition has been met.

Return type

None

stop()

Stop the running self.observe() method by setting the self.stop_flag flag.

Return type

None
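The observe()/stop() pair above can be sketched with the standard library. Here observe() blocks and stop() sets a thread-safe flag. MiniTimeTrigger and on_trigger are hypothetical names. This sketch uses threading.Event.wait() as an interruptible sleep, which is a design choice of the sketch and not necessarily what TimeTrigger does internally. Because observe() blocks, the usage example runs it in a worker thread and calls stop() from the main thread.

```python
import threading
import time
from typing import Callable, List


class MiniTimeTrigger:
    """Hypothetical stand-in for a time-based observe/stop loop."""

    def __init__(self, time_gap: float, on_trigger: Callable[[], None]) -> None:
        self.time_gap = time_gap
        self.on_trigger = on_trigger
        self.stop_flag = threading.Event()

    def observe(self) -> None:
        # Event.wait() returns False on timeout and True once stop() sets the flag,
        # so the loop fires every time_gap seconds and exits promptly after stop().
        while not self.stop_flag.wait(self.time_gap):
            self.on_trigger()

    def stop(self) -> None:
        self.stop_flag.set()


ticks: List[float] = []
trig = MiniTimeTrigger(time_gap=0.05, on_trigger=lambda: ticks.append(time.monotonic()))
worker = threading.Thread(target=trig.observe)  # observe() blocks, so run it in a thread
worker.start()
time.sleep(0.3)
trig.stop()
worker.join()
print(len(ticks) >= 1)  # True
```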

Cancellation

Cancel an entire dispatch using its dispatch_id, or cancel specific tasks within a workflow using their task_ids.

covalent._results_manager.results_manager.cancel(dispatch_id, task_ids=None, dispatcher_addr=None)

Cancel a running dispatch.

Parameters
  • dispatch_id (str) – The dispatch id of the dispatch to be cancelled.

  • task_ids (Optional[List[int]]) – Optional list of task IDs to cancel within the workflow

  • dispatcher_addr (Optional[str]) – Dispatcher server address; if None, defaults to the address set in Covalent’s config.

Return type

str

Returns

Cancellation response

Results

Collecting and managing results

class covalent._results_manager.result.Result(lattice, dispatch_id='')

Result class to store and perform operations on the result obtained from a dispatch.

lattice

“Lattice” object which was dispatched.

results_dir

Directory where the result will be stored, in the format “<results_dir>/<dispatch_id>/”.

dispatch_id

Dispatch ID assigned to this dispatch.

root_dispatch_id

Dispatch ID of the root lattice in a hierarchy of sublattice workflows.

status

Status of the result. It is one of the following:
  • Result.NEW_OBJ: When it is a new result object.

  • Result.COMPLETED: When processing of all the nodes has completed successfully.

  • Result.RUNNING: When some node executions are in progress.

  • Result.FAILED: When one or more node executions have failed.

  • Result.CANCELLED: When the dispatch was cancelled.

result

Final result of the dispatch, i.e. whatever the “Lattice” function returned.

inputs

Inputs sent to the “Lattice” function for dispatching.

error

Error due to which the execution failed.

Functions:
  • save_result: Save the result object to the given results directory, or to self.results_dir by default.

  • get_all_node_outputs: Return the outputs of all the node executions.

Attributes:

dispatch_id

Dispatch id of current dispatch.

encoded_result

Encoded final result of current dispatch

end_time

End time of processing the dispatch.

error

Error due to which the dispatch failed.

inputs

Inputs sent to the “Lattice” function for dispatching.

lattice

“Lattice” object which was dispatched.

result

Final result of current dispatch.

results_dir

Results directory used to save this result object.

root_dispatch_id

Dispatch id of the root dispatch

start_time

Start time of processing the dispatch.

status

Status of current dispatch.

Methods:

get_all_node_outputs()

Return output of every node execution.

get_all_node_results()

Get all the node results.

get_node_result(node_id)

Return the result of a particular node.

post_process()

Post-processing method.

property dispatch_id: str

Dispatch id of current dispatch.

Return type

str

property encoded_result: covalent._workflow.transportable_object.TransportableObject

Encoded final result of current dispatch

Return type

TransportableObject

property end_time: datetime.datetime

End time of processing the dispatch.

Return type

datetime

property error: str

Error due to which the dispatch failed.

Return type

str

get_all_node_outputs()

Return output of every node execution.

Parameters

None

Returns

A dictionary containing the output of every node execution.

Return type

dict

get_all_node_results()

Get all the node results.

Parameters

None

Returns

A list of dictionaries containing the result of every node execution.

Return type

list

get_node_result(node_id)

Return the result of a particular node.

Parameters

node_id (int) – The node id.

Returns

A dictionary containing the following keys:
  • node_id: The node ID.

  • node_name: The name of the node.

  • start_time: The start time of the node execution.

  • end_time: The end time of the node execution.

  • status: The status of the node execution.

  • output: The output of the node, or None if an error occurred.

  • error: The node’s error, or None if no error occurred.

  • sublattice_result: The result of the sublattice, if any.

  • stdout: The stdout of the node execution.

  • stderr: The stderr of the node execution.

Return type

dict
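The helper below is a sketch of post-run inspection. It picks out the failed nodes from a list of dictionaries shaped like the ones described above, such as the list returned by get_all_node_results() on a Result retrieved with covalent.get_result(dispatch_id). failed_nodes is a hypothetical helper, and the sample records are hand-made illustrative data that use only a subset of the documented keys. Treating a non-None error entry as a failure follows the documented meaning of the error key.

```python
from typing import Any, Dict, List


def failed_nodes(node_results: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Return a short summary for every node whose 'error' entry is not None."""
    return [
        {"node_id": r["node_id"], "node_name": r["node_name"], "error": r["error"]}
        for r in node_results
        if r.get("error") is not None
    ]


# Hand-made records shaped like the documented node-result dictionaries (illustrative only).
sample = [
    {"node_id": 0, "node_name": "add", "output": 3, "error": None},
    {"node_id": 1, "node_name": "divide", "output": None,
     "error": "ZeroDivisionError: division by zero"},
]
print(failed_nodes(sample))
```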

property inputs: dict

Inputs sent to the “Lattice” function for dispatching.

Return type

dict

property lattice: covalent._workflow.lattice.Lattice

“Lattice” object which was dispatched.

Return type

Lattice

post_process()

Post-processing method. This method was introduced to enable manual client-side post-processing in case automatic post-processing by the server fails (e.g. due to insufficient Dask worker memory).

Returns

Post-processed result output

Return type

Any

property result: Union[int, float, list, dict]

Final result of current dispatch.

Return type

Union[int, float, list, dict]

property results_dir: str

Results directory used to save this result object.

Return type

str

property root_dispatch_id: str

Dispatch id of the root dispatch

Return type

str

property start_time: datetime.datetime

Start time of processing the dispatch.

Return type

datetime

property status: covalent._shared_files.util_classes.Status

Status of current dispatch.

Return type

Status
