Source code for covalent._file_transfer.strategies.rsync_strategy

# Copyright 2021 Agnostiq Inc.
#
# This file is part of Covalent.
#
# Licensed under the Apache License 2.0 (the "License"). A copy of the
# License may be obtained with this software package or at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Use of this file is prohibited except in compliance with the License.
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import os
from subprocess import PIPE, CalledProcessError, Popen
from typing import Optional

from .. import File
from .transfer_strategy_base import FileTransferStrategy


[docs]class Rsync(FileTransferStrategy): """ Implements Base FileTransferStrategy class to use rsync to move files to and from remote or local filesystems. Rsync via ssh is used if one of the provided files is marked as remote. Attributes: user: (optional) Determine user to specify for remote host if using rsync with ssh host: (optional) Determine what host to connect to if using rsync with ssh private_key_path: (optional) Filepath for ssh private key to use if using rsync with ssh """ def __init__( self, user: Optional[str] = "", host: Optional[str] = "", private_key_path: Optional[str] = None, ): self.user = user self.private_key_path = private_key_path self.host = host if self.private_key_path and not os.path.exists(self.private_key_path): raise FileNotFoundError( f"Provided private key ({self.private_key_path}) does not exist. Could not instantiate Rsync File Transfer Strategy. " ) def get_rsync_ssh_cmd( self, local_file: File, remote_file: File, transfer_from_remote: bool = False ) -> str: local_filepath = local_file.filepath remote_filepath = remote_file.filepath args = ["rsync"] if self.private_key_path: args.append(f'-ae "ssh -i {self.private_key_path}"') else: args.append("-ae ssh") remote_source = f"{self.user}@{self.host}:{remote_filepath}" if transfer_from_remote: args.append(remote_source) args.append(local_filepath) else: args.append(local_filepath) args.append(remote_source) return " ".join(args) def get_rsync_cmd( self, from_file: File, to_file: File, transfer_from_remote: bool = False ) -> str: from_filepath = from_file.filepath to_filepath = to_file.filepath return f"rsync -a {from_filepath} {to_filepath}" def return_subprocess_callable(self, cmd) -> None: def callable(): p = Popen(cmd, shell=True, stdout=PIPE, stderr=PIPE) output, error = p.communicate() if p.returncode != 0: raise CalledProcessError(p.returncode, f'"{cmd}" with error: {str(error)}') return callable # return callable to move files in the local file system def cp(self, from_file: File, to_file: File = File()) -> None: cmd = self.get_rsync_cmd(from_file, to_file) return self.return_subprocess_callable(cmd) # return callable to download here implies 'from' is a remote source def download(self, from_file: File, to_file: File = File()) -> File: cmd = self.get_rsync_ssh_cmd(to_file, from_file, transfer_from_remote=True) return self.return_subprocess_callable(cmd) # return callable to upload here implies 'to' is a remote source def upload(self, from_file: File, to_file: File) -> None: cmd = self.get_rsync_ssh_cmd(from_file, to_file, transfer_from_remote=False) return self.return_subprocess_callable(cmd)