How to add a time trigger to a lattice#

In this guide we’ll illustrate how to use a TimeTrigger to trigger workflow runs automatically every x seconds.

Let’s first import the required parts:

[1]:
import covalent as ct
from covalent.triggers import TimeTrigger

Now, let’s create our TimeTrigger object which performs a trigger action every 5 seconds.

[2]:
time_trigger = TimeTrigger(time_gap=5)

Let’s create a simple workflow now:

[3]:
@ct.lattice
@ct.electron
def my_workflow():
    return 42

Once we’ve made sure that the covalent server is running, we can perform the dispatch for my_workflow as such, but this time we will be disabling the execution of this lattice for the first time using disable_run parameter in ct.dispatch:

[4]:
dispatch_id = ct.dispatch(my_workflow, disable_run=True)()
print(dispatch_id)
372930b7-f81a-4e91-9302-236de8a0a9c0

Now let’s attach that trigger to this dispatch_id and register it with the triggers server.

[5]:
time_trigger.lattice_dispatch_id = dispatch_id
time_trigger.register()

And that’s it, as you can see this was another way of attaching triggers to a workflow, which is slightly different than how we did in the DirTrigger How To Guide.

Also, now if you check the UI you’ll see that a new my_workflow gets dispatched every 5 seconds. If we want to stop that after a while, we can use the ct.stop_triggers function:

[6]:
ct.stop_triggers(dispatch_id)
Triggers for following dispatch_ids have stopped observing:
372930b7-f81a-4e91-9302-236de8a0a9c0

The above will prevent any new dispatches from happening due to the trigger action on my_workflow lattice.