Triggers

Execute a workflow in response to a specific type of event.

Classes:

BaseTrigger([lattice_dispatch_id, …])

Base class to be subclassed by any custom defined trigger.

DirTrigger(dir_path, event_names[, …])

Directory or File based trigger which watches for events in said file/dir and performs a trigger action whenever they happen.

TimeTrigger(time_gap[, lattice_dispatch_id, …])

Performs a trigger action every time_gap seconds.

class covalent.triggers.BaseTrigger(lattice_dispatch_id=None, dispatcher_addr=None, triggers_server_addr=None)

Bases: object

Base class to be subclassed by any custom defined trigger. Implements the methods needed to interact with dispatches, including getting their statuses and redispatching them whenever the trigger fires.

Parameters
  • lattice_dispatch_id (Optional[str]) – Dispatch ID of the workflow to be redispatched when this trigger fires

  • dispatcher_addr (Optional[str]) – Address of the dispatcher server used to retrieve info about or redispatch any dispatches

  • triggers_server_addr (Optional[str]) – Address of the Triggers server (if any) to register this trigger with; defaults to the dispatcher’s address

self.lattice_dispatch_id

Dispatch ID of the workflow to be redispatched when this trigger fires

self.dispatcher_addr

Address of the dispatcher server used to retrieve info about or redispatch any dispatches

self.triggers_server_addr

Address of the Triggers server (if any) to register this trigger with; defaults to the dispatcher’s address

self.new_dispatch_ids

List of all the dispatch IDs newly created by performing a redispatch

self.observe_blocks

Boolean indicating whether the self.observe method is a blocking call

self.event_loop

Event loop to be used when calling the dispatcher’s functions directly instead of the REST APIs

self.use_internal_funcs

Boolean indicating whether to use the dispatcher’s functions directly instead of through API calls

self.stop_flag

Flag used to stop observation in a thread-safe manner when self.observe() is a blocking call (see TimeTrigger, for example)

Methods:

observe()

Start observing for any change which can be used to trigger this trigger.

register()

Register this trigger to the Triggers server and start observing.

stop()

Stop observing for changes.

to_dict()

Return a dictionary representation of this trigger which can later be used to regenerate it.

trigger()

Trigger this trigger and perform a redispatch of the connected dispatch id’s workflow.

abstract observe()

Start observing for any change which can be used to trigger this trigger. To be implemented by the subclass.

register()

Register this trigger to the Triggers server and start observing.

Return type

None

abstract stop()

Stop observing for changes. To be implemented by the subclass.

to_dict()

Return a dictionary representation of this trigger which can later be used to regenerate it.

Returns

tr_dict – Dictionary representation of this trigger

Return type

dict

trigger()

Fire this trigger and redispatch the workflow of the connected dispatch ID. Should be called within self.observe() whenever a trigger action is desired.

Raises

RuntimeError – In case no dispatch id is connected to this trigger

Return type

None
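The contract described above (a subclass implements observe() and stop(), and calls trigger() from inside observe()) can be sketched without Covalent installed. The classes below are stand-ins written for illustration only: `SketchTrigger` and `ListTrigger` are hypothetical names, and the "redispatch" is replaced by appending a made-up ID to `new_dispatch_ids`. Only the attribute names and the documented RuntimeError come from the reference above.

```python
from abc import ABC, abstractmethod
from typing import List, Optional


class SketchTrigger(ABC):
    """Stand-in for the documented base-class contract (not Covalent's code)."""

    def __init__(self, lattice_dispatch_id: Optional[str] = None):
        self.lattice_dispatch_id = lattice_dispatch_id
        self.new_dispatch_ids: List[str] = []

    def trigger(self) -> None:
        # Documented behaviour: refuse to fire without a connected dispatch id.
        if self.lattice_dispatch_id is None:
            raise RuntimeError("No dispatch id is connected to this trigger")
        # Placeholder for the real redispatch: record a made-up id instead.
        count = len(self.new_dispatch_ids) + 1
        self.new_dispatch_ids.append(f"{self.lattice_dispatch_id}-redispatch-{count}")

    @abstractmethod
    def observe(self) -> None:
        """Watch for changes and call self.trigger() when one occurs."""

    @abstractmethod
    def stop(self) -> None:
        """Stop watching for changes."""


class ListTrigger(SketchTrigger):
    """Hypothetical subclass: fires once per item in a fixed list of events."""

    def __init__(self, events, lattice_dispatch_id: Optional[str] = None):
        super().__init__(lattice_dispatch_id)
        self.events = list(events)
        self.stopped = False

    def observe(self) -> None:
        for _ in self.events:
            if self.stopped:
                break
            self.trigger()

    def stop(self) -> None:
        self.stopped = True


t = ListTrigger(["a", "b"], lattice_dispatch_id="abc123")
t.observe()
print(t.new_dispatch_ids)
```

A real subclass would derive from covalent.triggers.BaseTrigger and leave trigger() untouched, since the base class already implements the redispatch.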

class covalent.triggers.DirTrigger(dir_path, event_names, batch_size=1, lattice_dispatch_id=None, dispatcher_addr=None, triggers_server_addr=None, recursive=False)

Bases: covalent.triggers.base.BaseTrigger

Directory or File based trigger which watches for events in said file/dir and performs a trigger action whenever they happen.

Parameters
  • dir_path – Path to the file/dir which is to be observed for events

  • event_names – List of event names on which to perform the trigger action. Must be a subset of: ["created", "deleted", "modified", "moved", "closed"].

  • batch_size (int) – The number of events to wait for before performing the trigger action, default is 1.

  • recursive (bool) – Whether to recursively watch the directory, default is False.

self.dir_path

Path to the file/dir which is to be observed for events

self.event_names

List of event names on which to perform the trigger action. Must be a subset of: ["created", "deleted", "modified", "moved", "closed"]

self.batch_size

The number of events to wait for before performing the trigger action, default is 1.

self.recursive

Whether to recursively watch the directory, default is False.

self.n_changes

Number of events since last trigger action. Whenever self.n_changes == self.batch_size a trigger action happens.
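The interplay of batch_size and n_changes can be illustrated with a small counter. This is a sketch, not DirTrigger's implementation: `BatchCounter`, `on_event` and `fired` are hypothetical names, and resetting n_changes to zero after each trigger action is an assumption inferred from the phrase "since last trigger action".

```python
class BatchCounter:
    """Illustrative model of batching: fire once every `batch_size` events."""

    def __init__(self, batch_size: int = 1):
        self.batch_size = batch_size
        self.n_changes = 0  # events seen since the last trigger action
        self.fired = 0      # number of trigger actions performed so far

    def on_event(self) -> None:
        self.n_changes += 1
        if self.n_changes == self.batch_size:
            self.fired += 1      # stands in for the trigger action
            self.n_changes = 0   # assumed reset after firing


counter = BatchCounter(batch_size=3)
for _ in range(7):
    counter.on_event()
print(counter.fired, counter.n_changes)
```

With the default batch_size of 1, every event leads to a trigger action; larger values trade responsiveness for fewer redispatches.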

Methods:

attach_methods_to_handler()

Dynamically attaches and overrides the “on_*” methods to the handler depending on which ones are requested by the user.

observe()

Start observing the file/dir for any possible events among the ones mentioned in self.event_names.

stop()

Stop observing the file or directory for changes.

attach_methods_to_handler()

Dynamically attaches and overrides the “on_*” methods to the handler depending on which ones are requested by the user.

Parameters

event_names – List of event names upon which to perform a trigger action

Return type

None
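One way to picture the dynamic attachment of "on_*" methods is with setattr on a handler instance. The sketch below is an assumption about the general technique, not the library's code: `Handler`, `attach_methods` and `action` are hypothetical names, and the handler's default methods are plain no-ops.

```python
class Handler:
    """Hypothetical event handler whose callbacks do nothing by default."""

    def on_created(self, event):
        pass

    def on_deleted(self, event):
        pass

    def on_modified(self, event):
        pass


def attach_methods(handler, event_names, action):
    """Override only the requested on_<name> callbacks with `action`."""
    for name in event_names:
        # Bind `name` as a default argument so each lambda keeps its own value.
        setattr(handler, f"on_{name}", lambda event, _name=name: action(_name, event))


seen = []
handler = Handler()
attach_methods(handler, ["created", "modified"], lambda name, event: seen.append((name, event)))

handler.on_created("data.csv")   # overridden: recorded
handler.on_deleted("data.csv")   # not requested: still a no-op
handler.on_modified("data.csv")  # overridden: recorded
print(seen)
```

Events that were not requested keep the handler's default behaviour, so only the names passed in lead to a trigger action.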

observe()

Start observing the file/dir for any possible events among the ones mentioned in self.event_names. Currently only supports running within the Covalent/Triggers server.

Return type

None

stop()

Stop observing the file or directory for changes.

Return type

None

class covalent.triggers.TimeTrigger(time_gap, lattice_dispatch_id=None, dispatcher_addr=None, triggers_server_addr=None)

Bases: covalent.triggers.base.BaseTrigger

Performs a trigger action every time_gap seconds.

Parameters

time_gap (int) – Number of seconds to wait before performing a trigger action

self.time_gap

Number of seconds to wait before performing a trigger action

self.stop_flag

Thread-safe flag used to check whether the stop condition has been met

Methods:

observe()

Keep performing the trigger action every self.time_gap seconds until the stop condition is met.

stop()

Stop the running self.observe() method by setting the self.stop_flag flag.

observe()

Keep performing the trigger action every self.time_gap seconds until the stop condition is met.

Return type

None

stop()

Stop the running self.observe() method by setting the self.stop_flag flag.

Return type

None
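The blocking observe() loop and its stop flag can be sketched with the standard library alone. This is a minimal model under stated assumptions, not TimeTrigger's source: the flag is modelled as a threading.Event, the function `observe` and the callback `action` are hypothetical, and Event.wait is used in place of a plain sleep so that stopping takes effect without waiting out the full gap.

```python
import threading
import time


def observe(time_gap: float, stop_flag: threading.Event, action) -> None:
    """Call `action` every `time_gap` seconds until `stop_flag` is set."""
    # Event.wait returns False when the timeout elapses, True once the flag is set.
    while not stop_flag.wait(timeout=time_gap):
        action()


stop_flag = threading.Event()
ticks = []

# Run the blocking loop in a worker thread so the caller stays responsive.
worker = threading.Thread(
    target=observe,
    args=(0.05, stop_flag, lambda: ticks.append(time.monotonic())),
)
worker.start()

time.sleep(0.3)        # let a few trigger actions happen
stop_flag.set()        # equivalent in spirit to calling stop()
worker.join(timeout=1)
print(len(ticks))
```

Because the loop blocks, it runs in its own thread here; setting the flag from another thread is what makes the stop mechanism thread safe.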