Dispatch Infrastructure
Dispatcher
Dispatching jobs to the server and stopping triggered dispatches
Examples
Triggers
Execute a workflow triggered by a specific type of event
Classes:
- BaseTrigger – Base class to be subclassed by any custom-defined trigger.
- DatabaseTrigger – Database trigger that polls a database for changes and triggers workflows based on record changes.
- DirTrigger – Directory- or file-based trigger that watches the given file or directory for events and performs a trigger action whenever one occurs.
- SQLiteTrigger – SQLite-based trigger that polls a SQLite database for changes and triggers workflows based on them.
- TimeTrigger – Performs a trigger action every time_gap seconds.
- class covalent.triggers.BaseTrigger(lattice_dispatch_id=None, dispatcher_addr=None, triggers_server_addr=None)
Bases: object
Base class to be subclassed by any custom-defined trigger. Implements the methods needed to interact with dispatches, including retrieving their statuses and redispatching them whenever the trigger fires.
- Parameters
lattice_dispatch_id (Optional[str]) – Dispatch ID of the workflow to redispatch when this trigger fires
dispatcher_addr (Optional[str]) – Address of the dispatcher server used to retrieve information about, or redispatch, any dispatches
triggers_server_addr (Optional[str]) – Address of the Triggers server (if any) to register this trigger with; defaults to the dispatcher’s address
- self.lattice_dispatch_id
Dispatch ID of the workflow to redispatch when this trigger fires
- self.dispatcher_addr
Address of the dispatcher server used to retrieve information about, or redispatch, any dispatches
- self.triggers_server_addr
Address of the Triggers server (if any) to register this trigger with; defaults to the dispatcher’s address
- self.new_dispatch_ids
List of all the dispatch IDs newly created by redispatching
- self.observe_blocks
Boolean indicating whether the self.observe() method is a blocking call
- self.event_loop
Event loop to use when calling the dispatcher’s functions directly instead of through the REST APIs
- self.use_internal_funcs
Boolean indicating whether to call the dispatcher’s functions directly instead of through API calls
- self.stop_flag
Handles the stopping mechanism in a thread-safe manner when self.observe() is a blocking call (e.g. see TimeTrigger)
Methods:
observe() – Start observing for any change that can be used to fire this trigger.
register() – Register this trigger with the Triggers server and start observing.
stop() – Stop observing for changes.
to_dict() – Return a dictionary representation of this trigger that can later be used to regenerate it.
trigger() – Fire this trigger and redispatch the workflow of the connected dispatch ID.
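The to_dict() entry describes a serialize-then-rebuild round trip. The sketch below illustrates that idea with a hypothetical stand-in class rather than Covalent's own code: IntervalTriggerSketch and its from_dict classmethod are invented for illustration, and the "name" key is an assumption, since the exact keys Covalent stores are not documented here.

```python
from typing import Any, Dict, Optional


class IntervalTriggerSketch:
    """Hypothetical stand-in trigger (not Covalent's class) showing a to_dict round trip."""

    def __init__(self, time_gap: int, lattice_dispatch_id: Optional[str] = None) -> None:
        self.time_gap = time_gap
        self.lattice_dispatch_id = lattice_dispatch_id

    def to_dict(self) -> Dict[str, Any]:
        # Store the class name plus every constructor argument needed to rebuild it.
        return {
            "name": type(self).__name__,
            "time_gap": self.time_gap,
            "lattice_dispatch_id": self.lattice_dispatch_id,
        }

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> "IntervalTriggerSketch":
        # Hypothetical inverse of to_dict: drop the bookkeeping key, pass the rest on.
        kwargs = {key: value for key, value in data.items() if key != "name"}
        return cls(**kwargs)


original = IntervalTriggerSketch(time_gap=5, lattice_dispatch_id="abc")
rebuilt = IntervalTriggerSketch.from_dict(original.to_dict())
```

Recording the class name next to the arguments is one way a server could decide which trigger class to rebuild from a stored dictionary.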
- abstract observe()
Start observing for any change which can be used to trigger this trigger. To be implemented by the subclass.
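A custom trigger only has to supply observe(). The stand-in below mirrors the documented contract (an abstract observe(), a trigger() action, and a thread-safe stop_flag that stop() sets) without importing Covalent. TriggerSketch, QueueTrigger, and trigger_count are hypothetical names, and trigger() merely counts calls where BaseTrigger would redispatch the workflow; in real code the subclass would derive from covalent.triggers.BaseTrigger instead.

```python
import threading
from abc import ABC, abstractmethod
from typing import List


class TriggerSketch(ABC):
    """Hypothetical stand-in for the BaseTrigger contract; not Covalent's class."""

    def __init__(self) -> None:
        self.stop_flag = threading.Event()  # thread-safe stop signal
        self.trigger_count = 0  # hypothetical: counts trigger() calls

    @abstractmethod
    def observe(self) -> None:
        """Subclasses watch for their event and call self.trigger()."""

    def trigger(self) -> None:
        # BaseTrigger would redispatch the workflow here; the sketch only counts.
        self.trigger_count += 1

    def stop(self) -> None:
        self.stop_flag.set()


class QueueTrigger(TriggerSketch):
    """Hypothetical subclass that fires once per truthy entry in a list of events."""

    def __init__(self, events: List[bool]) -> None:
        super().__init__()
        self.events = events

    def observe(self) -> None:
        for happened in self.events:
            if self.stop_flag.is_set():
                break  # honour stop() between events
            if happened:
                self.trigger()


t = QueueTrigger([True, False, True])
t.observe()
```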
- register()
Register this trigger with the Triggers server and start observing.
- Return type
None
- class covalent.triggers.DatabaseTrigger(db_path, table_name, poll_interval=1, where_clauses=None, trigger_after_n=1, lattice_dispatch_id=None, dispatcher_addr=None, triggers_server_addr=None)
Bases: covalent.triggers.base.BaseTrigger
Database trigger that polls a database for changes and triggers workflows based on record changes.
- Parameters
db_path (str) – Connection string for the database
table_name (str) – Name of the table to observe
poll_interval (int) – Time in seconds to wait before reading the database again
where_clauses (Optional[List[str]]) – List of “WHERE” conditions, e.g. [“id > 2”, “status = 'pending'”], to check when polling the database
trigger_after_n (int) – Number of times the event must occur before the workflow is triggered, e.g. a value of 2 means the workflow is triggered once the event has occurred twice
lattice_dispatch_id (Optional[str]) – Lattice dispatch ID of the workflow to be triggered
dispatcher_addr (Optional[str]) – Address of the dispatcher server
triggers_server_addr (Optional[str]) – Address of the Triggers server
- self.db_path
Connection string for the database
- self.table_name
Name of the table to observe
- self.poll_interval
Time in seconds to wait before reading the database again
- self.where_clauses
List of “WHERE” conditions, e.g. [“id > 2”, “status = 'pending'”], to check when polling the database
- self.trigger_after_n
Number of times the event must occur before the workflow is triggered, e.g. a value of 2 means the workflow is triggered once the event has occurred twice
- self.stop_flag
Thread-safe flag used to check whether the stop condition has been met
Methods:
observe() – Keep performing the trigger action as long as the WHERE conditions are met, or until stop() has been called.
stop() – Stop the running self.observe() method by setting self.stop_flag.
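One polling step of DatabaseTrigger can be pictured as a SELECT over table_name filtered by where_clauses. The sketch below runs such a query against an in-memory SQLite database using Python's standard sqlite3 module. poll_once is a hypothetical helper, and joining the clauses with AND is an assumption about how multiple conditions combine. Because table_name and the clauses are interpolated into the SQL text, they should only come from trusted configuration.

```python
import sqlite3
from typing import List, Optional, Tuple


def poll_once(conn: sqlite3.Connection, table_name: str,
              where_clauses: Optional[List[str]] = None) -> List[Tuple]:
    """Hypothetical single poll: return the rows that currently satisfy every clause."""
    query = f"SELECT * FROM {table_name}"
    if where_clauses:
        # Assumption: all conditions must hold, so they are joined with AND.
        query += " WHERE " + " AND ".join(where_clauses)
    return conn.execute(query).fetchall()


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE jobs (id INTEGER, status TEXT)")
conn.executemany("INSERT INTO jobs VALUES (?, ?)",
                 [(1, "pending"), (3, "pending"), (4, "done")])
rows = poll_once(conn, "jobs", ["id > 2", "status = 'pending'"])
```

A non-empty result would count as one occurrence of the watched event; the loop around it would sleep poll_interval seconds between calls.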
- class covalent.triggers.DirTrigger(dir_path, event_names, batch_size=1, recursive=False, lattice_dispatch_id=None, dispatcher_addr=None, triggers_server_addr=None)
Bases: covalent.triggers.base.BaseTrigger
Directory- or file-based trigger that watches the given file or directory for events and performs a trigger action whenever one occurs.
- Parameters
dir_path – Path to the file or directory to observe for events
event_names – List of event names on which to perform the trigger action; possible options are any subset of [“created”, “deleted”, “modified”, “moved”, “closed”]
batch_size (int) – Number of changes to wait for before performing the trigger action; default is 1
recursive (bool) – Whether to watch the directory recursively; default is False
- self.dir_path
Path to the file or directory to observe for events
- self.event_names
List of event names on which to perform the trigger action; possible options are any subset of [“created”, “deleted”, “modified”, “moved”, “closed”]
- self.batch_size
Number of events to wait for before performing the trigger action; default is 1
- self.recursive
Whether to watch the directory recursively; default is False
- self.n_changes
Number of events since the last trigger action. A trigger action happens whenever self.n_changes == self.batch_size.
Methods:
attach_methods_to_handler() – Dynamically attaches the “on_*” methods to the handler, overriding the defaults, depending on which events the user requested.
observe() – Start observing the file or directory for any of the events listed in self.event_names.
stop() – Stop observing the file or directory for changes.
- attach_methods_to_handler()
Dynamically attaches the “on_*” methods to the handler, overriding the defaults, depending on which events the user requested.
- Parameters
event_names – List of event names upon which to perform a trigger action
- Return type
None
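The attach-and-override step combined with the batch_size counter can be sketched with plain Python objects. HandlerSketch stands in for a file-system event handler (it is not the class DirTrigger actually uses), and attach_methods is a hypothetical helper. Resetting the counter after each trigger action is an assumption consistent with self.n_changes being described as the count "since last trigger action".

```python
from typing import Callable, List

ALLOWED_EVENTS = ("created", "deleted", "modified", "moved", "closed")


class HandlerSketch:
    """Hypothetical stand-in for a file-system event handler; every hook is a no-op."""

    def on_created(self, event): ...
    def on_deleted(self, event): ...
    def on_modified(self, event): ...
    def on_moved(self, event): ...
    def on_closed(self, event): ...


def attach_methods(handler: HandlerSketch, event_names: List[str],
                   batch_size: int, action: Callable[[], None]) -> None:
    unknown = set(event_names) - set(ALLOWED_EVENTS)
    if unknown:
        raise ValueError(f"Unsupported event names: {sorted(unknown)}")
    state = {"n_changes": 0}

    def on_event(event) -> None:
        # Count the event; fire and reset once batch_size events have accumulated.
        state["n_changes"] += 1
        if state["n_changes"] == batch_size:
            action()
            state["n_changes"] = 0

    for name in event_names:
        # Override only the requested hooks, and only on this handler instance.
        setattr(handler, f"on_{name}", on_event)


fired = []
h = HandlerSketch()
attach_methods(h, ["created", "modified"], batch_size=2, action=lambda: fired.append(True))
h.on_created(None)
h.on_deleted(None)   # not requested, so still the no-op default
h.on_modified(None)  # second counted event: the action fires
```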
- class covalent.triggers.SQLiteTrigger(db_path, table_name, poll_interval=1, where_clauses=None, trigger_after_n=1, lattice_dispatch_id=None, dispatcher_addr=None, triggers_server_addr=None)
Bases: covalent.triggers.base.BaseTrigger
SQLite-based trigger that polls a SQLite database for changes and triggers workflows based on them.
- Parameters
db_path (str) – Absolute path to the database file
table_name (str) – Name of the table to observe
poll_interval (int) – Time in seconds to wait before reading the database again
where_clauses (Optional[List[str]]) – List of “WHERE” conditions, e.g. [“id > 2”, “status = 'pending'”], to check when polling the database
trigger_after_n (int) – Number of times the event must occur before the workflow is triggered, e.g. a value of 2 means the workflow is triggered once the event has occurred twice
lattice_dispatch_id (Optional[str]) – Lattice dispatch ID of the workflow to be triggered
dispatcher_addr (Optional[str]) – Address of the dispatcher server
triggers_server_addr (Optional[str]) – Address of the Triggers server
- self.db_path
Absolute path to the database file
- self.table_name
Name of the table to observe
- self.poll_interval
Time in seconds to wait before reading the database again
- self.where_clauses
List of “WHERE” conditions, e.g. [“id > 2”, “status = 'pending'”], to check when polling the database
- self.trigger_after_n
Number of times the event must occur before the workflow is triggered, e.g. a value of 2 means the workflow is triggered once the event has occurred twice
- self.stop_flag
Thread-safe flag used to check whether the stop condition has been met
Methods:
observe() – Keep performing the trigger action as long as the WHERE conditions are met, or until stop() has been called.
stop() – Stop the running self.observe() method by setting self.stop_flag.
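The trigger_after_n semantics can be made concrete with a small counter over a sequence of poll outcomes. firing_polls is a hypothetical helper, and two points are assumptions rather than documented behaviour: every poll that finds matching rows counts as one event, and the counter restarts after each trigger action so the workflow fires on every n-th event.

```python
from typing import Iterable, List


def firing_polls(matches: Iterable[bool], trigger_after_n: int) -> List[int]:
    """Return the 0-based poll indices at which a trigger action would fire.

    Assumptions: each poll that matched counts as one event, and the action
    fires on every trigger_after_n-th event (the counter then restarts).
    """
    fired: List[int] = []
    count = 0
    for i, matched in enumerate(matches):
        if matched:
            count += 1
            if count == trigger_after_n:
                fired.append(i)
                count = 0
    return fired


# Polls 0, 2, 3 and 4 find matching rows; with trigger_after_n=2 the
# second and fourth matches (polls 2 and 4) fire.
fired_at = firing_polls([True, False, True, True, True], trigger_after_n=2)
```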
- class covalent.triggers.TimeTrigger(time_gap, lattice_dispatch_id=None, dispatcher_addr=None, triggers_server_addr=None)
Bases: covalent.triggers.base.BaseTrigger
Performs a trigger action every time_gap seconds.
- Parameters
time_gap (int) – Number of seconds to wait before each trigger action
- self.time_gap
Number of seconds to wait before each trigger action
- self.stop_flag
Thread-safe flag used to check whether the stop condition has been met
Methods:
observe() – Keep performing the trigger action every self.time_gap seconds until the stop condition has been met.
stop() – Stop the running self.observe() method by setting self.stop_flag.
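The blocking observe() loop and its thread-safe stop can be sketched with threading.Event, whose wait(timeout) doubles as an interruptible sleep: it returns False when the timeout elapses and True as soon as stop() sets the flag. TimeLoopSketch is a hypothetical stand-in, not TimeTrigger's actual implementation, and its trigger() only counts calls. In this sketch the first action happens after one full time_gap.

```python
import threading
import time


class TimeLoopSketch:
    """Hypothetical stand-in for a time-gap trigger loop; trigger() only counts calls."""

    def __init__(self, time_gap: float) -> None:
        self.time_gap = time_gap
        self.stop_flag = threading.Event()
        self.n_triggers = 0

    def trigger(self) -> None:
        self.n_triggers += 1  # a real trigger would redispatch the workflow here

    def observe(self) -> None:
        # wait() returns False on timeout (keep going) and True once stop() is called.
        while not self.stop_flag.wait(timeout=self.time_gap):
            self.trigger()

    def stop(self) -> None:
        self.stop_flag.set()


loop = TimeLoopSketch(time_gap=0.01)
worker = threading.Thread(target=loop.observe)
worker.start()
time.sleep(0.1)
loop.stop()  # called from another thread; the loop exits without finishing a full sleep
worker.join(timeout=1)
```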
Examples
Cancellation
Cancel a dispatch using its dispatch_id, or cancel individual tasks within a workflow using their task_ids.
- covalent._results_manager.results_manager.cancel(dispatch_id, task_ids=None, dispatcher_addr=None)
Cancel a running dispatch.
- Parameters
dispatch_id (str) – The dispatch ID of the dispatch to be cancelled
task_ids (Optional[List[int]]) – Optional list of task IDs to cancel within the workflow
dispatcher_addr (Optional[str]) – Dispatcher server address; if None, defaults to the address set in Covalent’s config
- Return type
str
- Returns
Cancellation response
Examples
Results
Collecting and managing results
- class covalent._results_manager.result.Result(lattice, dispatch_id='')
Result class to store and perform operations on the result obtained from a dispatch.
- lattice
“Lattice” object which was dispatched.
- results_dir
Directory where the result will be stored, in the format “<results_dir>/<dispatch_id>/”.
- dispatch_id
Dispatch ID assigned to this dispatch.
- root_dispatch_id
Dispatch ID of the root lattice in a hierarchy of sublattice workflows.
- status
Status of the result. It will be one of the following:
  - Result.NEW_OBJ: The result object is new.
  - Result.COMPLETED: Processing of all the nodes has completed successfully.
  - Result.RUNNING: Some node executions are in progress.
  - Result.FAILED: One or more node executions have failed.
  - Result.CANCELLED: The dispatch was cancelled.
- result
Final result of the dispatch, i.e. whatever the “Lattice” function returned.
- inputs
Inputs sent to the “Lattice” function for dispatching.
- error
Error due to which the execution failed.
- Functions:
save_result: Save the result object to the given results directory, or to self.results_dir by default.
get_all_node_outputs: Return the outputs of all the node executions.
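The status values listed above separate terminal states (COMPLETED, FAILED, CANCELLED) from in-progress ones (NEW_OBJ, RUNNING). The sketch below polls a status source until a terminal state appears. wait_for_terminal is a hypothetical helper, and plain strings stand in for the Status objects that the real status property returns.

```python
import time
from typing import Callable

# Assumption: these names mirror the Result.* status values documented above.
TERMINAL_STATUSES = {"COMPLETED", "FAILED", "CANCELLED"}


def wait_for_terminal(get_status: Callable[[], str], poll_interval: float = 0.0,
                      max_polls: int = 100) -> str:
    """Hypothetical helper: poll get_status() until it reports a terminal status."""
    for _ in range(max_polls):
        status = get_status()
        if status in TERMINAL_STATUSES:
            return status
        time.sleep(poll_interval)
    raise TimeoutError("dispatch did not reach a terminal status")


statuses = iter(["NEW_OBJ", "RUNNING", "COMPLETED"])
final = wait_for_terminal(lambda: next(statuses))
```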
Attributes:
dispatch_id – Dispatch id of current dispatch.
encoded_result – Encoded final result of current dispatch.
end_time – End time of processing the dispatch.
error – Error due to which the dispatch failed.
inputs – Inputs sent to the “Lattice” function for dispatching.
lattice – “Lattice” object which was dispatched.
result – Final result of current dispatch.
results_dir – Results directory used to save this result object.
root_dispatch_id – Dispatch id of the root dispatch.
start_time – Start time of processing the dispatch.
status – Status of current dispatch.
Methods:
get_all_node_outputs() – Return output of every node execution.
get_all_node_results() – Get all the node results.
get_node_result(node_id) – Return the result of a particular node.
post_process() – Post-processing method.
- property dispatch_id: str
Dispatch id of current dispatch.
- Return type
str
- property encoded_result: covalent._workflow.transportable_object.TransportableObject
Encoded final result of current dispatch.
- Return type
TransportableObject
- property end_time: datetime.datetime
End time of processing the dispatch.
- Return type
datetime
- property error: str
Error due to which the dispatch failed.
- Return type
str
- get_all_node_outputs()
Return the output of every node execution.
- Parameters
None
- Returns
node_outputs – A dictionary containing the output of every node execution.
- Return type
dict
- get_all_node_results()
Get all the node results.
- Parameters
None
- Returns
node_results – A list of dictionaries containing the result of every node execution.
- Return type
list
- get_node_result(node_id)
Return the result of a particular node.
- Parameters
node_id (int) – The node ID.
- Returns
node_result – A dictionary containing the following keys:
node_id: The node ID.
node_name: The name of the node.
start_time: The start time of the node execution.
end_time: The end time of the node execution.
status: The status of the node execution.
output: The output of the node, or None if an error occurred.
error: The error raised by the node, or None if no error occurred.
sublattice_result: The result of the sublattice, if any.
stdout: The stdout of the node execution.
stderr: The stderr of the node execution.
- Return type
dict
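Because each node result is a plain dictionary with the keys listed above, post-hoc inspection can be ordinary dictionary processing. The sketch below collects the error of every failed node from a list shaped like the output of get_all_node_results(). The input data is made up for illustration and uses only a subset of the documented keys; failed_nodes is a hypothetical helper that keys on the documented error field rather than on the status value.

```python
from typing import Any, Dict, List

# Illustrative input only: dicts shaped like get_node_result() output, with made-up values.
node_results: List[Dict[str, Any]] = [
    {"node_id": 0, "node_name": "load", "output": 3, "error": None},
    {"node_id": 1, "node_name": "train", "output": None, "error": "ValueError('bad input')"},
]


def failed_nodes(results: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Hypothetical helper: map node_name to error for every node whose error is set."""
    return {r["node_name"]: r["error"] for r in results if r["error"] is not None}


failures = failed_nodes(node_results)
```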
- property inputs: dict
Inputs sent to the “Lattice” function for dispatching.
- Return type
dict
- property lattice: covalent._workflow.lattice.Lattice
“Lattice” object which was dispatched.
- Return type
Lattice
- post_process()
Post-processing method. This method was introduced to enable manual client-side post-processing in case automatic post-processing by the server fails (e.g. due to insufficient Dask worker memory).
- Returns
Post-processed result output
- Return type
Any
- property result: Union[int, float, list, dict]
Final result of current dispatch.
- Return type
Union[int, float, list, dict]
- property results_dir: str
Results directory used to save this result object.
- Return type
str
- property root_dispatch_id: str
Dispatch id of the root dispatch.
- Return type
str
- property start_time: datetime.datetime
Start time of processing the dispatch.
- Return type
datetime
- property status: covalent._shared_files.util_classes.Status
Status of current dispatch.
- Return type
Status