Adding a Database Trigger to a Lattice#

This example illustrates how to use covalent.trigger.DatabaseTrigger to trigger the workflow dispatches automatically after the successful execution of table reads with the conditions for N number of times.

Prerequisites#

  1. Install the recommended SQL drivers that support SQLAlchemy.

  2. Make sure a user with the name postgres exists in your PostgreSQL database (you can run createuser postgres command for that).

  3. Create a database called aqdb (you can run createdb aqdb command for this). (both the commands mentioned above come installed as part of PostgreSQL installation as tested in macOS postgres installation using homebrew)

  4. Import the Covalent and the DatabaseTrigger.

[1]:
import covalent as ct
from covalent.triggers import DatabaseTrigger

Procedure#

  1. Create a new table test_db_trigger.

[2]:

db_path = "postgresql+pg8000://postgres:sam@localhost:5432/aqdb" table_name = 'test_db_trigger' #create table from sqlalchemy import Table, Column, MetaData, DateTime, create_engine meta = MetaData() engine = create_engine(db_path, echo=True) test_db_trigger = Table( table_name, meta, Column('trigger_at', DateTime, primary_key = True), ) meta.create_all(engine)
2024-01-07 17:34:25,040 INFO sqlalchemy.engine.Engine select pg_catalog.version()
2024-01-07 17:34:25,041 INFO sqlalchemy.engine.Engine [raw sql] ()
2024-01-07 17:34:25,042 INFO sqlalchemy.engine.Engine select current_schema()
2024-01-07 17:34:25,042 INFO sqlalchemy.engine.Engine [raw sql] ()
2024-01-07 17:34:25,043 INFO sqlalchemy.engine.Engine show standard_conforming_strings
2024-01-07 17:34:25,043 INFO sqlalchemy.engine.Engine [raw sql] ()
2024-01-07 17:34:25,044 INFO sqlalchemy.engine.Engine BEGIN (implicit)
2024-01-07 17:34:25,044 INFO sqlalchemy.engine.Engine select relname from pg_class c join pg_namespace n on n.oid=c.relnamespace where pg_catalog.pg_table_is_visible(c.oid) and relname=%s
2024-01-07 17:34:25,045 INFO sqlalchemy.engine.Engine [generated in 0.00025s] ('test_db_trigger',)
2024-01-07 17:34:25,046 INFO sqlalchemy.engine.Engine
CREATE TABLE test_db_trigger (
        trigger_at TIMESTAMP WITHOUT TIME ZONE NOT NULL,
        PRIMARY KEY (trigger_at)
)


2024-01-07 17:34:25,046 INFO sqlalchemy.engine.Engine [no key 0.00021s] ()
2024-01-07 17:34:25,049 INFO sqlalchemy.engine.Engine COMMIT
  1. Load sample data into the newly created table

[3]:
# load sample data.
from sqlalchemy import insert
from datetime import datetime
import time

with engine.connect() as conn:
    values = []
    for _ in range(10):
        values.append({"trigger_at": datetime.now()})
        time.sleep(0.1)
    result = conn.execute(insert(test_db_trigger),[*values])
2024-01-07 17:34:28,071 INFO sqlalchemy.engine.Engine INSERT INTO test_db_trigger (trigger_at) VALUES (%s)
2024-01-07 17:34:28,071 INFO sqlalchemy.engine.Engine [generated in 0.00094s] ((datetime.datetime(2024, 1, 7, 12, 34, 27, 33553),), (datetime.datetime(2024, 1, 7, 12, 34, 27, 138484),), (datetime.datetime(2024, 1, 7, 12, 34, 27, 243426),), (datetime.datetime(2024, 1, 7, 12, 34, 27, 348190),), (datetime.datetime(2024, 1, 7, 12, 34, 27, 453000),), (datetime.datetime(2024, 1, 7, 12, 34, 27, 555778),), (datetime.datetime(2024, 1, 7, 12, 34, 27, 655854),), (datetime.datetime(2024, 1, 7, 12, 34, 27, 760891),), (datetime.datetime(2024, 1, 7, 12, 34, 27, 862814),), (datetime.datetime(2024, 1, 7, 12, 34, 27, 965930),))
2024-01-07 17:34:28,079 INFO sqlalchemy.engine.Engine COMMIT
/var/folders/1z/64_91wwj46ng1xjffddgz_n40000gn/T/ipykernel_2625/3058673604.py:11: RemovedIn20Warning: Deprecated API features detected! These feature(s) are not compatible with SQLAlchemy 2.0. To prevent incompatible upgrades prior to updating applications, ensure requirements files are pinned to "sqlalchemy<2.0". Set environment variable SQLALCHEMY_WARN_20=1 to show all deprecation warnings.  Set environment variable SQLALCHEMY_SILENCE_UBER_WARNING=1 to silence this message. (Background on SQLAlchemy 2.0 at: https://sqlalche.me/e/b8d9)
  result = conn.execute(insert(test_db_trigger),[*values])
  1. Create a Database Trigger object that performs a trigger. We can parse parameters such as db_path, table_name, trigger_after_n, and poll_interval. For this illustration, we will use the PostgreSQL database.

[4]:
database_trigger = DatabaseTrigger(db_path='postgresql+pg8000://postgres:sam@localhost:5432/aqdb',
                                   table_name=table_name,
                                    trigger_after_n=2,
                                  poll_interval=3)
  1. Create a workflow:

[5]:
@ct.lattice
@ct.electron
def my_workflow():
    return 42
  1. Dispatch my_workflow, disabling its first execution using the disable_run parameter in ct.dispatch.

[6]:
dispatch_id = ct.dispatch(my_workflow)()
print(dispatch_id)
a7a74b5e-e865-430e-9a6c-258e8c51d0ed
  1. Attach the trigger to the dispatch_id and register it with the trigger server with the where clause to filter dispatches with lattice name my_workflow.

[7]:
database_trigger.lattice_dispatch_id = dispatch_id
triggered_at = values[-1]["trigger_at"]
database_trigger.where_clauses = [f"trigger_at = '{str(triggered_at)}'"]
database_trigger.register()
  1. Monitor the Covalent UI. Watch the Dashboard for new dispatches of my_workflow.

  2. In the Covalent UI, observe that a new my_workflow is dispatched after reading the table two times and with a polling interval of 3 seconds.

  3. To disable triggers on the dispatch, use the ct.stop_triggers function and drop the test_db_trigger table.

[8]:
ct.stop_triggers(dispatch_id)
meta.drop_all(engine, tables=[test_db_trigger], checkfirst=True)
2024-01-07 17:35:51,633 INFO sqlalchemy.engine.Engine BEGIN (implicit)
2024-01-07 17:35:51,633 INFO sqlalchemy.engine.Engine select relname from pg_class c join pg_namespace n on n.oid=c.relnamespace where pg_catalog.pg_table_is_visible(c.oid) and relname=%s
2024-01-07 17:35:51,634 INFO sqlalchemy.engine.Engine [cached since 86.59s ago] ('test_db_trigger',)
2024-01-07 17:35:51,635 INFO sqlalchemy.engine.Engine
DROP TABLE test_db_trigger
2024-01-07 17:35:51,635 INFO sqlalchemy.engine.Engine [no key 0.00026s] ()
2024-01-07 17:35:51,698 INFO sqlalchemy.engine.Engine COMMIT

Note that the stop_triggers function disables all triggers attached to the specified dispatch.

See Also#

Adding a Directory Trigger to a Lattice

Adding a TimeTrigger to a Lattice

Adding a SQLite Trigger to a Lattice