Adding a Time Trigger to a Lattice#
This example illustrates how to use a covalent.trigger.TimeTrigger to trigger workflow dispatches automatically at a specified interval.
Import Covalent and the trigger.
import covalent as ct from covalent.triggers import TimeTrigger
TimeTriggerobject that performs a trigger action every 5 seconds.
time_trigger = TimeTrigger(time_gap=5)
Create a workflow:
@ct.lattice @ct.electron def my_workflow(): return 42
my_workflow, disabling its first execution using the
dispatch_id = ct.dispatch(my_workflow)() print(dispatch_id)
Attach the trigger to the
dispatch_idand register it with the trigger server.
time_trigger.lattice_dispatch_id = dispatch_id time_trigger.register()
Monitor the Covalent UI. Watch the Dashboard for new dispatches of
In the Covalent UI, observe that a new
my_workflowis dispatched every five seconds.
To disable triggers on the dispatch, use the
[2023-09-25 08:51:25,893] [DEBUG] local.py: Line 334 in stop_triggers: Triggers for following dispatch_ids have stopped observing: [2023-09-25 08:51:25,894] [DEBUG] local.py: Line 336 in stop_triggers: 44f56fcf-96dc-4089-84d4-069fd13e3e58
Note that the
stop_triggers function disables all triggers attached to the specified dispatch.