Canceling a Running Workflow

You can cancel a dispatched workflow before it completes.

Prerequisites

Dispatch a workflow that takes a long time to complete. While it is running, suppose you decide to cancel it (perhaps because you realize you started it with the wrong parameters).

The following example workflow uses the time.sleep() function to simulate lengthy computations:

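import covalent as ct
import time

@ct.electron
def task_1(x):
    time.sleep(x)  # simulate a lengthy computation
    print("Task 1")
    return x

@ct.electron
def task_2(a, b):
    c = a + b
    time.sleep(c)  # runs for a + b seconds
    print("Task 2")
    return c

@ct.electron
def task_3(a):
    time.sleep(2)
    print("Task 3")
    return a ** 2

@ct.lattice
def workflow(x):
    # task_2 cannot start until task_1(10) has finished
    res_2 = task_2(task_1(10), x)
    # ten task_1 calls that do not depend on each other
    for _ in range(10):
        task_1(10)
    # task_3 cannot start until task_2 has finished
    task_3(res_2)

dispatch_id = ct.dispatch(workflow)(5)
print(dispatch_id)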
19971597-97b2-4e67-8224-48c71ebc0b11
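
Before canceling, it can help to estimate which nodes will be running a few seconds after dispatch. The sketch below is a back-of-the-envelope model in plain Python, not how Covalent or Dask actually schedules work: it takes the sleep durations from the code above, assumes every node whose dependencies are done starts immediately (unlimited parallel workers), and ignores scheduling overhead. The names nodes, schedule, and status_at are hypothetical helpers for illustration only.

```python
# Hypothetical model of the example workflow (not a Covalent API).
# Each node maps to (seconds it sleeps, list of nodes it depends on).
# Assumes unlimited parallel workers and no scheduling overhead.
nodes = {
    "task_1(0)": (10, []),
    "task_2(2)": (15, ["task_1(0)"]),  # sleeps a + b = 10 + 5 seconds
    "task_3(24)": (2, ["task_2(2)"]),
}
for i in range(4, 24, 2):              # the ten task_1(10) calls in the loop
    nodes[f"task_1({i})"] = (10, [])


def schedule(nodes):
    """Return (start, end) times for each node, in seconds after dispatch."""
    start, end = {}, {}

    def finish(name):
        if name not in end:
            duration, deps = nodes[name]
            # A node can start only when all of its dependencies have finished.
            start[name] = max((finish(dep) for dep in deps), default=0)
            end[name] = start[name] + duration
        return end[name]

    for name in nodes:
        finish(name)
    return start, end


def status_at(t, start, end):
    """Classify each node as 'done', 'running' or 'not started' at time t."""
    return {
        name: ("done" if end[name] <= t
               else "running" if start[name] <= t
               else "not started")
        for name in start
    }


start, end = schedule(nodes)
snapshot = status_at(3, start, end)
print(snapshot["task_1(0)"], snapshot["task_2(2)"], snapshot["task_3(24)"])
# running not started not started
print(max(end.values()))  # 27: total runtime in this model if nothing is canceled
```

Under these assumptions, all eleven task_1 nodes are running three seconds in, while task_2(2) and task_3(24) are still waiting on upstream results.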

Procedure

  1. Confirm that the workflow is still running by executing the following within a few seconds of dispatching it:

time.sleep(3)
result = ct.get_result(dispatch_id)
print(result)

Lattice Result
==============
status: RUNNING
result: None
input args: ['5']
input kwargs: {}
error: None

start_time: 2023-04-24 18:26:34.259397
end_time: None

results_dir: /home/user/.local/share/covalent/data
dispatch_id: 19971597-97b2-4e67-8224-48c71ebc0b11

Node Outputs
------------
task_1(0): None
:parameter:10(1): None
task_2(2): None
:parameter:5(3): None
task_1(4): None
:parameter:10(5): None
task_1(6): None
:parameter:10(7): None
task_1(8): None
:parameter:10(9): None
task_1(10): None
:parameter:10(11): None
task_1(12): None
:parameter:10(13): None
task_1(14): None
:parameter:10(15): None
task_1(16): None
:parameter:10(17): None
task_1(18): None
:parameter:10(19): None
task_1(20): None
:parameter:10(21): None
task_1(22): None
:parameter:10(23): None
task_3(24): None
:postprocess:(25): None

  2. Use the ct.cancel() function to stop the workflow:

ct.cancel(dispatch_id)
'Dispatch 19971597-97b2-4e67-8224-48c71ebc0b11 cancelled.'

  3. Check the result again. Some nodes might have completed (they have returned outputs), but the lattice status is CANCELLED. Passing wait=True makes ct.get_result() wait until the dispatch has finished before returning.

result = ct.get_result(dispatch_id, wait=True)
print(result)

print(result.get_all_node_outputs())

Lattice Result
==============
status: CANCELLED
result: None
input args: ['5']
input kwargs: {}
error: The following tasks failed:


start_time: 2023-04-24 18:26:34.259397
end_time: 2023-04-24 18:26:45.612591

results_dir: /home/user/.local/share/covalent/data
dispatch_id: 19971597-97b2-4e67-8224-48c71ebc0b11

Node Outputs
------------
task_1(0): 10
:parameter:10(1): 10
task_2(2): None
:parameter:5(3): 5
task_1(4): 10
:parameter:10(5): 10
task_1(6): 10
:parameter:10(7): 10
task_1(8): 10
:parameter:10(9): 10
task_1(10): 10
:parameter:10(11): 10
task_1(12): 10
:parameter:10(13): 10
task_1(14): 10
:parameter:10(15): 10
task_1(16): 10
:parameter:10(17): 10
task_1(18): 10
:parameter:10(19): 10
task_1(20): 10
:parameter:10(21): 10
task_1(22): 10
:parameter:10(23): 10
task_3(24): None
:postprocess:(25): None

{'task_1(0)': <covalent.TransportableObject object at 0x7f4979b191f0>, ':parameter:10(1)': <covalent.TransportableObject object at 0x7f4979b190d0>, 'task_2(2)': None, ':parameter:5(3)': <covalent.TransportableObject object at 0x7f4979b197f0>, 'task_1(4)': <covalent.TransportableObject object at 0x7f4979b19970>, ':parameter:10(5)': <covalent.TransportableObject object at 0x7f4979b19d90>, 'task_1(6)': <covalent.TransportableObject object at 0x7f4979b19b20>, ':parameter:10(7)': <covalent.TransportableObject object at 0x7f4979b19f70>, 'task_1(8)': <covalent.TransportableObject object at 0x7f4979ab91c0>, ':parameter:10(9)': <covalent.TransportableObject object at 0x7f4979ab9220>, 'task_1(10)': <covalent.TransportableObject object at 0x7f4979ab9460>, ':parameter:10(11)': <covalent.TransportableObject object at 0x7f4979ab94c0>, 'task_1(12)': <covalent.TransportableObject object at 0x7f4979ab9700>, ':parameter:10(13)': <covalent.TransportableObject object at 0x7f4979ab9760>, 'task_1(14)': <covalent.TransportableObject object at 0x7f4979ab99a0>, ':parameter:10(15)': <covalent.TransportableObject object at 0x7f4979ab9a00>, 'task_1(16)': <covalent.TransportableObject object at 0x7f4979ab9c40>, ':parameter:10(17)': <covalent.TransportableObject object at 0x7f4979ab9ca0>, 'task_1(18)': <covalent.TransportableObject object at 0x7f4979ab9ee0>, ':parameter:10(19)': <covalent.TransportableObject object at 0x7f4979ab9f40>, 'task_1(20)': <covalent.TransportableObject object at 0x7f4979ac21c0>, ':parameter:10(21)': <covalent.TransportableObject object at 0x7f4979ac2220>, 'task_1(22)': <covalent.TransportableObject object at 0x7f4979ac2460>, ':parameter:10(23)': <covalent.TransportableObject object at 0x7f4979ac24c0>, 'task_3(24)': None, ':postprocess:(25)': None}
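
The dictionary printed above is hard to scan. The following sketch, a hypothetical helper named summarize_cancellation that is not part of Covalent, splits such a mapping into task nodes that recorded an output and task nodes that did not. It assumes, based on the node names shown above, that names starting with ":" (such as ":parameter:10(1)" and ":postprocess:(25)") are bookkeeping nodes rather than your own tasks, and skips them.

```python
def summarize_cancellation(node_outputs):
    """Split a {node_name: output} mapping into task nodes that recorded an
    output and task nodes whose output is None.

    Names starting with ':' (for example ':parameter:10(1)' or
    ':postprocess:(25)') are assumed to be bookkeeping nodes and are skipped.
    """
    completed, not_run = [], []
    for name, output in node_outputs.items():
        if name.startswith(":"):
            continue
        if output is None:
            not_run.append(name)
        else:
            completed.append(name)
    return completed, not_run


# Stand-in data shaped like the printed dict above; object() plays the
# role of a TransportableObject.
sample = {
    "task_1(0)": object(),
    ":parameter:10(1)": object(),
    "task_2(2)": None,
    ":parameter:5(3)": object(),
    "task_1(4)": object(),
    "task_3(24)": None,
    ":postprocess:(25)": None,
}
completed, not_run = summarize_cancellation(sample)
print(completed)  # ['task_1(0)', 'task_1(4)']
print(not_run)    # ['task_2(2)', 'task_3(24)']
```

Against a real dispatch, you would pass result.get_all_node_outputs() in place of sample.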

In this example, which used the default Dask executor, tasks that had already started ran to completion, because Dask cannot cancel a running thread. Only nodes that had not yet begun execution were canceled: here, task_2(2) and task_3(24), which were still waiting on upstream results when ct.cancel() was called.
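
The same rule can be seen with Python's standard concurrent.futures module, where Future.cancel() succeeds only for work that has not started yet. The sketch below is an analogy, not Covalent's or Dask's implementation; long_task and short_task are hypothetical stand-ins. A single-worker thread pool guarantees that the second task is still queued while the first is running.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

started = threading.Event()
release = threading.Event()


def long_task(x):
    started.set()    # signal that this task is now running
    release.wait()   # block until the main thread lets it finish
    return x


def short_task(x):
    return x * 2


with ThreadPoolExecutor(max_workers=1) as pool:
    running = pool.submit(long_task, 10)
    started.wait()                         # make sure long_task is actually running
    pending = pool.submit(short_task, 5)   # queued: the only worker is busy

    cancelled_running = running.cancel()   # False: a running thread is not interrupted
    cancelled_pending = pending.cancel()   # True: this task never started

    release.set()                          # let the running task finish normally
    print(cancelled_running, cancelled_pending)  # False True

print(running.result())     # 10
print(pending.cancelled())  # True
```

As in the Covalent output above, the running task still produces its result, while the queued task never runs.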

In general, how a canceled lattice shuts down depends on the executors used by its nodes, since each executor determines whether a task that is already running can be stopped.
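
For contrast, an execution model built on cooperative multitasking can interrupt work that is already in progress. In Python's asyncio, Task.cancel() raises CancelledError inside a running coroutine at its next await point. This is a standard-library analogy only and does not describe how any particular Covalent executor behaves; long_task and main are hypothetical names.

```python
import asyncio


async def long_task(x):
    # Awaiting yields control to the event loop, so cancellation can land here.
    await asyncio.sleep(10)
    return x


async def main():
    task = asyncio.create_task(long_task(10))
    await asyncio.sleep(0.1)   # give the task time to start running
    task.cancel()              # request cancellation of a running task
    try:
        await task
    except asyncio.CancelledError:
        pass                   # expected: the task was interrupted mid-sleep
    return task.cancelled()


was_cancelled = asyncio.run(main())
print(was_cancelled)  # True
```

Here the ten-second "computation" stops after about 0.1 seconds, unlike the thread-based tasks above, which always ran to completion.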