Support GCP in pipe cli #57

Merged
merged 28 commits into from
Apr 4, 2019
Merged
Show file tree
Hide file tree
Changes from 21 commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
1b5bffc
feat: Add google cloud storage support backbone
tcibinan Mar 25, 2019
1037aa4
feat: Add basic GS manager implementations
tcibinan Mar 25, 2019
3a2514d
feat: Add support for exclude and include blob filters on deletion
tcibinan Mar 26, 2019
7191d75
feat: Add basic support for versioning operations
tcibinan Mar 26, 2019
e438d06
feat: Add wrapper initialization and proxy configuration
tcibinan Mar 26, 2019
117e66d
fix: Add missing comma in setup.py
tcibinan Mar 26, 2019
e8b72b7
feat: Add support for folders listing
tcibinan Mar 26, 2019
ebb8629
fix: Add missing argument in storage wrapper init
tcibinan Mar 26, 2019
3f045db
fix: Repair copying between local and GS storages
tcibinan Mar 26, 2019
c318058
fix: Repair copying between GS storages
tcibinan Mar 27, 2019
01aa8ce
fix: Add basic support for uploading files from url to GS storage
tcibinan Mar 27, 2019
6fd3467
feat: Add support for uploading files streaming from url to GS storage
tcibinan Mar 27, 2019
bf8c816
feat: Add support for blob versions listing and deletion
tcibinan Mar 27, 2019
078433c
refactor: Simplify blob versions recursive deletion
tcibinan Mar 27, 2019
d3eee64
fix: Repair restoring for GS storages
tcibinan Mar 29, 2019
5fa5116
fix: Replace gc:// prefix with canonical gs:// one for GS storage
tcibinan Mar 29, 2019
68796e6
fix: Update file versions listing behaviour
tcibinan Mar 29, 2019
467de5f
fix: Show versions in reverse order and mark the latest one
tcibinan Mar 29, 2019
cbb80bf
fix: Add missing method implementation to S3 listing manager
tcibinan Mar 29, 2019
ec07928
fix: Repair version-aware operation for GCP storages
tcibinan Apr 1, 2019
6bf9155
fix: Repair restore latest version command for GCP storages
tcibinan Apr 2, 2019
7079db0
fix: Use temporary token retrieved from server
tcibinan Apr 2, 2019
75eb697
fix: Add missing google-cloud-storage dependency to pipe-cli requirem…
tcibinan Apr 3, 2019
06066a3
chore: Add requests package hidden import.
tcibinan Apr 3, 2019
8418bae
chore: Replace requests package hidden import with pyinstaller hook.
tcibinan Apr 3, 2019
f312e26
fix: Replace all eager bucket object usages with lazy ones
tcibinan Apr 3, 2019
02f4527
fix: Simplify delete blob generation method
tcibinan Apr 3, 2019
3ea1a8c
fix: Use default region while creating storages without region id spe…
tcibinan Apr 3, 2019
3 changes: 2 additions & 1 deletion pipe-cli/setup.py
@@ -38,7 +38,8 @@
         'PyJWT==1.6.1',
         'pypac==0.8.1',
         'beautifulsoup4==4.6.1',
-        'azure-storage-blob==1.5.0'
+        'azure-storage-blob==1.5.0',
+        'google-cloud-storage==1.14.0'
     ],
     entry_points='''
     [console_scripts]
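
The new google-cloud-storage==1.14.0 pin is the library the GS managers below are built on. For orientation, a minimal sketch of its client surface (not code from this PR — the CLI authorizes through temporary server-issued tokens instead, see commit 7079db0; bucket and blob names here are hypothetical):

    # Sketch of the google-cloud-storage client model; client.bucket() and
    # bucket.blob() build lazy handles without API calls (cf. commit f312e26),
    # while upload_from_filename() performs the actual request.
    from google.cloud.storage import Client

    client = Client()                        # credentials/project resolved from the environment
    bucket = client.bucket('my-bucket')      # lazy Bucket handle, no API call
    blob = bucket.blob('data/input.txt')     # lazy Blob handle
    blob.upload_from_filename('input.txt')   # single upload request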
2 changes: 0 additions & 2 deletions pipe-cli/src/api/data_storage.py
@@ -17,8 +17,6 @@
 install_aliases()

 from urllib.parse import urlparse, urlencode
-from urllib.request import urlopen, Request
-from urllib.error import HTTPError

 import json
 from src.model.data_storage_tmp_credentials_model import TemporaryCredentialsModel
1 change: 1 addition & 0 deletions pipe-cli/src/model/data_storage_item_model.py
@@ -27,6 +27,7 @@ def __init__(self):
         self.versions = []
         self.latest = False
         self.delete_marker = False
+        self.deleted = None

     @classmethod
     def load(cls, json):
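
The new deleted field carries a deletion timestamp for items whose latest version is a delete marker; None means the item is live. A hedged sketch of the intended downstream use (mirroring the datastorage_operations.py hunk further below; the helper itself is hypothetical):

    def listing_fields(item):
        # The deletion timestamp wins over the change time, and a deleted item
        # is rendered as a delete marker.
        item_updated = item.deleted or item.changed
        is_delete_marker = item.delete_marker or item.deleted is not None
        return item_updated, is_delete_marker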
78 changes: 56 additions & 22 deletions pipe-cli/src/model/data_storage_wrapper.py
@@ -30,6 +30,7 @@
 from ..utilities.storage.s3 import S3BucketOperations
 from ..utilities.storage.local import LocalOperations
 from ..utilities.storage.azure import AzureListingManager, AzureDeleteManager, AzureBucketOperations
+from ..utilities.storage.gs import GsRestoreManager, GsListingManager, GsDeleteManager, GsBucketOperations
 from ..utilities.storage.common import StorageOperations
 from .data_storage_wrapper_type import WrapperType
 import shutil
@@ -55,6 +56,12 @@ class DataStorageWrapper(object):
         (WrapperType.FTP, WrapperType.AZURE): AzureBucketOperations.get_transfer_from_http_or_ftp_manager,
         (WrapperType.HTTP, WrapperType.AZURE): AzureBucketOperations.get_transfer_from_http_or_ftp_manager,

+        (WrapperType.GS, WrapperType.GS): GsBucketOperations.get_transfer_between_buckets_manager,
+        (WrapperType.GS, WrapperType.LOCAL): GsBucketOperations.get_download_manager,
+        (WrapperType.LOCAL, WrapperType.GS): GsBucketOperations.get_upload_manager,
+        (WrapperType.FTP, WrapperType.GS): GsBucketOperations.get_transfer_from_http_or_ftp_manager,
+        (WrapperType.HTTP, WrapperType.GS): GsBucketOperations.get_transfer_from_http_or_ftp_manager,
+
         (WrapperType.FTP, WrapperType.LOCAL): LocalOperations.get_transfer_from_http_or_ftp_manager,
         (WrapperType.HTTP, WrapperType.LOCAL): LocalOperations.get_transfer_from_http_or_ftp_manager
     }
@@ -90,7 +97,8 @@ def get_cloud_wrapper_for_bucket(cls, bucket_model, relative_path):
     def __get_storage_wrapper(cls, bucket, relative_path, *args, **kwargs):
         _suppliers = {
             WrapperType.S3: S3BucketWrapper.build_wrapper,
-            WrapperType.AZURE: AzureBucketWrapper.build_wrapper
+            WrapperType.AZURE: AzureBucketWrapper.build_wrapper,
+            WrapperType.GS: GsBucketWrapper.build_wrapper,
         }
         if bucket.type in _suppliers:
             supplier = _suppliers[bucket.type]
@@ -214,12 +222,31 @@ def is_file(self):
     def exists(self):
         return self.exists_flag

+    def get_items(self):
+        return self.get_list_manager().get_items(self.path)
+
+    def is_empty(self, relative=None):
+        if not self.exists():
+            return True
+        if self.is_file():
+            return False
+        if relative:
+            delimiter = StorageOperations.PATH_SEPARATOR
+            path = self.path.rstrip(delimiter) + delimiter + relative
+        else:
+            path = self.path
+        return not self.get_list_manager().folder_exists(path)
+
     @abstractmethod
     def get_type(self):
         pass

+    @abstractmethod
+    def get_restore_manager(self):
+        pass
+
     @abstractmethod
-    def get_list_manager(self, show_versions):
+    def get_list_manager(self, show_versions=False):
         pass

     @abstractmethod
@@ -253,9 +280,6 @@ def is_empty(self, relative=None):
             return not S3BucketOperations.path_exists(self, relative, session=self.session)
         return self.is_empty_flag

-    def get_items(self):
-        return S3BucketOperations.get_items(self, session=self.session)
-
     def get_file_download_uri(self, relative_path):
         download_url_model = None
         try:
@@ -278,7 +302,7 @@ def delete_item(self, relative_path):
     def get_restore_manager(self):
         return S3BucketOperations.get_restore_manager(self)

-    def get_list_manager(self, show_versions):
+    def get_list_manager(self, show_versions=False):
         return S3BucketOperations.get_list_manager(self, show_versions=show_versions)

     def get_delete_manager(self, versioning):
@@ -297,27 +321,12 @@ def build_wrapper(cls, root_bucket, relative_path, versioning=False, init=True):
             raise RuntimeError('Versioning is not supported by AZURE cloud provider')
         wrapper = AzureBucketWrapper(root_bucket, relative_path)
         if init:
-            AzureBucketOperations.init_wrapper(wrapper)
+            StorageOperations.init_wrapper(wrapper, versioning=versioning)
         return wrapper

     def get_type(self):
         return WrapperType.AZURE

-    def is_empty(self, relative=None):
-        if not self.exists():
-            return True
-        if self.is_file():
-            return False
-        if relative:
-            delimiter = StorageOperations.PATH_SEPARATOR
-            path = self.path.rstrip(delimiter) + delimiter + relative
-        else:
-            path = self.path
-        return not self.get_list_manager().folder_exists(path)
-
-    def get_items(self):
-        return self.get_list_manager().get_items(self.path)
-
     def get_restore_manager(self):
         raise RuntimeError('Versioning is not supported by AZURE cloud provider')

@@ -337,6 +346,31 @@ def _blob_service(self, read, write):
         return self.service


+class GsBucketWrapper(CloudDataStorageWrapper):
+
+    @classmethod
+    def build_wrapper(cls, root_bucket, relative_path, init=True, *args, **kwargs):
+        wrapper = GsBucketWrapper(root_bucket, relative_path)
+        if init:
+            StorageOperations.init_wrapper(wrapper, *args, **kwargs)
+        return wrapper
+
+    def get_type(self):
+        return WrapperType.GS
+
+    def get_restore_manager(self):
+        return GsRestoreManager(self._storage_client(write=True), self)
+
+    def get_list_manager(self, show_versions=False):
+        return GsListingManager(self._storage_client(), self.bucket, show_versions)
+
+    def get_delete_manager(self, versioning):
+        return GsDeleteManager(self._storage_client(write=True), self.bucket)
+
+    def _storage_client(self, read=True, write=False):
+        return GsBucketOperations.get_client(self.bucket, read=read, write=write)
+
+
 class LocalFileSystemWrapper(DataStorageWrapper):

     def __init__(self, path):
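
The (source type, destination type) table extended above is how a transfer picks its manager; the new GS rows cover bucket-to-bucket copies, downloads, uploads, and FTP/HTTP ingestion. A simplified sketch of the dispatch — the table name here is hypothetical since the diff truncates the surrounding declaration, but the factory signature matches get_transfer_between_buckets_manager shown in the azure.py diff below:

    # Hypothetical reduction of the dispatch table; the real dict lives in
    # DataStorageWrapper and maps every supported (source, destination) pair.
    _TRANSFER_MANAGERS = {
        (WrapperType.GS, WrapperType.GS): GsBucketOperations.get_transfer_between_buckets_manager,
        (WrapperType.GS, WrapperType.LOCAL): GsBucketOperations.get_download_manager,
        (WrapperType.LOCAL, WrapperType.GS): GsBucketOperations.get_upload_manager,
    }

    def resolve_transfer_manager(source_wrapper, destination_wrapper, command):
        # An unsupported pair surfaces as a KeyError rather than a silent fallback.
        factory = _TRANSFER_MANAGERS[(source_wrapper.get_type(), destination_wrapper.get_type())]
        return factory(source_wrapper, destination_wrapper, command)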
8 changes: 5 additions & 3 deletions pipe-cli/src/model/data_storage_wrapper_type.py
@@ -2,15 +2,17 @@ class WrapperType(object):
     LOCAL = 'LOCAL'
     S3 = 'S3'
     AZURE = 'AZ'
+    GS = 'GS'
     FTP = 'FTP'
     HTTP = 'HTTP'

-    __cloud_types = [S3, AZURE]
+    __cloud_types = [S3, AZURE, GS]
     __dynamic_cloud_scheme = 'cp'
     __s3_cloud_scheme = 's3'
     __azure_cloud_scheme = 'az'
-    __cloud_schemes = [__dynamic_cloud_scheme, __s3_cloud_scheme, __azure_cloud_scheme]
-    __cloud_schemes_map = {S3: __s3_cloud_scheme, AZURE: __azure_cloud_scheme}
+    __gs_cloud_scheme = 'gs'
+    __cloud_schemes = [__dynamic_cloud_scheme, __s3_cloud_scheme, __azure_cloud_scheme, __gs_cloud_scheme]
+    __cloud_schemes_map = {S3: __s3_cloud_scheme, AZURE: __azure_cloud_scheme, GS: __gs_cloud_scheme}

     @classmethod
     def cloud_types(cls):
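
With the gs scheme registered, a gs://bucket/path URL resolves to the GS wrapper type the same way s3:// and az:// do. A minimal standalone sketch of that resolution, using a literal map in place of the private __cloud_schemes_map above:

    from urllib.parse import urlparse

    SCHEME_TO_TYPE = {'s3': 'S3', 'az': 'AZ', 'gs': 'GS'}

    def wrapper_type_for(path):
        # 'gs' for gs://bucket/key; unknown schemes are rejected explicitly.
        scheme = urlparse(path).scheme
        try:
            return SCHEME_TO_TYPE[scheme]
        except KeyError:
            raise ValueError('Unsupported cloud scheme: %s' % scheme)

    assert wrapper_type_for('gs://bucket/some/path') == 'GS'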
19 changes: 12 additions & 7 deletions pipe-cli/src/utilities/datastorage_operations.py
@@ -117,7 +117,7 @@ def storage_remove_item(cls, path, yes, version, hard_delete, recursive, exclude
         if version and hard_delete:
             click.echo('"version" argument should\'t be combined with "hard-delete" option', err=True)
             sys.exit(1)
-        source_wrapper = DataStorageWrapper.get_cloud_wrapper(path, versioning=version or hard_delete)
+        source_wrapper = DataStorageWrapper.get_cloud_wrapper(path, versioning=version is not None or hard_delete)
         if source_wrapper is None or not source_wrapper.exists():
             click.echo('Storage path "{}" was not found'.format(path), err=True)
             sys.exit(1)
@@ -384,23 +384,28 @@ def __print_data_storage_contents(cls, bucket_model, relative_path,
             labels = ''
             if item.type is not None and item.type in WrapperType.cloud_types():
                 name = item.path
-            if item.changed is not None:
+            item_updated = item.deleted or item.changed
+            if item_updated is not None:
                 if bucket_model is None:
                     # need to wrap into datetime since bucket listing returns str
-                    item_datetime = datetime.datetime.strptime(item.changed, '%Y-%m-%d %H:%M:%S')
+                    item_datetime = datetime.datetime.strptime(item_updated, '%Y-%m-%d %H:%M:%S')
                 else:
-                    item_datetime = item.changed
+                    item_datetime = item_updated
                 changed = item_datetime.strftime('%Y-%m-%d %H:%M:%S')
-            if item.size is not None:
+            if item.size is not None and not item.deleted:
                 size = item.size
-            if item.labels is not None and len(item.labels) > 0:
+            if item.labels is not None and len(item.labels) > 0 and not item.deleted:
                 labels = ', '.join(map(lambda i: i.value, item.labels))
-            item_type = "-File" if item.delete_marker else item.type
+            item_type = "-File" if item.delete_marker or item.deleted else item.type
             row = [item_type, labels, changed, size, name]
             if show_versions:
                 row.append('')
             items_table.add_row(row)
             if show_versions and item.type == 'File':
+                if item.deleted:
+                    # Additional synthetic delete version
+                    row = ['-File', '', item.deleted.strftime('%Y-%m-%d %H:%M:%S'), size, name, '- (latest)']
+                    items_table.add_row(row)
                 for version in item.versions:
                     version_type = "-File" if version.delete_marker else "+File"
                     version_label = "{} (latest)".format(version.version) if version.latest else version.version
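
The listing change folds deletion state into the normal rendering path: the deletion timestamp wins over the change timestamp, deleted items suppress size and labels, and with versions shown a synthetic '-File ... (latest)' row represents the delete marker. A condensed, hypothetical restatement of those rules:

    def render_listing_row(item):
        # Assumes datetime values for brevity; the real code also handles the
        # str timestamps returned by bucket listings.
        item_updated = item.deleted or item.changed            # deletion time wins
        changed = item_updated.strftime('%Y-%m-%d %H:%M:%S') if item_updated else ''
        size = item.size if item.size is not None and not item.deleted else ''
        item_type = '-File' if item.delete_marker or item.deleted else item.type
        return [item_type, changed, size, item.path]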
75 changes: 8 additions & 67 deletions pipe-cli/src/utilities/storage/azure.py
@@ -1,20 +1,18 @@
 from __future__ import absolute_import

 import copy
+import io
 import time
 from threading import Lock
+from datetime import timedelta, datetime
+import os
+import click

 try:
     from urllib.request import urlopen  # Python 3
 except ImportError:
     from urllib2 import urlopen  # Python 2

-from datetime import timedelta, datetime
-
-import io
-import os
-import click
-
 from azure.storage.blob import BlockBlobService, ContainerPermissions, Blob
 from azure.storage.common._auth import _StorageSASAuthentication

@@ -93,41 +91,9 @@ def _to_local_item(self, absolute_item, prefix):
         relative_item.path = relative_item.name
         return relative_item

-    def folder_exists(self, relative_path):
-        prefix = StorageOperations.get_prefix(relative_path).rstrip(self.delimiter) + self.delimiter
-        for item in self.list_items(prefix, show_all=True):
-            if prefix.endswith(item.name):
-                return True
-        return False
-
-    def get_file_size(self, relative_path):
-        items = self.list_items(relative_path, show_all=True, recursive=True)
-        for item in items:
-            if item.name == relative_path:
-                return item.size
-        return None
-
     def get_file_tags(self, relative_path):
         return dict(self.service.get_blob_metadata(self.bucket.path, relative_path))

-    def get_items(self, relative_path):
-        """
-        Returns all files under the given relative path in forms of tuples with the following structure:
-        ('File', full_path, relative_path, size)
-
-        :param relative_path: Path to a folder or a file.
-        :return: Generator of file tuples.
-        """
-        prefix = StorageOperations.get_prefix(relative_path).rstrip(self.delimiter)
-        for item in self.list_items(prefix, recursive=True, show_all=True):
-            if not StorageOperations.is_relative_path(item.name, prefix):
-                continue
-            if item.name == relative_path:
-                item_relative_path = os.path.basename(item.name)
-            else:
-                item_relative_path = StorageOperations.get_item_name(item.name, prefix + self.delimiter)
-            yield ('File', item.name, item_relative_path, item.size)
-

 class AzureDeleteManager(AzureManager, AbstractDeleteManager):

@@ -291,10 +257,10 @@ def transfer(self, source_wrapper, destination_wrapper, path=None, relative_path
             source_wrapper.delete_item(source_key)


-class UrlIO(io.BytesIO):
+class _SourceUrlIO(io.BytesIO):

     def __init__(self, url):
-        super(UrlIO, self).__init__()
+        super(_SourceUrlIO, self).__init__()
         self.io = urlopen(url)

     def read(self, n=10):
@@ -325,7 +291,7 @@ def transfer(self, source_wrapper, destination_wrapper, path=None, relative_path
             return
         destination_tags = StorageOperations.generate_tags(tags, source_key)
         progress_callback = AzureProgressPercentage.callback(relative_path, size, quiet)
-        self.service.create_blob_from_stream(destination_wrapper.bucket.path, destination_key, UrlIO(source_key),
+        self.service.create_blob_from_stream(destination_wrapper.bucket.path, destination_key, _SourceUrlIO(source_key),
                                              metadata=destination_tags,
                                              progress_callback=progress_callback)

@@ -403,35 +369,10 @@ def _apply_host(self, request, operation_context, retry_context):
         super(ProxyBlockBlobService, self)._apply_host(request, operation_context, retry_context)

         request_url = self.protocol + '://' + request.host
-        self._httpclient.proxies = AzureBucketOperations.get_proxy_config(request_url)
+        self._httpclient.proxies = StorageOperations.get_proxy_config(request_url)


 class AzureBucketOperations:
-    __config__ = None
-
-    @classmethod
-    def get_proxy_config(cls, target_url=None):
-        if cls.__config__ is None:
-            cls.__config__ = Config.instance()
-        if cls.__config__.proxy is None:
-            return None
-        else:
-            return cls.__config__.resolve_proxy(target_url=target_url)
-
-    @classmethod
-    def init_wrapper(cls, wrapper):
-        delimiter = StorageOperations.PATH_SEPARATOR
-        prefix = StorageOperations.get_prefix(wrapper.path)
-        check_file = True
-        if prefix.endswith(delimiter):
-            prefix = prefix[:-1]
-            check_file = False
-        for item in wrapper.get_list_manager().list_items(prefix, show_all=True):
-            if prefix.endswith(item.name.rstrip(delimiter)) and (check_file or item.type == 'Folder'):
-                wrapper.exists_flag = True
-                wrapper.is_file_flag = item.type == 'File'
-                break
-        return wrapper

     @classmethod
     def get_transfer_between_buckets_manager(cls, source_wrapper, destination_wrapper, command):
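
Beyond the rename of UrlIO to the module-private _SourceUrlIO, this file's get_proxy_config and init_wrapper move from AzureBucketOperations into the shared StorageOperations, so the new GS provider can reuse them. For context, a self-contained sketch of the stream adapter's shape — the diff truncates read(), so the body shown is an assumption, not the PR's code:

    import io

    try:
        from urllib.request import urlopen  # Python 3
    except ImportError:
        from urllib2 import urlopen         # Python 2

    class _SourceUrlIO(io.BytesIO):
        # Read-only stream over a remote URL, so an HTTP/FTP source can be fed
        # to create_blob_from_stream without downloading the whole file first.

        def __init__(self, url):
            super(_SourceUrlIO, self).__init__()
            self.io = urlopen(url)

        def read(self, n=10):
            # Assumed body: delegate to the underlying response rather than the
            # in-memory buffer inherited from BytesIO.
            return self.io.read(n)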