How to perform File Transfers during workflows

We can perform file transfer operations before or after electron execution. Here we illustrate how to transfer files with Rsync, both locally and remotely via SSH, and how to move files to and from an S3 bucket.
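To make the before/after distinction concrete, here is a small plain-Python model of it. It is only a sketch: `Order`, `run_with_transfers`, and the use of `shutil.copyfile` in place of Rsync are hypothetical stand-ins chosen for illustration, not Covalent's internals. Transfers marked BEFORE stage inputs for the task, and transfers marked AFTER publish its outputs once it returns.

```python
import shutil
import tempfile
from enum import Enum
from pathlib import Path
from typing import Callable, List, Tuple


class Order(Enum):
    # Hypothetical stand-in for the BEFORE/AFTER ordering of a transfer
    BEFORE = "before"
    AFTER = "after"


Transfer = Tuple[Path, Path, Order]


def run_with_transfers(task: Callable[[List[Tuple[Path, Path]]], object],
                       transfers: List[Transfer]):
    """Copy BEFORE files, run the task, then copy AFTER files (illustrative model)."""
    for src, dst, order in transfers:
        if order is Order.BEFORE:
            shutil.copyfile(src, dst)  # stage inputs for the task
    # The task receives (from, to) pairs, mirroring an electron's `files` argument
    result = task([(src, dst) for src, dst, _ in transfers])
    for src, dst, order in transfers:
        if order is Order.AFTER:
            shutil.copyfile(src, dst)  # publish outputs produced by the task
    return result


def demo() -> Tuple[str, str]:
    with tempfile.TemporaryDirectory() as tmp_dir:
        tmp = Path(tmp_dir)
        (tmp / "input.txt").write_text("raw data")

        def task(files):
            _, staged = files[0]  # the BEFORE transfer has already run
            (tmp / "output.txt").write_text(staged.read_text().upper())
            return staged.read_text()

        seen = run_with_transfers(task, [
            (tmp / "input.txt", tmp / "staged.txt", Order.BEFORE),
            (tmp / "output.txt", tmp / "published.txt", Order.AFTER),
        ])
        return seen, (tmp / "published.txt").read_text()


seen_by_task, published = demo()
print(seen_by_task, "|", published)  # raw data | RAW DATA
```

The AFTER transfer can only succeed because the task created `output.txt` first, which is exactly why output uploads are scheduled after execution.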

Local File Transfers (Rsync)

We first define a source filepath and a destination filepath; the goal is to transfer the file at source_filepath to destination_filepath. We also create an empty file at source_filepath so that there is a file to transfer.

from pathlib import Path

# define source & destination filepaths
source_filepath = Path('./my_source_file').resolve()
destination_filepath = Path('./my_dest_file').resolve()

# create an example file
source_filepath.touch()

We then run a workflow that declares a FileTransfer operation on an electron. When no strategy is given, FileTransfer defaults to the local Rsync strategy.

import covalent as ct

# Dispatch a workflow that transfers source -> destination before the electron runs,
# then writes to the destination file inside the electron
@ct.electron(
    files=[ct.fs.FileTransfer(str(source_filepath), str(destination_filepath))]
)
def my_file_transfer_task(files=[]):
    # `files` holds one (from_filepath, to_filepath) pair per declared transfer
    from_file, to_file = files[0]
    with open(to_file, 'w') as f:
        f.write('hello')
    return to_file

@ct.lattice()
def my_workflow():
    return my_file_transfer_task()

dispatch_id = ct.dispatch(my_workflow)()
result_filepath = ct.get_result(dispatch_id, wait=True).result

# read from destination file which we wrote to
with open(result_filepath,'r') as f:
    print(f.read())

# clean up files
source_filepath.unlink()
destination_filepath.unlink()

After the workflow executes, source_filepath has been copied to my_dest_file, and the electron has then written hello into that copy, which is what the final print displays. This file transfer occurred prior to electron execution.
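Outside of Covalent, the local transfer performed by the default Rsync strategy is roughly equivalent to running rsync on the two paths. The following minimal sketch mimics it with the standard library; the `local_transfer` helper, the `-a` flag, and the `shutil.copy2` fallback are illustrative assumptions rather than Covalent's implementation.

```python
import shutil
import subprocess
import tempfile
from pathlib import Path


def local_transfer(source: Path, destination: Path) -> None:
    """Copy `source` to `destination`, preferring rsync when it is installed (hypothetical helper)."""
    rsync = shutil.which("rsync")
    if rsync is not None:
        # -a (archive mode) preserves permissions and timestamps; assumed flag, not Covalent's exact command
        subprocess.run([rsync, "-a", str(source), str(destination)], check=True)
    else:
        # Fallback for systems without rsync: copy contents and metadata
        shutil.copy2(source, destination)


with tempfile.TemporaryDirectory() as tmp_dir:
    src = Path(tmp_dir) / "my_source_file"
    dst = Path(tmp_dir) / "my_dest_file"
    src.write_text("hello")
    local_transfer(src, dst)
    copied_text = dst.read_text()

print(copied_text)  # hello
```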

Remote File Transfers (Rsync SSH)

Similarly, we can use Rsync over SSH to transfer the file at source_filepath to a remote host's filesystem at /home/ubuntu/my_dest_file.

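import covalent as ct

# source_filepath comes from the local example above, which deleted it during clean up
source_filepath.touch()

# Rsync strategy that connects to the remote host over SSH
strategy = ct.fs_strategies.Rsync(user='ubuntu', host='44.202.86.215', private_key_path='/path/to/private/key')

# TransferToRemote(remote_filepath, local_filepath) runs after the electron finishes
@ct.electron(
    files=[ct.fs.TransferToRemote('/home/ubuntu/my_dest_file', str(source_filepath), strategy=strategy)]
)
def my_remote_file_transfer_task(files=[]):
    pass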

@ct.lattice()
def my_workflow():
    return my_remote_file_transfer_task()

ct.dispatch(my_workflow)()

After workflow execution, the file at source_filepath is transferred to host 44.202.86.215 and written to /home/ubuntu/my_dest_file on that host's filesystem. This file transfer occurs after electron execution.
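For intuition, a push over SSH like the one above corresponds to an rsync command whose destination is written as `user@host:path` and whose `-e` option selects ssh with the private key. The sketch below only builds that argument list and does not contact any host; the `rsync_over_ssh_command` helper and the `-a` flag are assumptions for illustration, and Covalent's Rsync strategy may construct its command differently.

```python
import shlex
from typing import List, Optional


def rsync_over_ssh_command(local_path: str, remote_path: str, user: str, host: str,
                           private_key_path: Optional[str] = None) -> List[str]:
    """Build (but do not run) an rsync command that pushes a local file to a remote host."""
    ssh = ["ssh"]
    if private_key_path is not None:
        ssh += ["-i", private_key_path]  # authenticate with the given private key
    return [
        "rsync", "-a",
        "-e", shlex.join(ssh),           # remote shell rsync should use as transport
        local_path,
        f"{user}@{host}:{remote_path}",  # remote destination in user@host:path form
    ]


cmd = rsync_over_ssh_command("./my_source_file", "/home/ubuntu/my_dest_file",
                             user="ubuntu", host="44.202.86.215",
                             private_key_path="/path/to/private/key")
print(shlex.join(cmd))
```

Passing `cmd` to `subprocess.run(cmd, check=True)` would perform the transfer, given network access and a valid key.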

S3 bucket transfer

We can also transfer files between an S3 bucket and the local filesystem using the boto3 library. In this simple example, a zip file is downloaded from the S3 bucket before the electron executes. The electron extracts the archive, performs the necessary operations on the extracted files, and zips the results, which are uploaded back to the S3 bucket after the electron finishes.

import covalent as ct
import zipfile
import os

strategy = ct.fs_strategies.S3()

# Download the input archive from S3 before the electron runs (default order)
ft_1 = ct.fs.FileTransfer('s3://covalent-tmp/test_vids.zip', '/home/ubuntu/tmp-dir/test_vids.zip', strategy=strategy)
# Upload the processed archive to S3 after the electron finishes
ft_2 = ct.fs.FileTransfer('/home/ubuntu/tmp-dir/images.zip', 's3://covalent-tmp/images.zip', strategy=strategy, order=ct.fs.Order.AFTER)


@ct.electron(files=[ft_1, ft_2])
def zip_unzip(files=[]):
    path = "/home/ubuntu/tmp-dir"
    # Unzip the downloaded data
    with zipfile.ZipFile(os.path.join(path, "test_vids.zip"), 'r') as zip_ref:
        zip_ref.extractall(path)

    # Perform necessary operations on the extracted files here

    # Zip the files so they can be uploaded back
    with zipfile.ZipFile(os.path.join(path, "images.zip"), 'w', zipfile.ZIP_DEFLATED) as ziph:
        for root, dirs, filenames in os.walk(os.path.join(path, 'test_vids')):
            for filename in filenames:
                file_path = os.path.join(root, filename)
                # Store entries relative to the parent of `path` (e.g. tmp-dir/test_vids/...)
                ziph.write(file_path, os.path.relpath(file_path, os.path.join(path, '..')))


@ct.lattice
def run_electrons():
    return zip_unzip()

dispatch_id = ct.dispatch(run_electrons)()
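boto3's S3 client addresses objects by bucket name and key rather than by `s3://` URI; for example, `download_file(Bucket, Key, Filename)` and `upload_file(Filename, Bucket, Key)`. The hypothetical `split_s3_uri` helper below shows how a URI such as `s3://covalent-tmp/test_vids.zip` maps onto those two arguments. It is a sketch assuming the `s3://bucket/key` form, not Covalent's S3 strategy code.

```python
from typing import Tuple
from urllib.parse import urlparse


def split_s3_uri(uri: str) -> Tuple[str, str]:
    """Split 's3://bucket/key/path' into ('bucket', 'key/path') (hypothetical helper)."""
    parsed = urlparse(uri)
    if parsed.scheme != "s3" or not parsed.netloc:
        raise ValueError(f"not an s3:// URI: {uri!r}")
    # netloc holds the bucket; the path without its leading slash is the object key
    return parsed.netloc, parsed.path.lstrip("/")


bucket, key = split_s3_uri("s3://covalent-tmp/test_vids.zip")
print(bucket, key)  # covalent-tmp test_vids.zip
```

With boto3 installed and credentials configured, the pair could then be used as `boto3.client('s3').download_file(bucket, key, '/home/ubuntu/tmp-dir/test_vids.zip')`.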