How to add a time trigger to a lattice#

In this guide we’ll illustrate how to use a TimeTrigger to trigger workflow runs automatically every x seconds.

Let’s first import the required parts:

import covalent as ct
from covalent.triggers import TimeTrigger

Now, let’s create our TimeTrigger object which performs a trigger action every 5 seconds.

time_trigger = TimeTrigger(time_gap=5)

Let’s create a simple workflow now:

def my_workflow():
    return 42

Once we’ve made sure that the covalent server is running, we can perform the dispatch for my_workflow as such, but this time we will be disabling the execution of this lattice for the first time using disable_run parameter in ct.dispatch:

dispatch_id = ct.dispatch(my_workflow, disable_run=True)()

Now let’s attach that trigger to this dispatch_id and register it with the triggers server.

time_trigger.lattice_dispatch_id = dispatch_id

And that’s it, as you can see this was another way of attaching triggers to a workflow, which is slightly different than how we did in the DirTrigger How To Guide.

Also, now if you check the UI you’ll see that a new my_workflow gets dispatched every 5 seconds. If we want to stop that after a while, we can use the ct.stop_triggers function:

Triggers for following dispatch_ids have stopped observing:

The above will prevent any new dispatches from happening due to the trigger action on my_workflow lattice.