diff --git a/pipe-cli/build_linux.sh b/pipe-cli/build_linux.sh index c46ffcae04..e8032ef368 100644 --- a/pipe-cli/build_linux.sh +++ b/pipe-cli/build_linux.sh @@ -51,6 +51,7 @@ python2 $PYINSTALLER_PATH/pyinstaller/pyinstaller.py \ --hidden-import=functools \ --hidden-import=re \ --hidden-import=subprocess \ + --additional-hooks-dir="$PIPE_CLI_SOURCES_DIR/hooks" \ -y \ --clean \ --runtime-tmpdir $PIPE_CLI_RUNTIME_TMP_DIR \ diff --git a/pipe-cli/build_windows.sh b/pipe-cli/build_windows.sh index 09ddca5254..e5eb51f552 100644 --- a/pipe-cli/build_windows.sh +++ b/pipe-cli/build_windows.sh @@ -45,6 +45,7 @@ pyinstaller --add-data "/src/res/effective_tld_names.dat.txt;tld/res/" \ --hidden-import=functools \ --hidden-import=re \ --hidden-import=subprocess \ + --additional-hooks-dir="$PIPE_CLI_SOURCES_DIR/hooks" \ -y \ --clean \ --workpath /tmp \ diff --git a/pipe-cli/hooks/hook-google.resumable_media.requests.py b/pipe-cli/hooks/hook-google.resumable_media.requests.py new file mode 100644 index 0000000000..944fe64e7d --- /dev/null +++ b/pipe-cli/hooks/hook-google.resumable_media.requests.py @@ -0,0 +1,2 @@ +from PyInstaller.utils.hooks import copy_metadata +datas = copy_metadata('requests') diff --git a/pipe-cli/pipe.py b/pipe-cli/pipe.py index 043596ca1f..8f590e3168 100644 --- a/pipe-cli/pipe.py +++ b/pipe-cli/pipe.py @@ -665,9 +665,9 @@ def storage(): @click.option('-c', '--on_cloud', prompt='Do you want to create this storage on a cloud?', help='Create bucket on a cloud', default=False, is_flag=True) -@click.option('-p', '--path', required=False, default='', help='The name of the new bucket.', +@click.option('-p', '--path', default='', help='The name of the new bucket.', prompt='The name of the new bucket.') -@click.option('-r', '--region_id', required=False, type=int, help='Cloud region id where storage shall be created.', +@click.option('-r', '--region_id', default='default', help='Cloud region id where storage shall be created. 
', prompt='Cloud region id where storage shall be created.') def create(name, description, short_term_storage, long_term_storage, versioning, backup_duration, type, parent_folder, on_cloud, path, region_id): diff --git a/pipe-cli/requirements.txt b/pipe-cli/requirements.txt index bbf532506b..0424dbf8d6 100644 --- a/pipe-cli/requirements.txt +++ b/pipe-cli/requirements.txt @@ -16,4 +16,5 @@ PyJWT==1.6.1 pypac==0.8.1 beautifulsoup4==4.6.1 azure-storage-blob==1.5.0 +google-cloud-storage==1.14.0 setuptools diff --git a/pipe-cli/setup.py b/pipe-cli/setup.py index 15a44f76cf..3a59834a5e 100644 --- a/pipe-cli/setup.py +++ b/pipe-cli/setup.py @@ -38,7 +38,8 @@ 'PyJWT==1.6.1', 'pypac==0.8.1', 'beautifulsoup4==4.6.1', - 'azure-storage-blob==1.5.0' + 'azure-storage-blob==1.5.0', + 'google-cloud-storage==1.14.0' ], entry_points=''' [console_scripts] diff --git a/pipe-cli/src/api/data_storage.py b/pipe-cli/src/api/data_storage.py index 14d785814e..f84c65448d 100644 --- a/pipe-cli/src/api/data_storage.py +++ b/pipe-cli/src/api/data_storage.py @@ -17,8 +17,6 @@ install_aliases() from urllib.parse import urlparse, urlencode -from urllib.request import urlopen, Request -from urllib.error import HTTPError import json from src.model.data_storage_tmp_credentials_model import TemporaryCredentialsModel diff --git a/pipe-cli/src/model/data_storage_item_model.py b/pipe-cli/src/model/data_storage_item_model.py index 4bbc38d2d9..66c17a767e 100644 --- a/pipe-cli/src/model/data_storage_item_model.py +++ b/pipe-cli/src/model/data_storage_item_model.py @@ -27,6 +27,7 @@ def __init__(self): self.versions = [] self.latest = False self.delete_marker = False + self.deleted = None @classmethod def load(cls, json): diff --git a/pipe-cli/src/model/data_storage_tmp_credentials_model.py b/pipe-cli/src/model/data_storage_tmp_credentials_model.py index a697242e3c..0c097318a3 100644 --- a/pipe-cli/src/model/data_storage_tmp_credentials_model.py +++ b/pipe-cli/src/model/data_storage_tmp_credentials_model.py @@ -28,5 +28,5 @@ def load(cls, json): instance.secret_key = json['accessKey'] instance.session_token = json['token'] instance.expiration = json['expiration'] - instance.region = json['region'] + instance.region = json['region'] if 'region' in json else None return instance diff --git a/pipe-cli/src/model/data_storage_wrapper.py b/pipe-cli/src/model/data_storage_wrapper.py index 65f8852a50..2c840708a6 100644 --- a/pipe-cli/src/model/data_storage_wrapper.py +++ b/pipe-cli/src/model/data_storage_wrapper.py @@ -30,6 +30,7 @@ from ..utilities.storage.s3 import S3BucketOperations from ..utilities.storage.local import LocalOperations from ..utilities.storage.azure import AzureListingManager, AzureDeleteManager, AzureBucketOperations +from ..utilities.storage.gs import GsRestoreManager, GsListingManager, GsDeleteManager, GsBucketOperations from ..utilities.storage.common import StorageOperations from .data_storage_wrapper_type import WrapperType import shutil @@ -55,6 +56,12 @@ class DataStorageWrapper(object): (WrapperType.FTP, WrapperType.AZURE): AzureBucketOperations.get_transfer_from_http_or_ftp_manager, (WrapperType.HTTP, WrapperType.AZURE): AzureBucketOperations.get_transfer_from_http_or_ftp_manager, + (WrapperType.GS, WrapperType.GS): GsBucketOperations.get_transfer_between_buckets_manager, + (WrapperType.GS, WrapperType.LOCAL): GsBucketOperations.get_download_manager, + (WrapperType.LOCAL, WrapperType.GS): GsBucketOperations.get_upload_manager, + (WrapperType.FTP, WrapperType.GS): 
GsBucketOperations.get_transfer_from_http_or_ftp_manager, + (WrapperType.HTTP, WrapperType.GS): GsBucketOperations.get_transfer_from_http_or_ftp_manager, + (WrapperType.FTP, WrapperType.LOCAL): LocalOperations.get_transfer_from_http_or_ftp_manager, (WrapperType.HTTP, WrapperType.LOCAL): LocalOperations.get_transfer_from_http_or_ftp_manager } @@ -90,7 +97,8 @@ def get_cloud_wrapper_for_bucket(cls, bucket_model, relative_path): def __get_storage_wrapper(cls, bucket, relative_path, *args, **kwargs): _suppliers = { WrapperType.S3: S3BucketWrapper.build_wrapper, - WrapperType.AZURE: AzureBucketWrapper.build_wrapper + WrapperType.AZURE: AzureBucketWrapper.build_wrapper, + WrapperType.GS: GsBucketWrapper.build_wrapper, } if bucket.type in _suppliers: supplier = _suppliers[bucket.type] @@ -214,12 +222,31 @@ def is_file(self): def exists(self): return self.exists_flag + def get_items(self): + return self.get_list_manager().get_items(self.path) + + def is_empty(self, relative=None): + if not self.exists(): + return True + if self.is_file(): + return False + if relative: + delimiter = StorageOperations.PATH_SEPARATOR + path = self.path.rstrip(delimiter) + delimiter + relative + else: + path = self.path + return not self.get_list_manager().folder_exists(path) + + @abstractmethod + def get_type(self): + pass + @abstractmethod def get_restore_manager(self): pass @abstractmethod - def get_list_manager(self, show_versions): + def get_list_manager(self, show_versions=False): pass @abstractmethod @@ -253,9 +280,6 @@ def is_empty(self, relative=None): return not S3BucketOperations.path_exists(self, relative, session=self.session) return self.is_empty_flag - def get_items(self): - return S3BucketOperations.get_items(self, session=self.session) - def get_file_download_uri(self, relative_path): download_url_model = None try: @@ -278,7 +302,7 @@ def delete_item(self, relative_path): def get_restore_manager(self): return S3BucketOperations.get_restore_manager(self) - def get_list_manager(self, show_versions): + def get_list_manager(self, show_versions=False): return S3BucketOperations.get_list_manager(self, show_versions=show_versions) def get_delete_manager(self, versioning): @@ -297,27 +321,12 @@ def build_wrapper(cls, root_bucket, relative_path, versioning=False, init=True): raise RuntimeError('Versioning is not supported by AZURE cloud provider') wrapper = AzureBucketWrapper(root_bucket, relative_path) if init: - AzureBucketOperations.init_wrapper(wrapper) + StorageOperations.init_wrapper(wrapper, versioning=versioning) return wrapper def get_type(self): return WrapperType.AZURE - def is_empty(self, relative=None): - if not self.exists(): - return True - if self.is_file(): - return False - if relative: - delimiter = StorageOperations.PATH_SEPARATOR - path = self.path.rstrip(delimiter) + delimiter + relative - else: - path = self.path - return not self.get_list_manager().folder_exists(path) - - def get_items(self): - return self.get_list_manager().get_items(self.path) - def get_restore_manager(self): raise RuntimeError('Versioning is not supported by AZURE cloud provider') @@ -337,6 +346,31 @@ def _blob_service(self, read, write): return self.service +class GsBucketWrapper(CloudDataStorageWrapper): + + @classmethod + def build_wrapper(cls, root_bucket, relative_path, init=True, *args, **kwargs): + wrapper = GsBucketWrapper(root_bucket, relative_path) + if init: + StorageOperations.init_wrapper(wrapper, *args, **kwargs) + return wrapper + + def get_type(self): + return WrapperType.GS + + def 
get_restore_manager(self): + return GsRestoreManager(self._storage_client(write=True), self) + + def get_list_manager(self, show_versions=False): + return GsListingManager(self._storage_client(), self.bucket, show_versions) + + def get_delete_manager(self, versioning): + return GsDeleteManager(self._storage_client(write=True), self.bucket) + + def _storage_client(self, read=True, write=False): + return GsBucketOperations.get_client(self.bucket, read=read, write=write) + + class LocalFileSystemWrapper(DataStorageWrapper): def __init__(self, path): diff --git a/pipe-cli/src/model/data_storage_wrapper_type.py b/pipe-cli/src/model/data_storage_wrapper_type.py index b6a1730ca2..c5f97e9af5 100644 --- a/pipe-cli/src/model/data_storage_wrapper_type.py +++ b/pipe-cli/src/model/data_storage_wrapper_type.py @@ -2,15 +2,17 @@ class WrapperType(object): LOCAL = 'LOCAL' S3 = 'S3' AZURE = 'AZ' + GS = 'GS' FTP = 'FTP' HTTP = 'HTTP' - __cloud_types = [S3, AZURE] + __cloud_types = [S3, AZURE, GS] __dynamic_cloud_scheme = 'cp' __s3_cloud_scheme = 's3' __azure_cloud_scheme = 'az' - __cloud_schemes = [__dynamic_cloud_scheme, __s3_cloud_scheme, __azure_cloud_scheme] - __cloud_schemes_map = {S3: __s3_cloud_scheme, AZURE: __azure_cloud_scheme} + __gs_cloud_scheme = 'gs' + __cloud_schemes = [__dynamic_cloud_scheme, __s3_cloud_scheme, __azure_cloud_scheme, __gs_cloud_scheme] + __cloud_schemes_map = {S3: __s3_cloud_scheme, AZURE: __azure_cloud_scheme, GS: __gs_cloud_scheme} @classmethod def cloud_types(cls): diff --git a/pipe-cli/src/utilities/datastorage_operations.py b/pipe-cli/src/utilities/datastorage_operations.py index fe6b3350c9..3c38e702ca 100644 --- a/pipe-cli/src/utilities/datastorage_operations.py +++ b/pipe-cli/src/utilities/datastorage_operations.py @@ -117,7 +117,7 @@ def storage_remove_item(cls, path, yes, version, hard_delete, recursive, exclude if version and hard_delete: click.echo('"version" argument should\'t be combined with "hard-delete" option', err=True) sys.exit(1) - source_wrapper = DataStorageWrapper.get_cloud_wrapper(path, versioning=version or hard_delete) + source_wrapper = DataStorageWrapper.get_cloud_wrapper(path, versioning=version is not None or hard_delete) if source_wrapper is None or not source_wrapper.exists(): click.echo('Storage path "{}" was not found'.format(path), err=True) sys.exit(1) @@ -154,6 +154,14 @@ def save_data_storage(cls, name, description, sts_duration, lts_duration, versio click.echo("Error: Directory with name '{}' not found! 
" "Check if it exists and you have permission to read it".format(parent_folder), err=True) sys.exit(1) + if region_id == 'default': + region_id = None + else: + try: + region_id = int(region_id) + except ValueError: + click.echo("Error: Given region id '{}' is not a number.".format(region_id)) + sys.exit(1) try: DataStorage.save(name, path, description, sts_duration, lts_duration, versioning, backup_duration, type, directory.id if directory else None, on_cloud, region_id) @@ -384,23 +392,28 @@ def __print_data_storage_contents(cls, bucket_model, relative_path, labels = '' if item.type is not None and item.type in WrapperType.cloud_types(): name = item.path - if item.changed is not None: + item_updated = item.deleted or item.changed + if item_updated is not None: if bucket_model is None: # need to wrap into datetime since bucket listing returns str - item_datetime = datetime.datetime.strptime(item.changed, '%Y-%m-%d %H:%M:%S') + item_datetime = datetime.datetime.strptime(item_updated, '%Y-%m-%d %H:%M:%S') else: - item_datetime = item.changed + item_datetime = item_updated changed = item_datetime.strftime('%Y-%m-%d %H:%M:%S') - if item.size is not None: + if item.size is not None and not item.deleted: size = item.size - if item.labels is not None and len(item.labels) > 0: + if item.labels is not None and len(item.labels) > 0 and not item.deleted: labels = ', '.join(map(lambda i: i.value, item.labels)) - item_type = "-File" if item.delete_marker else item.type + item_type = "-File" if item.delete_marker or item.deleted else item.type row = [item_type, labels, changed, size, name] if show_versions: row.append('') items_table.add_row(row) if show_versions and item.type == 'File': + if item.deleted: + # Additional synthetic delete version + row = ['-File', '', item.deleted.strftime('%Y-%m-%d %H:%M:%S'), size, name, '- (latest)'] + items_table.add_row(row) for version in item.versions: version_type = "-File" if version.delete_marker else "+File" version_label = "{} (latest)".format(version.version) if version.latest else version.version diff --git a/pipe-cli/src/utilities/storage/azure.py b/pipe-cli/src/utilities/storage/azure.py index 3310d688bc..d254940d40 100644 --- a/pipe-cli/src/utilities/storage/azure.py +++ b/pipe-cli/src/utilities/storage/azure.py @@ -1,20 +1,18 @@ from __future__ import absolute_import import copy +import io import time from threading import Lock +from datetime import timedelta, datetime +import os +import click try: from urllib.request import urlopen # Python 3 except ImportError: from urllib2 import urlopen # Python 2 -from datetime import timedelta, datetime - -import io -import os -import click - from azure.storage.blob import BlockBlobService, ContainerPermissions, Blob from azure.storage.common._auth import _StorageSASAuthentication @@ -93,41 +91,9 @@ def _to_local_item(self, absolute_item, prefix): relative_item.path = relative_item.name return relative_item - def folder_exists(self, relative_path): - prefix = StorageOperations.get_prefix(relative_path).rstrip(self.delimiter) + self.delimiter - for item in self.list_items(prefix, show_all=True): - if prefix.endswith(item.name): - return True - return False - - def get_file_size(self, relative_path): - items = self.list_items(relative_path, show_all=True, recursive=True) - for item in items: - if item.name == relative_path: - return item.size - return None - def get_file_tags(self, relative_path): return dict(self.service.get_blob_metadata(self.bucket.path, relative_path)) - def get_items(self, relative_path): - 
""" - Returns all files under the given relative path in forms of tuples with the following structure: - ('File', full_path, relative_path, size) - - :param relative_path: Path to a folder or a file. - :return: Generator of file tuples. - """ - prefix = StorageOperations.get_prefix(relative_path).rstrip(self.delimiter) - for item in self.list_items(prefix, recursive=True, show_all=True): - if not StorageOperations.is_relative_path(item.name, prefix): - continue - if item.name == relative_path: - item_relative_path = os.path.basename(item.name) - else: - item_relative_path = StorageOperations.get_item_name(item.name, prefix + self.delimiter) - yield ('File', item.name, item_relative_path, item.size) - class AzureDeleteManager(AzureManager, AbstractDeleteManager): @@ -291,10 +257,10 @@ def transfer(self, source_wrapper, destination_wrapper, path=None, relative_path source_wrapper.delete_item(source_key) -class UrlIO(io.BytesIO): +class _SourceUrlIO(io.BytesIO): def __init__(self, url): - super(UrlIO, self).__init__() + super(_SourceUrlIO, self).__init__() self.io = urlopen(url) def read(self, n=10): @@ -325,7 +291,7 @@ def transfer(self, source_wrapper, destination_wrapper, path=None, relative_path return destination_tags = StorageOperations.generate_tags(tags, source_key) progress_callback = AzureProgressPercentage.callback(relative_path, size, quiet) - self.service.create_blob_from_stream(destination_wrapper.bucket.path, destination_key, UrlIO(source_key), + self.service.create_blob_from_stream(destination_wrapper.bucket.path, destination_key, _SourceUrlIO(source_key), metadata=destination_tags, progress_callback=progress_callback) @@ -403,35 +369,10 @@ def _apply_host(self, request, operation_context, retry_context): super(ProxyBlockBlobService, self)._apply_host(request, operation_context, retry_context) request_url = self.protocol + '://' + request.host - self._httpclient.proxies = AzureBucketOperations.get_proxy_config(request_url) + self._httpclient.proxies = StorageOperations.get_proxy_config(request_url) class AzureBucketOperations: - __config__ = None - - @classmethod - def get_proxy_config(cls, target_url=None): - if cls.__config__ is None: - cls.__config__ = Config.instance() - if cls.__config__.proxy is None: - return None - else: - return cls.__config__.resolve_proxy(target_url=target_url) - - @classmethod - def init_wrapper(cls, wrapper): - delimiter = StorageOperations.PATH_SEPARATOR - prefix = StorageOperations.get_prefix(wrapper.path) - check_file = True - if prefix.endswith(delimiter): - prefix = prefix[:-1] - check_file = False - for item in wrapper.get_list_manager().list_items(prefix, show_all=True): - if prefix.endswith(item.name.rstrip(delimiter)) and (check_file or item.type == 'Folder'): - wrapper.exists_flag = True - wrapper.is_file_flag = item.type == 'File' - break - return wrapper @classmethod def get_transfer_between_buckets_manager(cls, source_wrapper, destination_wrapper, command): diff --git a/pipe-cli/src/utilities/storage/common.py b/pipe-cli/src/utilities/storage/common.py index 3dddd5c89d..cb13a19ae5 100644 --- a/pipe-cli/src/utilities/storage/common.py +++ b/pipe-cli/src/utilities/storage/common.py @@ -1,5 +1,6 @@ import os import re + from abc import abstractmethod, ABCMeta import click @@ -20,6 +21,32 @@ class StorageOperations: CP_SOURCE_TAG = 'CP_SOURCE' CP_OWNER_TAG = 'CP_OWNER' STORAGE_PATH = '%s://%s/%s' + __config__ = None + + @classmethod + def get_proxy_config(cls, target_url=None): + if cls.__config__ is None: + cls.__config__ = 
Config.instance() + if cls.__config__.proxy is None: + return None + else: + return cls.__config__.resolve_proxy(target_url=target_url) + + @classmethod + def init_wrapper(cls, wrapper, versioning=False): + delimiter = StorageOperations.PATH_SEPARATOR + prefix = StorageOperations.get_prefix(wrapper.path) + check_file = True + if prefix.endswith(delimiter): + prefix = prefix[:-1] + check_file = False + listing_manager = wrapper.get_list_manager(show_versions=versioning) + for item in listing_manager.list_items(prefix, show_all=True): + if prefix.endswith(item.name.rstrip(delimiter)) and (check_file or item.type == 'Folder'): + wrapper.exists_flag = True + wrapper.is_file_flag = item.type == 'File' + break + return wrapper @classmethod def get_prefix(cls, path, delimiter=PATH_SEPARATOR): @@ -152,6 +179,18 @@ def source_tags(cls, tags, source_path, storage_wrapper): default_tags[StorageOperations.CP_OWNER_TAG] = StorageOperations.get_user() return default_tags + @classmethod + def get_items(cls, listing_manager, relative_path, delimiter=PATH_SEPARATOR): + prefix = StorageOperations.get_prefix(relative_path).rstrip(delimiter) + for item in listing_manager.list_items(prefix, recursive=True, show_all=True): + if not StorageOperations.is_relative_path(item.name, prefix): + continue + if item.name == relative_path: + item_relative_path = os.path.basename(item.name) + else: + item_relative_path = StorageOperations.get_item_name(item.name, prefix + delimiter) + yield ('File', item.name, item_relative_path, item.size) + class AbstractTransferManager: __metaclass__ = ABCMeta @@ -194,6 +233,42 @@ def list_items(self, relative_path=None, recursive=False, page_size=StorageOpera """ pass + def get_items(self, relative_path): + """ + Returns all files under the given relative path in forms of tuples with the following structure: + ('File', full_path, relative_path, size) + + :param relative_path: Path to a folder or a file. + :return: Generator of file tuples. 
+ """ + prefix = StorageOperations.get_prefix(relative_path).rstrip(StorageOperations.PATH_SEPARATOR) + for item in self.list_items(prefix, recursive=True, show_all=True): + if not StorageOperations.is_relative_path(item.name, prefix): + continue + if item.name == relative_path: + item_relative_path = os.path.basename(item.name) + else: + item_relative_path = StorageOperations.get_item_name(item.name, prefix + StorageOperations.PATH_SEPARATOR) + yield ('File', item.name, item_relative_path, item.size) + + def folder_exists(self, relative_path, delimiter=StorageOperations.PATH_SEPARATOR): + prefix = StorageOperations.get_prefix(relative_path).rstrip(delimiter) + delimiter + for item in self.list_items(prefix, show_all=True): + if prefix.endswith(item.name): + return True + return False + + @abstractmethod + def get_file_tags(self, relative_path): + pass + + def get_file_size(self, relative_path): + items = self.list_items(relative_path, show_all=True, recursive=True) + for item in items: + if item.name == relative_path: + return item.size + return None + class AbstractDeleteManager: __metaclass__ = ABCMeta diff --git a/pipe-cli/src/utilities/storage/gs.py b/pipe-cli/src/utilities/storage/gs.py new file mode 100644 index 0000000000..c9c46f0d52 --- /dev/null +++ b/pipe-cli/src/utilities/storage/gs.py @@ -0,0 +1,498 @@ +import copy +import os +from datetime import datetime, timedelta + +try: + from urllib.parse import urlparse # Python 3 + from urllib.request import urlopen # Python 3 +except ImportError: + from urlparse import urlparse # Python 2 + from urllib2 import urlopen # Python 2 + +import click +from google.auth import _helpers +from google.auth.transport.requests import AuthorizedSession +from google.cloud.storage import Client, Blob +from google.oauth2.credentials import Credentials + +from src.api.data_storage import DataStorage +from src.config import Config +from src.model.data_storage_item_model import DataStorageItemModel, DataStorageItemLabelModel +from src.model.data_storage_tmp_credentials_model import TemporaryCredentialsModel +from src.utilities.patterns import PatternMatcher +from src.utilities.progress_bar import ProgressPercentage +from src.utilities.storage.common import AbstractRestoreManager, AbstractListingManager, StorageOperations, \ + AbstractDeleteManager, AbstractTransferManager + + +class GsProgressPercentage(ProgressPercentage): + + def __init__(self, filename, size): + super(GsProgressPercentage, self).__init__(filename, size) + self._total_bytes = 0 + + def __call__(self, bytes_amount): + newest_bytes = bytes_amount - self._total_bytes + self._total_bytes = bytes_amount + super(GsProgressPercentage, self).__call__(newest_bytes) + + @staticmethod + def callback(source_key, size, quiet): + if not StorageOperations.show_progress(quiet, size): + return None + progress = GsProgressPercentage(source_key, size) + return lambda current: progress(current) + + +class GsManager: + + def __init__(self, client): + self.client = client + + +class GsListingManager(GsManager, AbstractListingManager): + + def __init__(self, client, bucket, show_versions=False): + super(GsListingManager, self).__init__(client) + self.bucket = bucket + self.show_versions = show_versions + + def list_items(self, relative_path=None, recursive=False, page_size=StorageOperations.DEFAULT_PAGE_SIZE, + show_all=False): + prefix = StorageOperations.get_prefix(relative_path) + bucket = self.client.bucket(self.bucket.path) + blobs_iterator = bucket.list_blobs(prefix=prefix if relative_path else None, + 
max_results=page_size if not show_all and not self.show_versions else None, + delimiter=StorageOperations.PATH_SEPARATOR if not recursive else None, + versions=self.show_versions) + absolute_files = [self._to_storage_file(blob) for blob in blobs_iterator] + absolute_folders = [self._to_storage_folder(name) for name in blobs_iterator.prefixes] + absolute_versions = absolute_files if not self.show_versions \ + else self._group_files_to_versions(absolute_files, absolute_folders, page_size, show_all) + absolute_items = absolute_folders + absolute_versions + requested_items = absolute_items if recursive else [self._to_local_item(item, prefix) + for item in absolute_items] + return requested_items if show_all or not page_size else requested_items[:page_size] + + def _to_storage_file(self, blob): + item = DataStorageItemModel() + item.name = blob.name + item.path = item.name + item.type = 'File' + item.changed = self._to_local_timezone(blob.updated) + item.size = blob.size + item.labels = [DataStorageItemLabelModel('StorageClass', blob.storage_class)] + item.version = blob.generation + item.deleted = self._to_local_timezone(blob.time_deleted) if blob.time_deleted else None + return item + + def _to_local_timezone(self, utc_datetime): + return utc_datetime.astimezone(Config.instance().timezone()) + + def _to_storage_folder(self, name): + item = DataStorageItemModel() + item.name = name + item.path = item.name + item.type = 'Folder' + return item + + def _group_files_to_versions(self, absolute_files, absolute_folders, page_size, show_all): + page_size = page_size - len(absolute_folders) if page_size and not show_all else None + names = set(file.name for file in absolute_files) + absolute_versions = [] + number_of_versions = 0 + for name in names: + files = [file for file in absolute_files if file.name == name] + files.reverse() + latest_file = files[0] + latest_file.latest = not latest_file.deleted + latest_file.versions = files + number_of_versions += len(latest_file.versions) + if latest_file.deleted: + # Because additional synthetic delete version will be shown to user it should be counted in the number + # of file versions. 
+ number_of_versions += 1 + if page_size and number_of_versions > page_size: + number_of_extra_versions = number_of_versions - page_size + latest_file.versions = latest_file.versions[:-number_of_extra_versions] + if latest_file.versions or latest_file.deleted: + absolute_versions.append(latest_file) + break + absolute_versions.append(latest_file) + return absolute_versions + + def _to_local_item(self, absolute_item, prefix): + relative_item = copy.deepcopy(absolute_item) + relative_item.name = StorageOperations.get_item_name(relative_item.name, prefix) + relative_item.path = relative_item.name + return relative_item + + def get_file_tags(self, relative_path): + bucket = self.client.bucket(self.bucket.path) + blob = bucket.blob(relative_path) + blob.reload() + return blob.metadata or {} + + +class GsDeleteManager(GsManager, AbstractDeleteManager): + + def __init__(self, client, bucket): + super(GsDeleteManager, self).__init__(client) + self.bucket = bucket + self.delimiter = StorageOperations.PATH_SEPARATOR + self.listing_manager = GsListingManager(self.client, self.bucket) + + def delete_items(self, relative_path, recursive=False, exclude=[], include=[], version=None, hard_delete=False): + if recursive and version: + raise RuntimeError('Recursive folder deletion with specified version is not available ' + 'for GCP cloud provider.') + prefix = StorageOperations.get_prefix(relative_path) + check_file = True + if prefix.endswith(self.delimiter): + prefix = prefix[:-1] + check_file = False + bucket = self.client.bucket(self.bucket.path) + if not recursive and not hard_delete: + self._delete_blob(self._blob(bucket, prefix, version), exclude, include) + else: + blobs_for_deletion = [] + self.listing_manager.show_versions = version is not None or hard_delete + for item in self.listing_manager.list_items(prefix, recursive=True, show_all=True): + if item.name == prefix and check_file: + if version: + matching_item_versions = [item_version for item_version in item.versions + if item_version.version == version] + if matching_item_versions: + blobs_for_deletion = [self._blob(bucket, item.name, matching_item_versions[0].version)] + else: + blobs_for_deletion.extend(self._item_blobs_for_deletion(bucket, item, hard_delete)) + break + if self._file_under_folder(item.name, prefix): + blobs_for_deletion.extend(self._item_blobs_for_deletion(bucket, item, hard_delete)) + for blob in blobs_for_deletion: + self._delete_blob(blob, exclude, include, prefix) + + def _item_blobs_for_deletion(self, bucket, item, hard_delete): + if hard_delete: + return [self._blob(bucket, item.name, item_version.version) for item_version in item.versions] + else: + return [bucket.blob(item.name)] + + def _blob(self, bucket, blob_name, generation): + """ + Returns blob instance with the specified name and generation. + + The current method is a workaround for the absence of support for the operation in the official SDK. + The support for such an operation was requested in the #5781 issue and implemented in the #7444 pull request that is + already merged. Therefore, as soon as google-cloud-storage==1.15.0 is released, the usage of the current + method should be replaced with the usage of a corresponding SDK method.
+ """ + blob = bucket.blob(blob_name) + if generation: + blob._patch_property('generation', int(generation)) + return blob + + def _delete_blob(self, blob, exclude, include, prefix=None): + if self._is_matching_delete_filters(blob.name, exclude, include, prefix): + self.client._delete_blob_generation(blob) + + def _is_matching_delete_filters(self, blob_name, exclude, include, prefix=None): + if prefix: + relative_file_name = StorageOperations.get_item_name(blob_name, prefix=prefix + self.delimiter) + file_name = StorageOperations.get_prefix(relative_file_name) + else: + file_name = blob_name + return PatternMatcher.match_any(file_name, include) \ + and not PatternMatcher.match_any(file_name, exclude, default=False) + + def _file_under_folder(self, file_path, folder_path): + return StorageOperations.without_prefix(file_path, folder_path).startswith(self.delimiter) + + +class GsRestoreManager(GsManager, AbstractRestoreManager): + + def __init__(self, client, wrapper): + super(GsRestoreManager, self).__init__(client) + self.wrapper = wrapper + self.listing_manager = GsListingManager(self.client, self.wrapper.bucket, show_versions=True) + + def restore_version(self, version): + bucket = self.client.bucket(self.wrapper.bucket.path) + if version: + blob = bucket.blob(self.wrapper.path) + all_items = self.listing_manager.list_items(blob.name, show_all=True) + file_items = [item for item in all_items if item.name == blob.name] + if not file_items: + raise RuntimeError('Version "%s" doesn\'t exist.' % version) + item = file_items[0] + try: + version = int(version) + except ValueError: + raise RuntimeError('Version "%s" doesn\'t exist.' % version) + if not any(item.version == version for item in item.versions): + raise RuntimeError('Version "%s" doesn\'t exist.' % version) + if not item.deleted and item.version == version: + raise RuntimeError('Version "%s" is already the latest version.' % version) + bucket.copy_blob(blob, bucket, blob.name, source_generation=int(version)) + else: + all_items = self.listing_manager.list_items(self.wrapper.path, show_all=True, recursive=True) + file_items = [item for item in all_items if item.name == self.wrapper.path] + if file_items: + item = file_items[0] + if not item.deleted: + raise RuntimeError('Latest file version is not deleted. 
Please specify "--version" parameter.') + self._restore_latest_archived_version(bucket, item) + else: + for item in all_items: + if item.deleted: + self._restore_latest_archived_version(bucket, item) + + def _restore_latest_archived_version(self, bucket, item): + blob = bucket.blob(item.name) + latest_version = item.version + bucket.copy_blob(blob, bucket, blob.name, source_generation=int(latest_version)) + + +class TransferBetweenGsBucketsManager(GsManager, AbstractTransferManager): + + def transfer(self, source_wrapper, destination_wrapper, path=None, relative_path=None, clean=False, quiet=False, + size=None, tags=(), skip_existing=False): + full_path = path + destination_path = StorageOperations.normalize_path(destination_wrapper, relative_path) + if skip_existing: + from_size = source_wrapper.get_list_manager().get_file_size(full_path) + to_size = destination_wrapper.get_list_manager().get_file_size(destination_path) + if to_size is not None and to_size == from_size: + if not quiet: + click.echo('Skipping file %s since it exists in the destination %s' + % (full_path, destination_path)) + return + source_bucket = self.client.bucket(source_wrapper.bucket.path) + source_blob = source_bucket.blob(full_path) + destination_bucket = self.client.bucket(destination_wrapper.bucket.path) + progress_callback = GsProgressPercentage.callback(full_path, size, quiet) + source_bucket.copy_blob(source_blob, destination_bucket, destination_path) + destination_blob = destination_bucket.blob(destination_path) + destination_blob.metadata = self._destination_tags(source_wrapper, full_path, tags) + destination_blob.patch() + progress_callback(size) + if clean: + source_blob.delete() + + def _destination_tags(self, source_wrapper, full_path, raw_tags): + tags = StorageOperations.parse_tags(raw_tags) if raw_tags \ + else source_wrapper.get_list_manager().get_file_tags(full_path) + tags.update(StorageOperations.source_tags(tags, full_path, source_wrapper)) + return tags + + +class GsDownloadManager(GsManager, AbstractTransferManager): + + def transfer(self, source_wrapper, destination_wrapper, path=None, relative_path=None, clean=False, quiet=False, + size=None, tags=(), skip_existing=False): + if path: + source_key = path + else: + source_key = source_wrapper.path + if destination_wrapper.path.endswith(os.path.sep): + destination_key = os.path.join(destination_wrapper.path, relative_path) + else: + destination_key = destination_wrapper.path + if skip_existing: + remote_size = source_wrapper.get_list_manager().get_file_size(source_key) + local_size = StorageOperations.get_local_file_size(destination_key) + if local_size is not None and remote_size == local_size: + if not quiet: + click.echo('Skipping file %s since it exists in the destination %s' % (source_key, destination_key)) + return + folder = os.path.dirname(destination_key) + if folder and not os.path.exists(folder): + os.makedirs(folder) + bucket = self.client.bucket(source_wrapper.bucket.path) + blob = bucket.blob(source_key) + progress_callback = GsProgressPercentage.callback(source_key, size, quiet) + blob.download_to_filename(destination_key) + progress_callback(size) + if clean: + blob.delete() + + +class GsUploadManager(GsManager, AbstractTransferManager): + + def transfer(self, source_wrapper, destination_wrapper, path=None, relative_path=None, clean=False, quiet=False, + size=None, tags=(), skip_existing=False): + if path: + source_key = os.path.join(source_wrapper.path, path) + else: + source_key = source_wrapper.path + destination_key = 
StorageOperations.normalize_path(destination_wrapper, relative_path) + if skip_existing: + local_size = StorageOperations.get_local_file_size(source_key) + remote_size = destination_wrapper.get_list_manager().get_file_size(destination_key) + if remote_size is not None and local_size == remote_size: + if not quiet: + click.echo('Skipping file %s since it exists in the destination %s' % (source_key, destination_key)) + return + progress_callback = GsProgressPercentage.callback(relative_path, size, quiet) + bucket = self.client.bucket(destination_wrapper.bucket.path) + blob = bucket.blob(destination_key) + blob.metadata = StorageOperations.generate_tags(tags, source_key) + blob.upload_from_filename(source_key) + progress_callback(size) + if clean: + source_wrapper.delete_item(source_key) + + +class _SourceUrlIO: + + def __init__(self, response): + self.response = response + self.read_bytes_number = 0 + + def tell(self): + return self.read_bytes_number + + def read(self, *args, **kwargs): + new_bytes = self.response.read(*args, **kwargs) + self.read_bytes_number += len(new_bytes) + return new_bytes + + +class TransferFromHttpOrFtpToGsManager(GsManager, AbstractTransferManager): + + def transfer(self, source_wrapper, destination_wrapper, path=None, relative_path=None, clean=False, quiet=False, + size=None, tags=(), skip_existing=False): + if clean: + raise AttributeError('Cannot perform \'mv\' operation because deletion of remote files ' + 'is not supported for ftp/http sources.') + if path: + source_key = path + else: + source_key = source_wrapper.path + if destination_wrapper.path.endswith(os.path.sep): + destination_key = os.path.join(destination_wrapper.path, relative_path) + else: + destination_key = destination_wrapper.path + if skip_existing: + source_size = size + destination_size = destination_wrapper.get_list_manager().get_file_size(destination_key) + if destination_size is not None and source_size == destination_size: + if not quiet: + click.echo('Skipping file %s since it exists in the destination %s' % (source_key, destination_key)) + return + progress_callback = GsProgressPercentage.callback(relative_path, size, quiet) + bucket = self.client.bucket(destination_wrapper.bucket.path) + blob = bucket.blob(destination_key) + blob.metadata = StorageOperations.generate_tags(tags, source_key) + blob.upload_from_file(_SourceUrlIO(urlopen(source_key))) + progress_callback(blob.size) + + +class GsTemporaryCredentials: + GS_PROJECT = 'GS_PROJECT' + GS_STS_PROJECT = 'GS_STS_TOKEN' + + @classmethod + def from_environment(cls, bucket, read, write): + credentials = TemporaryCredentialsModel() + credentials.secret_key = os.getenv(GsTemporaryCredentials.GS_PROJECT) + credentials.session_token = os.getenv(GsTemporaryCredentials.GS_STS_PROJECT) + credentials.expiration = datetime.utcnow() + timedelta(hours=1) + return credentials + + @classmethod + def from_cp_api(cls, bucket, read, write): + return DataStorage.get_single_temporary_credentials(bucket=bucket.identifier, read=read, write=write) + + +class _RefreshingCredentials(Credentials): + + def __init__(self, refresh): + self._refresh = refresh + self.temporary_credentials = self._refresh() + super(_RefreshingCredentials, self).__init__(self.temporary_credentials.session_token) + + def refresh(self, request): + self.temporary_credentials = self._refresh() + + def apply(self, headers, token=None): + headers['authorization'] = 'Bearer {}'.format(_helpers.from_bytes(self.temporary_credentials.session_token)) + + +class _ProxySession(AuthorizedSession):
+ def request(self, method, url, data=None, headers=None, **kwargs): + parsed_url = urlparse(url) + request_url = '%s://%s' % (parsed_url.scheme, parsed_url.netloc) + self.proxies = StorageOperations.get_proxy_config(request_url) + return super(_ProxySession, self).request(method, url, data, headers, **kwargs) + + +class _DeleteBlobGenerationMixin: + + def _delete_blob_generation(self, blob): + """ + Deletes a specific blob generation. + + If the given blob has a generation then that generation will be deleted, otherwise the latest blob generation will be. + + The current method is a workaround for the absence of support for the operation in the official SDK. + The support for such an operation was requested in the #5781 issue and implemented in the #7444 pull request that is + already merged. Therefore, as soon as google-cloud-storage==1.15.0 is released, the usage of the current + method should be replaced with the usage of a corresponding SDK method. + + See also: + https://github.com/googleapis/google-cloud-python/issues/5781 + https://github.com/googleapis/google-cloud-python/pull/7444 + """ + bucket = blob.bucket + query_params = {} + + if bucket.user_project is not None: + query_params["userProject"] = bucket.user_project + if blob.generation: + query_params['generation'] = blob.generation + + blob_path = Blob.path_helper(bucket.path, blob.name) + bucket.client._connection.api_request( + method="DELETE", + path=blob_path, + query_params=query_params, + _target_object=None, + ) + + +class _RefreshingClient(Client, _DeleteBlobGenerationMixin): + MAX_REFRESH_ATTEMPTS = 100 + + def __init__(self, bucket, read, write, refresh_credentials): + credentials = _RefreshingCredentials(refresh=lambda: refresh_credentials(bucket, read, write)) + session = _ProxySession(credentials, max_refresh_attempts=self.MAX_REFRESH_ATTEMPTS) + super(_RefreshingClient, self).__init__(project=credentials.temporary_credentials.secret_key, _http=session) + + +class GsBucketOperations: + + @classmethod + def get_transfer_between_buckets_manager(cls, source_wrapper, destination_wrapper, command): + client = GsBucketOperations.get_client(destination_wrapper.bucket, read=True, write=True) + return TransferBetweenGsBucketsManager(client) + + @classmethod + def get_download_manager(cls, source_wrapper, destination_wrapper, command): + client = GsBucketOperations.get_client(source_wrapper.bucket, read=True, write=command == 'mv') + return GsDownloadManager(client) + + @classmethod + def get_upload_manager(cls, source_wrapper, destination_wrapper, command): + client = GsBucketOperations.get_client(destination_wrapper.bucket, read=True, write=True) + return GsUploadManager(client) + + @classmethod + def get_transfer_from_http_or_ftp_manager(cls, source_wrapper, destination_wrapper, command): + client = GsBucketOperations.get_client(destination_wrapper.bucket, read=True, write=True) + return TransferFromHttpOrFtpToGsManager(client) + + @classmethod + def get_client(cls, *args, **kwargs): + return _RefreshingClient(*args, refresh_credentials=GsTemporaryCredentials.from_cp_api, **kwargs) diff --git a/pipe-cli/src/utilities/storage/s3.py b/pipe-cli/src/utilities/storage/s3.py index ca35ffe6ea..112116d4dd 100644 --- a/pipe-cli/src/utilities/storage/s3.py +++ b/pipe-cli/src/utilities/storage/s3.py @@ -500,6 +500,12 @@ def get_folder_object(self, name): item.path = name return item + def get_items(self, relative_path): + return S3BucketOperations.get_items(self.bucket, session=self.session) + + def get_file_tags(self, relative_path): +
return ObjectTaggingManager.get_object_tagging(ObjectTaggingManager(self.session, self.bucket), relative_path) + class ObjectTaggingManager(StorageItemManager):
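Reviewer note: a minimal, hypothetical usage sketch of the listing classes introduced in src/utilities/storage/gs.py above. Only names added by this diff are used; bucket_model stands in for the data storage model that the CLI resolves through its API layer, and the function name is illustrative rather than part of the change.

    from src.utilities.storage.gs import GsBucketOperations, GsListingManager

    def print_gs_versions(bucket_model, relative_path):
        # A read-only client is enough for listing; only the delete and
        # restore managers request write access.
        client = GsBucketOperations.get_client(bucket_model, read=True, write=False)
        manager = GsListingManager(client, bucket_model, show_versions=True)
        for item in manager.list_items(relative_path, recursive=True, show_all=True):
            if item.type != 'File':
                continue
            # Each file carries its grouped generations; the latest one is flagged.
            for version in item.versions:
                label = '{} (latest)'.format(version.version) if version.latest else version.version
                print(item.name, label)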
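A similar hedged sketch of how the restore path added above can be driven through the new GsBucketWrapper. Again, only names from this diff are used; the versioning=True keyword is simply forwarded by build_wrapper to StorageOperations.init_wrapper, so treat this as an illustration under those assumptions, not the CLI's actual call chain.

    from src.model.data_storage_wrapper import GsBucketWrapper

    def restore_gs_file(bucket_model, relative_path, version=None):
        # build_wrapper initializes the exists/is_file flags via
        # StorageOperations.init_wrapper before any manager is created.
        wrapper = GsBucketWrapper.build_wrapper(bucket_model, relative_path, versioning=True)
        # GsRestoreManager copies the requested generation (or, with version=None,
        # the latest deleted generation) back over the live object using
        # bucket.copy_blob(..., source_generation=...).
        wrapper.get_restore_manager().restore_version(version)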