Automate Repetitive Tasks with Triggers
Triggers are a powerful feature in Covalent that let you automate repetitive tasks and streamline your workflow. With a trigger, you define a set of steps that runs automatically every time a specific event occurs.
To use Triggers, you simply need to attach a Trigger object to a lattice. Then, every time the event described in the trigger occurs, the connected lattice will perform a trigger action and dispatch the connected workflow. This makes it easy to automate processes, reducing the risk of human error and ensuring that your pipeline runs smoothly and efficiently.
For example, if you want to plot a graph of a CSV file every time it gets modified, you can use a trigger to automate this process. The trigger watches the CSV file for changes, and every time the file is modified, it runs the workflow that plots a graph of the data.
Triggers are especially useful if you’re using Covalent as part of a larger pipeline, rather than as a user-facing tool, since recurring steps then run without manual intervention.
Covalent provides several options for starting the server in relation to triggers. By default, the Covalent server starts with the triggers server endpoints included.
It is also possible to start the Covalent server without the triggers endpoints and manage the observe() method of the triggers manually, or to start the standalone triggers server without Covalent.
The following code block showcases the three different start options:
    # Default: the triggers server endpoints are started as part of the Covalent server
    covalent start

    # Start the Covalent server without the trigger endpoints. To use triggers, you will
    # either have to start the triggers server independently or manage the observe()
    # method of the triggers manually
    covalent start --no-triggers

    # Start the standalone triggers server without Covalent. This is useful if your
    # Covalent server is running on a different machine than the triggers server
    covalent start --triggers-only
For the purpose of this example, let’s assume you started Covalent the default way.
You can attach a Trigger object to a lattice quite simply as shown below:
    ...
    tr_object = TimeTrigger(5)

    @ct.lattice(triggers=tr_object)
    def my_workflow():
        ...
Under the hood, once this is done and you dispatch the lattice using ct.dispatch, the following events occur:
The first run of the lattice is disabled, and Covalent only saves the lattice and generates a dispatch_id for later reference.
The Trigger object is registered on the triggers server, which is the same as the Covalent server by default.
Upon registration, the observe() method of the trigger is called, which starts observing for the desired condition in a non-blocking manner. In the example above, the TimeTrigger with a time gap of 5 seconds calls the trigger() method every 5 seconds.
At this point, ct.dispatch returns with the dispatch_id generated earlier.
The trigger() method, whenever it is called, performs an automatic dispatch of the connected lattice using the dispatch_id obtained earlier, and stores the newly obtained dispatch_ids to track the connections between the “parent” dispatch and the subsequent “child” dispatches.
Once a trigger has started, you can stop the automatic dispatching by calling ct.stop_triggers(dispatch_id) with the parent dispatch ID.
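The lifecycle described above can be pictured with a toy model. The sketch below uses only the Python standard library and is not Covalent's implementation: `ToyTimeTrigger`, `toy_dispatch`, and the child ID format are hypothetical names invented for illustration. It mirrors the same sequence: the first run is skipped, observation starts in the background, the parent ID is returned immediately, each `trigger()` call records a new child ID, and stopping ends the automatic dispatching.

```python
import itertools
import threading
import time


class ToyTimeTrigger:
    """Toy stand-in for a time-based trigger (hypothetical, not Covalent code)."""

    def __init__(self, time_gap, parent_dispatch_id):
        self.time_gap = time_gap
        self.parent_dispatch_id = parent_dispatch_id
        self.child_dispatch_ids = []  # children produced by this parent
        self._counter = itertools.count(1)
        self._stop_event = threading.Event()
        self._thread = None

    def trigger(self):
        # Stand-in for re-dispatching the saved lattice: record a new child ID.
        child_id = f"{self.parent_dispatch_id}-child-{next(self._counter)}"
        self.child_dispatch_ids.append(child_id)
        return child_id

    def _loop(self):
        # Event.wait returns False on timeout (time to fire) and True after stop().
        while not self._stop_event.wait(self.time_gap):
            self.trigger()

    def observe(self):
        # Non-blocking: the waiting happens in a background thread.
        self._thread = threading.Thread(target=self._loop, daemon=True)
        self._thread.start()

    def stop(self):
        self._stop_event.set()
        if self._thread is not None:
            self._thread.join()


def toy_dispatch(trigger):
    # The first run is skipped: only start observing and hand back the parent ID.
    trigger.observe()
    return trigger.parent_dispatch_id


toy_trigger = ToyTimeTrigger(time_gap=0.05, parent_dispatch_id="parent-0")
parent_id = toy_dispatch(toy_trigger)  # returns right away
time.sleep(0.3)                        # let a few trigger events fire
toy_trigger.stop()                     # no further children after this point
```

In this model, `toy_trigger.stop()` plays the role that `ct.stop_triggers(dispatch_id)` plays for a real parent dispatch.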
Attaching a Trigger to a Dispatched Workflow
Another useful case: suppose you want to attach a trigger to a workflow that has already been dispatched, and you only have access to its dispatch_id. In that case, you can do the following:
    tr_object = TimeTrigger(10)
    tr_object.lattice_dispatch_id = dispatch_id
    tr_object.register()
This way of attaching a trigger is equivalent to the one mentioned before, but gives more degrees of freedom. For example, you can register the same trigger to multiple workflows by repeating the last two lines for each of them. This method also eliminates the need to design workflows with the trigger in mind, disentangling the trigger creation code from the actual workflow code. In fact, since a trigger can be set after the workflow is created, this method can be used to attach a trigger from an entirely different Python process than the one where the workflow was created.
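Repeating the last two lines for several workflows can be wrapped in a small loop. The following is a minimal sketch, assuming only that the trigger exposes a `lattice_dispatch_id` attribute and a `register()` method as in the snippet above. The helper `register_for_all` and the `RecordingTrigger` stand-in are hypothetical names used so the example runs without a Covalent server.

```python
def register_for_all(trigger, dispatch_ids):
    """Register one trigger object against several dispatched workflows."""
    for dispatch_id in dispatch_ids:
        # Same two steps as in the snippet above, repeated per workflow.
        trigger.lattice_dispatch_id = dispatch_id
        trigger.register()


class RecordingTrigger:
    """Hypothetical stand-in that only records which IDs were registered."""

    def __init__(self):
        self.lattice_dispatch_id = None
        self.registered = []

    def register(self):
        self.registered.append(self.lattice_dispatch_id)


demo_trigger = RecordingTrigger()
register_for_all(demo_trigger, ["id-a", "id-b"])
print(demo_trigger.registered)  # ['id-a', 'id-b']
```

With Covalent itself, the first argument would presumably be a real trigger such as the TimeTrigger(10) object created earlier, and the list would hold actual dispatch IDs.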
If you already know that you are going to attach a trigger to a workflow after dispatch, and you don’t want it to run until a trigger event takes place, you can dispatch it with ct.dispatch(my_workflow, disable_run=True)(). The workflow won’t start running, but a dispatch_id is still generated, which you can use later.
Attaching Triggers to Workflows on Remote Servers
Another way to attach triggers to workflows that have already been dispatched is by utilizing the dispatch_id and the address of both the Covalent server and the triggers server. This is useful in scenarios where the trigger should be managed from a separate machine.
For example, let’s consider a scenario where there are 3 machines: 2 remote servers and 1 client machine.
ServerA is the one where Covalent is running without triggers support,
ServerB where only the triggers server is running, and
Client is the one where you are working from.
Let’s say our workflow my_workflow has been dispatched to ServerA without any triggers. To attach triggers to that workflow and register it with the triggers server, you can follow the steps given below:
When using triggers remotely, make sure to set trigger.use_internal_funcs = False. This ensures that the trigger interacts with the Covalent server through the API endpoints instead of directly accessing the required internal functions.
    trigger = TimeTrigger(30)

    # Interacting with dispatcher server through API endpoints
    trigger.use_internal_funcs = False

    # Attaching dispatch id of `my_workflow` to the trigger
    trigger.lattice_dispatch_id = dispatch_id

    # Specifying the address of the dispatcher server
    trigger.dispatcher_addr = "<ServerA_addr>"

    # Specifying the address of the triggers server
    trigger.triggers_server_addr = "<ServerB_addr>"

    # Registering it to the triggers server
    trigger.register()
This is sufficient for your workflow to be dispatched every 30 seconds by this trigger.
Adding a Trigger without Registering it to the Triggers Server
You can also run the observation component of a trigger as part of your own server, without registering it with the triggers server. For example, if you have a long-running process on a server, you can call the trigger.observe() function to start observing, as follows:
    trigger = TimeTrigger(2)
    trigger.use_internal_funcs = False
    trigger.lattice_dispatch_id = dispatch_id
    trigger.dispatcher_addr = "<ServerA_addr>"

    # And now start observing
    trigger.observe()
Keep in mind that it’s important to handle the blocking/non-blocking nature of the trigger.observe() function correctly. If it’s a blocking call, it’s recommended to offload trigger.observe() to a separate thread so it doesn’t block the execution of other components of your server. You can check if trigger.observe() is blocking by accessing the trigger.observe_blocks attribute of any trigger.
This becomes extremely useful when writing custom triggers, for example to trigger workflows from email or Slack messages. The ability to run trigger.observe() as part of your own server or process lets you integrate triggers into your workflow in the way that best suits your use case.
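The advice about blocking calls can be sketched as a small helper that checks observe_blocks and offloads observe() to a thread only when needed. This is an illustrative sketch, not part of Covalent: `start_observing` and `BlockingDemoTrigger` are hypothetical names, and the stand-in class assumes only the observe() method and observe_blocks attribute described above, plus a stop() method added for the demo.

```python
import threading


def start_observing(trigger):
    """Start trigger.observe() without blocking the caller.

    Returns the background thread if one was needed, otherwise None.
    """
    if trigger.observe_blocks:
        # Blocking observe(): run it in a daemon thread so the caller continues.
        thread = threading.Thread(target=trigger.observe, daemon=True)
        thread.start()
        return thread
    # Non-blocking observe(): safe to call directly.
    trigger.observe()
    return None


class BlockingDemoTrigger:
    """Hypothetical trigger whose observe() blocks until stop() is called."""

    observe_blocks = True

    def __init__(self):
        self._stop_event = threading.Event()
        self.started = threading.Event()

    def observe(self):
        self.started.set()       # signal that observation has begun
        self._stop_event.wait()  # block until stop() is called

    def stop(self):
        self._stop_event.set()


demo = BlockingDemoTrigger()
worker = start_observing(demo)  # returns immediately despite the blocking observe()
demo.started.wait(timeout=1)
demo.stop()
worker.join(timeout=1)
```

A daemon thread is used here so that a forgotten trigger does not keep the host process alive; a long-running server may prefer a managed thread with an explicit shutdown path.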
Types of Triggers in Covalent
Covalent offers an array of triggers designed to cater to diverse use cases, simplifying the automation of tasks based on a range of conditions. It’s important to note that this list represents the currently available triggers, with more to be added in the future. If you find these triggers valuable and have suggestions for new ones, we encourage you to contribute to Covalent’s GitHub repository.
Here are the currently available triggers in Covalent:
DirTrigger: This trigger observes a specified directory or file for events such as creation, deletion, modification, or movement. It performs the trigger action when these events occur. For example:
    from covalent.triggers import DirTrigger
    import covalent as ct

    dir_trigger = DirTrigger(dir_path='path/to/your/directory', event_names=['modified'])

    @ct.lattice(triggers=dir_trigger)
    def my_workflow():
        ...
TimeTrigger: This trigger performs the trigger action after a specified time interval. It is useful for recurring tasks or periodic data processing. For example:
    from covalent.triggers import TimeTrigger
    import covalent as ct

    time_trigger = TimeTrigger(time_gap=5)  # Trigger action every 5 seconds

    @ct.lattice(triggers=time_trigger)
    def my_workflow():
        ...
SQLiteTrigger: This trigger monitors an SQLite database for changes and performs the trigger action when changes occur. It is helpful for automating tasks in response to database updates. For example:
    from covalent.triggers import SQLiteTrigger
    import covalent as ct

    sqlite_trigger = SQLiteTrigger(db_path='path/to/your/database.sqlite', table_name='your_table')

    @ct.lattice(triggers=sqlite_trigger)
    def my_workflow():
        ...
These triggers can be easily integrated into your Covalent workflows to automate various tasks based on the desired conditions.
Trigger How-to Guides
For further examples on how to use triggers, check out the Trigger how-to guides: