GCP: pipe cli progress bar #253

Merged · 2 commits · Jun 4, 2019
2 changes: 1 addition & 1 deletion pipe-cli/requirements.txt
@@ -16,7 +16,7 @@ PyJWT==1.6.1
pypac==0.8.1
beautifulsoup4==4.6.1
azure-storage-blob==1.5.0
-google-cloud-storage==1.14.0
+google-cloud-storage==1.15.0
pyasn1-modules==0.2.4
pyasn1==0.4.5
setuptools
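This dependency bump is what enables the gs.py changes below: google-cloud-storage 1.15.0 ships the generation-aware blob support implemented in googleapis/google-cloud-python#7444, so the CLI can drop its _blob and _delete_blob_generation workarounds. A minimal sketch of the relevant 1.15.0 surface, assuming default credentials and hypothetical bucket/object names:

    from google.cloud import storage

    client = storage.Client()
    bucket = client.bucket('my-bucket')  # hypothetical bucket name

    # Since 1.15.0, a generation can be passed directly to bucket.blob(...),
    # and delete() on such a blob targets exactly that generation.
    blob = bucket.blob('my-object', generation=1234567890)
    blob.delete()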
136 changes: 78 additions & 58 deletions pipe-cli/src/utilities/storage/gs.py
@@ -1,6 +1,7 @@
import copy
import os
from datetime import datetime, timedelta
+from google.resumable_media.requests import Download, ChunkedDownload

try:
    from urllib.parse import urlparse  # Python 3
@@ -49,6 +50,19 @@ class GsManager:
    def __init__(self, client):
        self.client = client

+    def _chunked_blob(self, bucket, blob_name, progress_callback):
+        """
+        This method is a workaround for the absence of upload and download progress
+        callbacks in the official SDK.
+
+        See the _ChunkedBlob documentation for more info.
+        """
+        return _ChunkedBlob(
+            name=blob_name,
+            bucket=bucket,
+            progress_callback=progress_callback
+        )


class GsListingManager(GsManager, AbstractListingManager):

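For context, the contract that _chunked_blob expects from progress_callback is simply a callable accepting the cumulative number of bytes transferred so far, or None to disable reporting. A hypothetical stand-in for GsProgressPercentage.callback, whose real implementation lives elsewhere in the CLI, might look like:

    def make_progress_callback(path, total_bytes, quiet=False):
        # Hypothetical helper for illustration: mirrors how
        # GsProgressPercentage.callback is used in this diff. It returns None
        # in quiet mode and otherwise a callable taking cumulative bytes.
        if quiet:
            return None

        def report(transferred_bytes):
            percent = 100.0 * transferred_bytes / total_bytes if total_bytes else 100.0
            print('%s: %.1f%% transferred' % (path, percent))

        return report

The callbacks in this PR are invoked with cumulative counters (download.bytes_downloaded, upload._bytes_uploaded) and once more with the full size after the transfer completes, so an implementation should tolerate repeated and final-size calls.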
@@ -152,7 +166,7 @@ def delete_items(self, relative_path, recursive=False, exclude=[], include=[], v
        check_file = False
        bucket = self.client.bucket(self.bucket.path)
        if not recursive and not hard_delete:
-            self._delete_blob(self._blob(bucket, prefix, version), exclude, include)
+            self._delete_blob(bucket.blob(prefix, generation=version), exclude, include)
        else:
            blobs_for_deletion = []
            listing_manager = self._get_listing_manager(show_versions=version is not None or hard_delete)
@@ -162,7 +176,7 @@ def delete_items(self, relative_path, recursive=False, exclude=[], include=[], v
                    matching_item_versions = [item_version for item_version in item.versions
                                              if item_version.version == version]
                    if matching_item_versions:
-                        blobs_for_deletion = [self._blob(bucket, item.name, matching_item_versions[0].version)]
+                        blobs_for_deletion = [bucket.blob(item.name, generation=matching_item_versions[0].version)]
                    else:
                        blobs_for_deletion.extend(self._item_blobs_for_deletion(bucket, item, hard_delete))
                    break
@@ -173,27 +187,13 @@ def delete_items(self, relative_path, recursive=False, exclude=[], include=[], v

    def _item_blobs_for_deletion(self, bucket, item, hard_delete):
        if hard_delete:
-            return [self._blob(bucket, item.name, item_version.version) for item_version in item.versions]
+            return [bucket.blob(item.name, generation=item_version.version) for item_version in item.versions]
        else:
            return [bucket.blob(item.name)]

-    def _blob(self, bucket, blob_name, generation):
-        """
-        Returns a blob instance with the specified name and generation.
-
-        The current method is a workaround for the absence of support for the operation in the official SDK.
-        The support for such an operation was requested in #5781 issue and implemented in #7444 pull request,
-        which is already merged. Therefore, now that google-cloud-storage==1.15.0 is released, the usage of the
-        current method should be replaced with the usage of the corresponding SDK method.
-        """
-        blob = bucket.blob(blob_name)
-        if generation:
-            blob._patch_property('generation', int(generation))
-        return blob
-
    def _delete_blob(self, blob, exclude, include, prefix=None):
        if self._is_matching_delete_filters(blob.name, exclude, include, prefix):
-            self.client._delete_blob_generation(blob)
+            blob.delete()

    def _is_matching_delete_filters(self, blob_name, exclude, include, prefix=None):
        if prefix:
@@ -276,11 +276,13 @@ def transfer(self, source_wrapper, destination_wrapper, path=None, relative_path
        source_bucket = source_client.bucket(source_wrapper.bucket.path)
        source_blob = source_bucket.blob(full_path)
        destination_bucket = self.client.bucket(destination_wrapper.bucket.path)
-        progress_callback = GsProgressPercentage.callback(full_path, size, quiet)
        source_bucket.copy_blob(source_blob, destination_bucket, destination_path, client=self.client)
        destination_blob = destination_bucket.blob(destination_path)
        destination_blob.metadata = self._destination_tags(source_wrapper, full_path, tags)
        destination_blob.patch()
+        # Transfer between buckets in GCP is almost an instant operation,
+        # so the progress bar is updated only once.
+        progress_callback = GsProgressPercentage.callback(full_path, size, quiet)
        if progress_callback is not None:
            progress_callback(size)
        if clean:
@@ -293,6 +295,58 @@ def _destination_tags(self, source_wrapper, full_path, raw_tags):
        return tags


+class _ChunkedBlob(Blob):
+    DEFAULT_CHUNK_SIZE = 4 * 1024 * 1024  # 4 MB
+
+    def __init__(self, progress_callback=None, *args, **kwargs):
+        """
+        This class is a workaround for the absence of upload and download progress callbacks
+        in the official SDK. There is no issue tracking support for such a feature. Nevertheless,
+        if support for upload and download callbacks is ever provided, the usages of the
+        _ChunkedBlob class should be removed.
+        """
+        super(_ChunkedBlob, self).__init__(chunk_size=_ChunkedBlob.DEFAULT_CHUNK_SIZE, *args, **kwargs)
+        self.progress_callback = progress_callback
+
+    def _do_download(self, transport, file_obj, download_url, headers, start=None, end=None):
+        if self.chunk_size is None:
+            download = Download(
+                download_url, stream=file_obj, headers=headers, start=start, end=end
+            )
+            download.consume(transport)
+        else:
+            download = ChunkedDownload(
+                download_url,
+                self.chunk_size,
+                file_obj,
+                headers=headers,
+                start=start if start else 0,
+                end=end,
+            )
+
+            while not download.finished:
+                if self.progress_callback:
+                    self.progress_callback(download.bytes_downloaded)
+                download.consume_next_chunk(transport)
+
+    def _do_resumable_upload(self, client, stream, content_type, size, num_retries, predefined_acl):
+        upload, transport = self._initiate_resumable_upload(
+            client,
+            stream,
+            content_type,
+            size,
+            num_retries,
+            predefined_acl=predefined_acl,
+            chunk_size=self.chunk_size
+        )
+
+        while not upload.finished:
+            if self.progress_callback:
+                self.progress_callback(upload._bytes_uploaded)
+            response = upload.transmit_next_chunk(transport)
+
+        return response


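The overridden methods above delegate the per-chunk loops to google-resumable-media, so the same progress pattern can be reproduced standalone. A sketch assuming default credentials and hypothetical bucket/object names (the media URL below is the standard JSON API download endpoint):

    import io

    import google.auth
    from google.auth.transport.requests import AuthorizedSession
    from google.resumable_media.requests import ChunkedDownload

    credentials, _ = google.auth.default()
    transport = AuthorizedSession(credentials)

    # Hypothetical bucket and object names, for illustration only.
    media_url = ('https://www.googleapis.com/download/storage/v1'
                 '/b/my-bucket/o/my-object?alt=media')

    stream = io.BytesIO()
    download = ChunkedDownload(media_url, 4 * 1024 * 1024, stream)
    while not download.finished:
        download.consume_next_chunk(transport)
        # bytes_downloaded is cumulative, so it can feed a progress callback directly.
        print('downloaded %d of %s bytes' % (download.bytes_downloaded, download.total_bytes))

Uploads mirror this with ResumableUpload: transmit_next_chunk(transport) advances the transfer, and the public bytes_uploaded property exposes the cumulative counter that _do_resumable_upload reads through the private _bytes_uploaded attribute.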
class GsDownloadManager(GsManager, AbstractTransferManager):

    def transfer(self, source_wrapper, destination_wrapper, path=None, relative_path=None, clean=False, quiet=False,
@@ -315,9 +369,9 @@ def transfer(self, source_wrapper, destination_wrapper, path=None, relative_path
        folder = os.path.dirname(destination_key)
        if folder and not os.path.exists(folder):
            os.makedirs(folder)
-        bucket = self.client.bucket(source_wrapper.bucket.path)
-        blob = bucket.blob(source_key)
        progress_callback = GsProgressPercentage.callback(source_key, size, quiet)
+        bucket = self.client.bucket(source_wrapper.bucket.path)
+        blob = self._chunked_blob(bucket, source_key, progress_callback)
        blob.download_to_filename(destination_key)
        if progress_callback is not None:
            progress_callback(size)
@@ -343,7 +397,7 @@ def transfer(self, source_wrapper, destination_wrapper, path=None, relative_path
            return
        progress_callback = GsProgressPercentage.callback(relative_path, size, quiet)
        bucket = self.client.bucket(destination_wrapper.bucket.path)
-        blob = bucket.blob(destination_key)
+        blob = self._chunked_blob(bucket, destination_key, progress_callback)
        blob.metadata = StorageOperations.generate_tags(tags, source_key)
        blob.upload_from_filename(source_key)
        if progress_callback is not None:
@@ -391,7 +445,7 @@ def transfer(self, source_wrapper, destination_wrapper, path=None, relative_path
            return
        progress_callback = GsProgressPercentage.callback(relative_path, size, quiet)
        bucket = self.client.bucket(destination_wrapper.bucket.path)
-        blob = bucket.blob(destination_key)
+        blob = self._chunked_blob(bucket, destination_key, progress_callback)
        blob.metadata = StorageOperations.generate_tags(tags, source_key)
        blob.upload_from_file(_SourceUrlIO(urlopen(source_key)))
        if progress_callback is not None:
@@ -439,41 +493,7 @@ def request(self, method, url, data=None, headers=None, **kwargs):
        return super(_ProxySession, self).request(method, url, data, headers, **kwargs)


-class _DeleteBlobGenerationMixin:
-
-    def _delete_blob_generation(self, blob):
-        """
-        Deletes a specific blob generation.
-
-        If the given blob has a generation then that generation will be deleted,
-        otherwise the latest blob generation will be.
-
-        The current method is a workaround for the absence of support for the operation in the official SDK.
-        The support for such an operation was requested in #5781 issue and implemented in #7444 pull request,
-        which is already merged. Therefore, now that google-cloud-storage==1.15.0 is released, the usage of the
-        current method should be replaced with the usage of the corresponding SDK method.
-
-        See also:
-        https://github.com/googleapis/google-cloud-python/issues/5781
-        https://github.com/googleapis/google-cloud-python/pull/7444
-        """
-        bucket = blob.bucket
-        query_params = {}
-
-        if bucket.user_project is not None:
-            query_params["userProject"] = bucket.user_project
-        if blob.generation:
-            query_params['generation'] = blob.generation
-
-        blob_path = Blob.path_helper(bucket.path, blob.name)
-        bucket.client._connection.api_request(
-            method="DELETE",
-            path=blob_path,
-            query_params=query_params,
-            _target_object=None,
-        )
-
-
-class _RefreshingClient(Client, _DeleteBlobGenerationMixin):
+class _RefreshingClient(Client):
    MAX_REFRESH_ATTEMPTS = 100

    def __init__(self, bucket, read, write, refresh_credentials, versioning=False):