Adding a Database Trigger to a Lattice
This example illustrates how to use covalent.triggers.DatabaseTrigger to trigger workflow dispatches automatically after a database table has been read successfully, with the given conditions, N times.
Prerequisites
Install the recommended SQL drivers that support SQLAlchemy (this example connects through the pg8000 driver).
Make sure a user named postgres exists in your PostgreSQL database (you can run the createuser postgres command for that).
Create a database called aqdb (you can run the createdb aqdb command for this). Both commands are installed as part of PostgreSQL; this was tested on macOS with a Homebrew installation of postgres.
Import Covalent and the DatabaseTrigger.
[1]:
import covalent as ct
from covalent.triggers import DatabaseTrigger
Procedure
Create a new table named test_db_trigger.
[2]:
from sqlalchemy import Table, Column, MetaData, DateTime, create_engine

db_path = "postgresql+pg8000://postgres:sam@localhost:5432/aqdb"
table_name = 'test_db_trigger'

# Create the table with a single timestamp column as its primary key.
meta = MetaData()
engine = create_engine(db_path, echo=True)
test_db_trigger = Table(
    table_name, meta,
    Column('trigger_at', DateTime, primary_key=True),
)
meta.create_all(engine)
2024-01-07 17:34:25,040 INFO sqlalchemy.engine.Engine select pg_catalog.version()
2024-01-07 17:34:25,041 INFO sqlalchemy.engine.Engine [raw sql] ()
2024-01-07 17:34:25,042 INFO sqlalchemy.engine.Engine select current_schema()
2024-01-07 17:34:25,042 INFO sqlalchemy.engine.Engine [raw sql] ()
2024-01-07 17:34:25,043 INFO sqlalchemy.engine.Engine show standard_conforming_strings
2024-01-07 17:34:25,043 INFO sqlalchemy.engine.Engine [raw sql] ()
2024-01-07 17:34:25,044 INFO sqlalchemy.engine.Engine BEGIN (implicit)
2024-01-07 17:34:25,044 INFO sqlalchemy.engine.Engine select relname from pg_class c join pg_namespace n on n.oid=c.relnamespace where pg_catalog.pg_table_is_visible(c.oid) and relname=%s
2024-01-07 17:34:25,045 INFO sqlalchemy.engine.Engine [generated in 0.00025s] ('test_db_trigger',)
2024-01-07 17:34:25,046 INFO sqlalchemy.engine.Engine
CREATE TABLE test_db_trigger (
trigger_at TIMESTAMP WITHOUT TIME ZONE NOT NULL,
PRIMARY KEY (trigger_at)
)
2024-01-07 17:34:25,046 INFO sqlalchemy.engine.Engine [no key 0.00021s] ()
2024-01-07 17:34:25,049 INFO sqlalchemy.engine.Engine COMMIT
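The db_path string in the cell above follows SQLAlchemy's URL layout, dialect+driver://user:password@host:port/database. As a rough illustration, the standard library can split it into those parts. The values are the ones used in this guide, so the password sam is only a placeholder for your own credentials.

```python
from urllib.parse import urlsplit

db_path = "postgresql+pg8000://postgres:sam@localhost:5432/aqdb"

parts = urlsplit(db_path)
# The scheme carries both the SQL dialect and the Python driver.
dialect, _, driver = parts.scheme.partition("+")
database = parts.path.lstrip("/")

print(dialect, driver)                 # dialect and driver
print(parts.username, parts.password)  # user created with createuser
print(parts.hostname, parts.port)      # server address
print(database)                        # database created with createdb
```

If the user, port, or database name differs on your machine, change the matching part of db_path before running the cells that follow.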
Load sample data into the newly created table.
[3]:
# Load sample data.
from sqlalchemy import insert
from datetime import datetime
import time

with engine.connect() as conn:
    values = []
    for _ in range(10):
        # Sleep between rows so every timestamp (the primary key) is unique.
        values.append({"trigger_at": datetime.now()})
        time.sleep(0.1)
    result = conn.execute(insert(test_db_trigger),[*values])
2024-01-07 17:34:28,071 INFO sqlalchemy.engine.Engine INSERT INTO test_db_trigger (trigger_at) VALUES (%s)
2024-01-07 17:34:28,071 INFO sqlalchemy.engine.Engine [generated in 0.00094s] ((datetime.datetime(2024, 1, 7, 12, 34, 27, 33553),), (datetime.datetime(2024, 1, 7, 12, 34, 27, 138484),), (datetime.datetime(2024, 1, 7, 12, 34, 27, 243426),), (datetime.datetime(2024, 1, 7, 12, 34, 27, 348190),), (datetime.datetime(2024, 1, 7, 12, 34, 27, 453000),), (datetime.datetime(2024, 1, 7, 12, 34, 27, 555778),), (datetime.datetime(2024, 1, 7, 12, 34, 27, 655854),), (datetime.datetime(2024, 1, 7, 12, 34, 27, 760891),), (datetime.datetime(2024, 1, 7, 12, 34, 27, 862814),), (datetime.datetime(2024, 1, 7, 12, 34, 27, 965930),))
2024-01-07 17:34:28,079 INFO sqlalchemy.engine.Engine COMMIT
/var/folders/1z/64_91wwj46ng1xjffddgz_n40000gn/T/ipykernel_2625/3058673604.py:11: RemovedIn20Warning: Deprecated API features detected! These feature(s) are not compatible with SQLAlchemy 2.0. To prevent incompatible upgrades prior to updating applications, ensure requirements files are pinned to "sqlalchemy<2.0". Set environment variable SQLALCHEMY_WARN_20=1 to show all deprecation warnings. Set environment variable SQLALCHEMY_SILENCE_UBER_WARNING=1 to silence this message. (Background on SQLAlchemy 2.0 at: https://sqlalche.me/e/b8d9)
result = conn.execute(insert(test_db_trigger),[*values])
Create a DatabaseTrigger object that performs a trigger. We can pass parameters such as db_path, table_name, trigger_after_n, and poll_interval. For this illustration, we will use a PostgreSQL database.
[4]:
database_trigger = DatabaseTrigger(
    db_path=db_path,
    table_name=table_name,
    trigger_after_n=2,
    poll_interval=3,
)
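To make trigger_after_n and poll_interval more concrete, here is a minimal conceptual sketch of the behavior this guide describes: read the table every poll_interval seconds and fire once trigger_after_n reads have succeeded. It is not Covalent's implementation. The poll_table helper, its max_polls argument, and the in-memory SQLite table are all hypothetical stand-ins so that the example runs without a PostgreSQL server.

```python
import sqlite3
import time


def poll_table(conn, table_name, trigger_after_n, poll_interval, on_trigger, max_polls):
    """Hypothetical helper: call on_trigger after every trigger_after_n successful reads."""
    successful_reads = 0
    for _ in range(max_polls):
        rows = conn.execute(f"SELECT * FROM {table_name}").fetchall()
        if rows:
            # Assumption: a read counts as successful when it returns rows.
            successful_reads += 1
            if successful_reads == trigger_after_n:
                on_trigger(rows)
                successful_reads = 0
        time.sleep(poll_interval)


# In-memory stand-in for the test_db_trigger table.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE test_db_trigger (trigger_at TEXT PRIMARY KEY)")
conn.execute("INSERT INTO test_db_trigger VALUES ('2024-01-07 12:34:27')")

fired = []
poll_table(
    conn,
    "test_db_trigger",
    trigger_after_n=2,
    poll_interval=0.01,  # kept short here; the guide uses 3 seconds
    on_trigger=lambda rows: fired.append(len(rows)),
    max_polls=4,
)
print(fired)  # two triggers over four polls, each seeing one row
```

With trigger_after_n=2, four polls produce two triggers in this sketch. A larger poll_interval reduces load on the database at the cost of a slower reaction to new rows.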
Create a workflow:
[5]:
@ct.lattice
@ct.electron
def my_workflow():
    return 42
Dispatch my_workflow, disabling its first execution using the disable_run parameter in ct.dispatch.
[6]:
dispatch_id = ct.dispatch(my_workflow, disable_run=True)()
print(dispatch_id)
a7a74b5e-e865-430e-9a6c-258e8c51d0ed
Attach the trigger to the dispatch_id, set a where clause that filters the rows read from the table (here, the row holding the last inserted timestamp), and register the trigger with the trigger server so that it dispatches my_workflow.
[7]:
database_trigger.lattice_dispatch_id = dispatch_id
triggered_at = values[-1]["trigger_at"]
database_trigger.where_clauses = [f"trigger_at = '{str(triggered_at)}'"]
database_trigger.register()
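The where clause set above is plain SQL text built with an f-string. The sketch below shows the string that a given timestamp produces and one plausible way such clauses could be combined into a polling query. The build_poll_query helper and the choice to join clauses with AND are assumptions made for illustration, not a description of Covalent's internals.

```python
from datetime import datetime


def build_poll_query(table_name, where_clauses=None):
    """Hypothetical helper: join the filter strings with AND into one SELECT."""
    query = f"SELECT * FROM {table_name}"
    if where_clauses:
        query += " WHERE " + " AND ".join(where_clauses)
    return query


# Fixed timestamp standing in for values[-1]["trigger_at"].
triggered_at = datetime(2024, 1, 7, 12, 34, 27, 965930)
where_clauses = [f"trigger_at = '{triggered_at}'"]

query = build_poll_query("test_db_trigger", where_clauses)
print(where_clauses[0])
print(query)
```

Because each clause is interpolated into SQL as text, build it only from values your own code controls, as this guide does with the timestamp it inserted itself.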
Monitor the Covalent UI. Watch the Dashboard for new dispatches of my_workflow.
In the Covalent UI, observe that a new my_workflow is dispatched after the table has been read two times, with a polling interval of 3 seconds.
To disable triggers on the dispatch, use the ct.stop_triggers function, then drop the test_db_trigger table.
[8]:
ct.stop_triggers(dispatch_id)
meta.drop_all(engine, tables=[test_db_trigger], checkfirst=True)
2024-01-07 17:35:51,633 INFO sqlalchemy.engine.Engine BEGIN (implicit)
2024-01-07 17:35:51,633 INFO sqlalchemy.engine.Engine select relname from pg_class c join pg_namespace n on n.oid=c.relnamespace where pg_catalog.pg_table_is_visible(c.oid) and relname=%s
2024-01-07 17:35:51,634 INFO sqlalchemy.engine.Engine [cached since 86.59s ago] ('test_db_trigger',)
2024-01-07 17:35:51,635 INFO sqlalchemy.engine.Engine
DROP TABLE test_db_trigger
2024-01-07 17:35:51,635 INFO sqlalchemy.engine.Engine [no key 0.00026s] ()
2024-01-07 17:35:51,698 INFO sqlalchemy.engine.Engine COMMIT
Note that the stop_triggers function disables all triggers attached to the specified dispatch.
See Also
Adding a Directory Trigger to a Lattice