How to perform File Transfers during workflows#
We can perform file transfer operations pre or post electron execution here we illustrate how to perform file transfer using Rsync locally and remotely via SSH.
Local File Transfers (Rsync)#
We first define a source & destination filepath where we want to transfer a file from the
source_filepath location to the
destination_filepath location as well as create an empty file in
source_filepath to have a file to transfer.
from pathlib import Path # define source & destination filepaths source_filepath = Path('./my_source_file').resolve() destination_filepath = Path('./my_dest_file').resolve() # create an example file source_filepath.touch()
We then run a workflow by defining our
FileTransfer operation which defaults to the local
import covalent as ct
# Dispatch a workflow to transfer from source to destination, and write to destination file @ct.electron( files=[ct.fs.FileTransfer(str(source_filepath), str(destination_filepath))] ) def my_file_transfer_task(files=): from_file, to_file = files with open(to_file,'w') as f: f.write('hello') return to_file @ct.lattice() def my_workflow(): return my_file_transfer_task() dispatch_id = ct.dispatch(my_workflow)() result_filepath = ct.get_result(dispatch_id, wait=True).result # read from destination file which we wrote to with open(result_filepath,'r') as f: print(f.read()) # clean up files source_filepath.unlink() destination_filepath.unlink()
After executing the workflow we now see a copy of the file (
source_filepath) located in
my_dest_file. This file transfer occured prior to electon execution.
Remote File Transfers (Rsync SSH)#
Similarly we can perform file transfers using Rsync via SSH in order to transfer a file located in
source_filepath to a remote host’s filesystem located at
strategy = ct.fs_strategies.Rsync(user='ubuntu', host='22.214.171.124', private_key_path='/path/to/private/key') @ct.electron( files=[ct.fs.TransferToRemote('/home/ubuntu/my_dest_file', str(source_filepath), strategy=strategy)] ) def my_remote_file_transfer_task(files=): pass @ct.lattice() def my_workflow(): return my_remote_file_transfer_task() ct.dispatch(my_workflow)()
After workflow execution the file located at
source_filepath will be transfered to host
126.96.36.199 in the host’s filesystem (
/home/ubuntu/my_dest_file). This file transfer occurs after electron execution.
S3 bucket transfer#
We can perform file transfer between an S3 bucket and local filesystem using the boto3 library. Here we show a simple example where a zip file is downloaded from the S3 bucket before its execution. The electron performs necessary operations on them and the processed files are uploaded back to the S3 bucket.
import covalent as ct import zipfile import os strategy = ct.fs_strategies.S3() ft_2 = ct.fs.FileTransfer('/home/ubuntu/tmp-dir/images.zip','s3://covalent-tmp/images.zip',strategy = strategy,order=ct.fs.Order.AFTER) ft_1 = ct.fs.FileTransfer('s3://covalent-tmp/test_vids.zip','/home/ubuntu/tmp-dir/test_vids.zip',strategy = strategy) @ct.electron(files = [ft_1,ft_2]) def zip_unzip(files=): path = "/home/ubuntu/tmp-dir" #unzip downloaded data with zipfile.ZipFile(path + "/test_vids.zip", 'r') as zip_ref: zip_ref.extractall(path) #Perform necessary operations on the files #Zip files to upload it back with zipfile.ZipFile(path + "/images.zip", 'w', zipfile.ZIP_DEFLATED) as ziph: for root, dirs, files in os.walk(path + '/test_vids'): for file in files: ziph.write(os.path.join(root, file), os.path.relpath(os.path.join(root, file), os.path.join(path, '..'))) @ct.lattice def run_electrons(): return unzip_zip() dispatch_id = ct.dispatch(run_electrons)()