How to perform File Transfers during workflows#
File transfer operations can run either before or after an electron executes. This guide shows how to transfer files with Rsync, both locally and to a remote host over SSH, and between an S3 bucket and the local filesystem.
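As a mental model for "before" and "after" transfers, the following plain-Python sketch wraps a task with copy steps on either side. It is only an illustration under stated assumptions: the `with_transfers` decorator and the file names are hypothetical, and this is not how Covalent implements file transfers internally.

```python
import shutil
import tempfile
from pathlib import Path


def with_transfers(before=(), after=()):
    """Hypothetical decorator: copy (src, dst) pairs before and after a task runs."""
    def decorator(task):
        def wrapper(*args, **kwargs):
            for src, dst in before:  # transfers that run before the task body
                shutil.copy(src, dst)
            result = task(*args, **kwargs)
            for src, dst in after:  # transfers that run after the task body
                shutil.copy(src, dst)
            return result
        return wrapper
    return decorator


# Illustrative files in a throwaway directory
workdir = Path(tempfile.mkdtemp())
inbox = workdir / "in.txt"
staged = workdir / "staged.txt"
outbox = workdir / "out.txt"
inbox.write_text("raw")


@with_transfers(before=[(inbox, staged)], after=[(staged, outbox)])
def process():
    # The staged copy already exists when the task body starts.
    staged.write_text(staged.read_text().upper())
    return staged.read_text()


task_result = process()
```

The task only ever touches the staged copy; the input file is left unchanged and the output file appears once the task has returned.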
Local File Transfers (Rsync)#
First, we define a source and a destination filepath: the file at source_filepath will be transferred to destination_filepath. We also create an empty file at source_filepath so that there is a file to transfer.
from pathlib import Path
# define source & destination filepaths
source_filepath = Path('./my_source_file').resolve()
destination_filepath = Path('./my_dest_file').resolve()
# create an example file
source_filepath.touch()
We then run a workflow that defines a FileTransfer operation, which defaults to the local Rsync strategy.
import covalent as ct
# Dispatch a workflow to transfer from source to destination, and write to destination file
@ct.electron(
    files=[ct.fs.FileTransfer(str(source_filepath), str(destination_filepath))]
)
def my_file_transfer_task(files=[]):
    from_file, to_file = files[0]
    with open(to_file, 'w') as f:
        f.write('hello')
    return to_file

@ct.lattice()
def my_workflow():
    return my_file_transfer_task()

dispatch_id = ct.dispatch(my_workflow)()
result_filepath = ct.get_result(dispatch_id, wait=True).result
# read from destination file which we wrote to
with open(result_filepath, 'r') as f:
    print(f.read())
# clean up files
source_filepath.unlink()
destination_filepath.unlink()
When the workflow runs, the file at source_filepath is copied to my_dest_file before the electron executes; the electron then writes to that copy. The final two lines of the example remove both files again.
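To make the local Rsync strategy more concrete, here is a minimal sketch of the kind of command such a transfer boils down to. The helper names and the `-a` (archive) flag are assumptions chosen for illustration; the exact command Covalent issues may differ.

```python
import shlex
import shutil
import subprocess


def build_local_rsync_command(source: str, destination: str) -> list[str]:
    """Return an argv list for a local rsync copy (illustrative helper)."""
    # -a (archive mode) is an assumption here, not a documented Covalent default.
    return ["rsync", "-a", source, destination]


def run_local_rsync(source: str, destination: str) -> None:
    """Run the copy, failing loudly if rsync is missing or exits non-zero."""
    if shutil.which("rsync") is None:
        raise RuntimeError("rsync is not installed or not on PATH")
    subprocess.run(build_local_rsync_command(source, destination), check=True)


local_command = build_local_rsync_command("/tmp/my_source_file", "/tmp/my_dest_file")
print(shlex.join(local_command))  # prints: rsync -a /tmp/my_source_file /tmp/my_dest_file
```

Building the command as a list and passing it to `subprocess.run` without a shell avoids quoting problems when a path contains spaces.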
Remote File Transfers (Rsync SSH)#
Similarly, we can use Rsync over SSH to transfer the file at source_filepath to a remote host’s filesystem, here to /home/ubuntu/my_dest_file.
strategy = ct.fs_strategies.Rsync(user='ubuntu', host='44.202.86.215', private_key_path='/path/to/private/key')
@ct.electron(
files=[ct.fs.TransferToRemote('/home/ubuntu/my_dest_file', str(source_filepath), strategy=strategy)]
)
def my_remote_file_transfer_task(files=[]):
    pass

@ct.lattice()
def my_workflow():
    return my_remote_file_transfer_task()

ct.dispatch(my_workflow)()
After the workflow runs, the file at source_filepath has been transferred to /home/ubuntu/my_dest_file on host 44.202.86.215. This transfer occurs after the electron executes.
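The remote case can be pictured as an rsync invocation that uses SSH as its transport. The sketch below only assembles such a command from the user, host, and private key used above; `build_ssh_rsync_command` is a hypothetical helper, and the flags are assumptions rather than the exact command Covalent runs.

```python
import shlex


def build_ssh_rsync_command(source, destination, user, host, private_key_path):
    """Return an argv list for an rsync upload over SSH (illustrative helper)."""
    # rsync's -e option selects the remote shell; here, ssh with an identity file.
    ssh_transport = f"ssh -i {shlex.quote(private_key_path)}"
    # Remote targets use the user@host:path form.
    remote_target = f"{user}@{host}:{destination}"
    return ["rsync", "-a", "-e", ssh_transport, source, remote_target]


remote_command = build_ssh_rsync_command(
    source="/tmp/my_source_file",
    destination="/home/ubuntu/my_dest_file",
    user="ubuntu",
    host="44.202.86.215",
    private_key_path="/path/to/private/key",
)
print(shlex.join(remote_command))
```

Running the printed command by hand is a quick way to check that the key, user, and host are valid before dispatching a workflow.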
S3 bucket transfer#
We can transfer files between an S3 bucket and the local filesystem using the boto3 library. In this simple example, a zip file is downloaded from the S3 bucket before the electron executes. The electron unzips it, performs the necessary operations on the extracted files, and zips the results, which are uploaded back to the S3 bucket after the electron finishes.
import covalent as ct
import zipfile
import os
strategy = ct.fs_strategies.S3()
# download the input archive before the electron runs
ft_1 = ct.fs.FileTransfer('s3://covalent-tmp/test_vids.zip', '/home/ubuntu/tmp-dir/test_vids.zip', strategy=strategy)
# upload the output archive after the electron finishes
ft_2 = ct.fs.FileTransfer('/home/ubuntu/tmp-dir/images.zip', 's3://covalent-tmp/images.zip', strategy=strategy, order=ct.fs.Order.AFTER)

@ct.electron(files=[ft_1, ft_2])
def zip_unzip(files=[]):
    path = "/home/ubuntu/tmp-dir"
    # unzip the downloaded data
    with zipfile.ZipFile(path + "/test_vids.zip", 'r') as zip_ref:
        zip_ref.extractall(path)
    # perform the necessary operations on the files
    # zip the files to upload them back
    with zipfile.ZipFile(path + "/images.zip", 'w', zipfile.ZIP_DEFLATED) as ziph:
        for root, dirs, filenames in os.walk(path + '/test_vids'):
            for filename in filenames:
                ziph.write(os.path.join(root, filename),
                           os.path.relpath(os.path.join(root, filename),
                                           os.path.join(path, '..')))

@ct.lattice
def run_electrons():
    return zip_unzip()

dispatch_id = ct.dispatch(run_electrons)()
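The s3:// paths above combine a bucket name and an object key. As a rough sketch of how such a URI maps onto the bucket and key arguments that boto3 calls such as `download_file` and `upload_file` take, the hypothetical helper `split_s3_uri` below splits one with the standard library; it is an illustration, not part of Covalent.

```python
from urllib.parse import urlparse


def split_s3_uri(uri: str) -> tuple[str, str]:
    """Split 's3://bucket/key' into (bucket, key); illustrative helper."""
    parsed = urlparse(uri)
    if parsed.scheme != "s3" or not parsed.netloc:
        raise ValueError(f"not an S3 URI: {uri!r}")
    # The bucket is the network location; the key is the path without its leading slash.
    return parsed.netloc, parsed.path.lstrip("/")


bucket, key = split_s3_uri("s3://covalent-tmp/test_vids.zip")
print(bucket, key)  # prints: covalent-tmp test_vids.zip
```

Validating the scheme up front turns a mistyped path into an immediate error instead of a failed transfer at run time.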