Transferring Local Files During Workflows#

Transfer files locally before or after executing an electron.

Prerequisites#

  1. For convenience in running this example, define the read (source) and write (destination) file paths.

[1]:
from pathlib import Path

# Define source and destination filepaths
source_filepath = Path('./my_source_file').resolve()
dest_filepath = Path('./my_dest_file').resolve()
  1. Create a source file to transfer.

[2]:
# Create an example file
file_content = """Mares eat oats and does eat oats
And little lambs eat ivy ...
"""
with open(source_filepath, "w") as f:
    f.write(file_content)

Procedure#

  1. Define a Covalent FileTransfer object, assigning the source and destination file paths respectively as its arguments.

[3]:
import covalent as ct

xfer = ct.fs.FileTransfer(str(source_filepath), str(dest_filepath))
  1. Define a list of Covalent FileTransfer objects to assign to a task. (In this example, the list contains only the single FileTransfer named xfer.)

[4]:
ft_list = [xfer]
  1. Define an electron that uses a Covalent FileTransfer task to read the source file and writes to the destination file, assigning the list containing the FileTransfer objects to the files argument of the electron decorator. (Note that the files argument takes a list of Covalent FileTransfer objects, not files or path names.)

[5]:
@ct.electron(
        files = ft_list
)
def my_file_transfer_task(files):
    from_file, to_file = files[0]
    with open(to_file,'w') as f:
        for line in open(from_file, 'r'):
            f.write(line)
    return to_file

Here is the task definition again, with the three steps combined in the electron decorator. The FileTransfer defaults to the local Rsync strategy:

[6]:
import covalent as ct

@ct.electron(
        files=[ct.fs.FileTransfer(str(source_filepath), str(dest_filepath))] # defaults to Rsync
)
def my_file_transfer_task(files=None):
    from_file, to_file = files[0]
    with open(to_file,'w') as f_to, open(from_file, 'r') as f_from:
        for line in f_from:
            f_to.write(line)
    return to_file
  1. Run the electron thus created in a lattice:

[7]:
# Create and dispatch a workflow to transfer data from source to destination, and write to destination file

@ct.lattice()
def my_workflow():
    return my_file_transfer_task()

dispatch_id = ct.dispatch(my_workflow)()
  1. Confirm the transfer by reading the contents of the destination file: After executing the workflow a copy of the file (source_filepath) has been written to my_dest_file. This file transfer occurred before electron execution.

[8]:
result = ct.get_result(dispatch_id, wait=True)
print(result)
result_filepath = result.result

# Read from the destination file
print("Reading from ", result_filepath, "\n")
with open(result_filepath,'r') as f:
    print(f.read())

# Clean up files
source_filepath.unlink()
dest_filepath.unlink()

Lattice Result
==============
status: COMPLETED
result: /Users/sankalpsanand/dev/covalent/doc/source/how_to/coding/my_dest_file
input args:
input kwargs:
error: None

start_time: 2024-01-05 14:48:31.791928
end_time: 2024-01-05 14:48:32.501517

results_dir: /Users/sankalpsanand/.cache/covalent/results/bbd6f2b0-ecb4-4e05-ba63-6c10da81ceeb
dispatch_id: bbd6f2b0-ecb4-4e05-ba63-6c10da81ceeb

Node Outputs
------------
my_file_transfer_task(0): /Users/sankalpsanand/dev/covalent/doc/source/how_to/coding/my_dest_file
:postprocess:reconstruct(1): /Users/sankalpsanand/dev/covalent/doc/source/how_to/coding/my_dest_file

Reading from  /Users/sankalpsanand/dev/covalent/doc/source/how_to/coding/my_dest_file

Mares eat oats and does eat oats
And little lambs eat ivy ...

See Also#

Transferring Remote Files During Workflows

Transferring Files to and from an S3 Bucket

Transferring Files To and From Azure Blob Storage

Transferring Files To and From Google Cloud Storage