Triggers#
Execute a workflow triggered by a specific type of event
Classes:
|
Base class to be subclassed by any custom defined trigger. |
|
Directory or File based trigger which watches for events in said file/dir and performs a trigger action whenever they happen. |
|
Performs a trigger action every time_gap seconds. |
- class covalent.triggers.BaseTrigger(lattice_dispatch_id=None, dispatcher_addr=None, triggers_server_addr=None)[source]#
Bases:
object
Base class to be subclassed by any custom defined trigger. Implements all the necessary methods used for interacting with dispatches, including getting their statuses and performing a redispatch of them whenever the trigger gets triggered.
- Parameters
lattice_dispatch_id (
Optional
[str
]) – Dispatch ID of the worfklow which has to be redispatched in case this trigger gets triggereddispatcher_addr (
Optional
[str
]) – Address of dispatcher server used to retrieve info about or redispatch any dispatchestriggers_server_addr (
Optional
[str
]) – Address of the Triggers server (if there is any) to register this trigger to, uses the dispatcher’s address by default
- self.lattice_dispatch_id#
Dispatch ID of the worfklow which has to be redispatched in case this trigger gets triggered
- self.dispatcher_addr#
Address of dispatcher server used to retrieve info about or redispatch any dispatches
- self.triggers_server_addr#
Address of the Triggers server (if there is any) to register this trigger to, uses the dispatcher’s address by default
- self.new_dispatch_ids#
List of all the newly created dispatch ids from performing redispatch
- self.observe_blocks#
Boolean to indicate whether the self.observe method is a blocking call
- self.event_loop#
Event loop to be used if directly calling dispatcher’s functions instead of the REST APIs
- self.use_internal_funcs#
Boolean indicating whether to use dispatcher’s functions directly instead of through API calls
- self.stop_flag#
To handle stopping mechanism in a thread safe manner in case self.observe() is a blocking call (e.g. see TimeTrigger)
Methods:
observe
()Start observing for any change which can be used to trigger this trigger.
register
()Register this trigger to the Triggers server and start observing.
stop
()Stop observing for changes.
to_dict
()Return a dictionary representation of this trigger which can later be used to regenerate it.
trigger
()Trigger this trigger and perform a redispatch of the connected dispatch id’s workflow.
- abstract observe()[source]#
Start observing for any change which can be used to trigger this trigger. To be implemented by the subclass.
- register()[source]#
Register this trigger to the Triggers server and start observing.
- Return type
None
- class covalent.triggers.DirTrigger(dir_path, event_names, batch_size=1, lattice_dispatch_id=None, dispatcher_addr=None, triggers_server_addr=None, recursive=False)[source]#
Bases:
covalent.triggers.base.BaseTrigger
Directory or File based trigger which watches for events in said file/dir and performs a trigger action whenever they happen.
- Parameters
dir_path – Path to the file/dir which is to be observed for events
event_names – List of event names on which to perform the trigger action. Possible options can be a subset of: [“created”, “deleted”, “modified”, “moved”, “closed”].
batch_size (
int
) – The number of changes to wait for before performing the trigger action, default is 1.recursive (
bool
) – Whether to recursively watch the directory, default is False.
- self.dir_path#
Path to the file/dir which is to be observed for events
- self.event_names#
List of event names on which to perform the trigger action. Possible options can be a subset of: [“created”, “deleted”, “modified”, “moved”, “closed”]
- self.batch_size#
The number of events to wait for before performing the trigger action, default is 1.
- self.recursive#
Whether to recursively watch the directory, default is False.
- self.n_changes#
Number of events since last trigger action. Whenever self.n_changes == self.batch_size a trigger action happens.
Methods:
Dynamically attaches and overrides the “on_*” methods to the handler depending on which ones are requested by the user.
observe
()Start observing the file/dir for any possible events among the ones mentioned in self.event_names.
stop
()Stop observing the file or directory for changes.
- attach_methods_to_handler()[source]#
Dynamically attaches and overrides the “on_*” methods to the handler depending on which ones are requested by the user.
- Parameters
event_names – List of event names upon which to perform a trigger action
- Return type
None
- class covalent.triggers.TimeTrigger(time_gap, lattice_dispatch_id=None, dispatcher_addr=None, triggers_server_addr=None)[source]#
Bases:
covalent.triggers.base.BaseTrigger
Performs a trigger action every time_gap seconds.
- Parameters
time_gap (
int
) – Amount of seconds to wait before doing a trigger action
- self.time_gap#
Amount of seconds to wait before doing a trigger action
- self.stop_flag#
Thread safe flag used to check whether the stop condition has been met
Methods:
observe
()Keep performing the trigger action every self.time_gap seconds until stop condition has been met.
stop
()Stop the running self.observe() method by setting the self.stop_flag flag.