Adding a Time Trigger to a Lattice

This example shows how to use a covalent.triggers.TimeTrigger to dispatch a workflow automatically at a fixed interval.
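To make the idea of an interval trigger concrete, here is a minimal plain-Python sketch. It is not Covalent's implementation and does not use Covalent at all: the class name IntervalTrigger and its start and stop methods are hypothetical, chosen only to illustrate "run an action every time_gap seconds until told to stop".

```python
import threading


class IntervalTrigger:
    """Hypothetical stand-in: calls `action` every `time_gap` seconds until stopped."""

    def __init__(self, time_gap, action):
        self.time_gap = time_gap
        self.action = action
        self._stop = threading.Event()
        self._thread = None

    def _observe(self):
        # Event.wait returns False on timeout and True once stop() sets the event,
        # so the loop fires the action once per interval and exits promptly on stop.
        while not self._stop.wait(self.time_gap):
            self.action()

    def start(self):
        # Run the observation loop in a background thread so the caller is not blocked.
        self._thread = threading.Thread(target=self._observe, daemon=True)
        self._thread.start()

    def stop(self):
        # Signal the loop to end and wait for the background thread to finish.
        self._stop.set()
        if self._thread is not None:
            self._thread.join()
```

In this sketch, IntervalTrigger(5, some_function).start() would call some_function roughly every five seconds until stop() is called. In Covalent the action is a new dispatch of the attached workflow, and the observation loop is handled for you.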


Import Covalent and the trigger.

import covalent as ct
from covalent.triggers import TimeTrigger


  1. Create a TimeTrigger object that performs a trigger action every 5 seconds.

time_trigger = TimeTrigger(time_gap=5)

  2. Create a workflow:

@ct.lattice
@ct.electron
def my_workflow():
    return 42

  3. Dispatch my_workflow, disabling its first execution with the disable_run parameter of ct.dispatch.

dispatch_id = ct.dispatch(my_workflow, disable_run=True)()

  4. Attach the trigger to the dispatch_id and register it with the trigger server.

time_trigger.lattice_dispatch_id = dispatch_id
time_trigger.register()

  5. Monitor the Covalent UI. Watch the Dashboard for new dispatches of my_workflow.

  6. In the Covalent UI, observe that a new my_workflow is dispatched every five seconds.

  7. To disable triggers on the dispatch, use the ct.stop_triggers function.

ct.stop_triggers(dispatch_id)

[2023-09-25 08:51:25,893] [DEBUG] Line 334 in stop_triggers: Triggers for following dispatch_ids have stopped observing:
[2023-09-25 08:51:25,894] [DEBUG] Line 336 in stop_triggers: 44f56fcf-96dc-4089-84d4-069fd13e3e58

Note that the stop_triggers function disables all triggers attached to the specified dispatch.
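The steps above can be collected into a single script, sketched below under a few assumptions. The function name run_time_trigger_demo and its observe_seconds parameter are hypothetical and exist only for this illustration. The sketch also assumes that a Covalent server is already running (for example, started with covalent start) and that register() is the call that hands the trigger to the trigger server. The Covalent imports and the workflow definition are placed inside the function so the module can be loaded without contacting the server.

```python
import time


def run_time_trigger_demo(time_gap=5, observe_seconds=20):
    """Hypothetical helper: attach a TimeTrigger, let it fire for a while, then stop it."""
    # Imported here so that merely defining this function does not require Covalent.
    import covalent as ct
    from covalent.triggers import TimeTrigger

    @ct.lattice
    @ct.electron
    def my_workflow():
        return 42

    # Trigger that fires every `time_gap` seconds.
    time_trigger = TimeTrigger(time_gap=time_gap)

    # Register the workflow with the server without running it immediately.
    dispatch_id = ct.dispatch(my_workflow, disable_run=True)()

    # Attach the trigger to the dispatch and register it with the trigger server.
    time_trigger.lattice_dispatch_id = dispatch_id
    time_trigger.register()

    try:
        # Leave the trigger active; new dispatches should appear in the UI meanwhile.
        time.sleep(observe_seconds)
    finally:
        # Always stop the triggers, even if the wait is interrupted.
        ct.stop_triggers(dispatch_id)

    return dispatch_id


# Example call (requires a running Covalent server):
# run_time_trigger_demo()
```

Wrapping the wait in try/finally is a design choice for the sketch: it keeps a forgotten or interrupted script from leaving a trigger dispatching workflows indefinitely.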

See Also

Adding a Directory Trigger to a Lattice

Adding a SQLite Trigger to a Lattice

Adding a Database Trigger to a Lattice