This repository has been archived by the owner on Nov 8, 2019. It is now read-only.

GCP: pipe cli progress bar (epam#253)
* refactor: Migrate to google-cloud-storage==1.15.0 and remove workarounds

* feat: Add support for chunked upload / download operations.

Brings support for chunked upload / download operations, introducing a
workaround for the official SDK's limitations.

Chunked upload / download allows observing an operation's status. The
progress bar is adapted to show all intermediate changes while
uploading or downloading a blob.
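For reference, the mechanism behind the new progress reporting is the chunk-consumption loop from the google-resumable-media package, which the official SDK normally drives internally without exposing any callback. Below is a minimal sketch of that loop, assuming configured default credentials and illustrative bucket/object names; note that _get_download_url and _get_transport are internal SDK helpers as of google-cloud-storage 1.15.0:

from google.cloud import storage
from google.resumable_media.requests import ChunkedDownload

# Assumptions: default credentials are configured; bucket and object names are illustrative.
client = storage.Client()
blob = client.bucket('my-bucket').blob('path/to/object')

download_url = blob._get_download_url()   # internal SDK helper
transport = blob._get_transport(client)   # authorized HTTP session

with open('/tmp/object', 'wb') as stream:
    download = ChunkedDownload(download_url, 4 * 1024 * 1024, stream)
    while not download.finished:
        download.consume_next_chunk(transport)
        # Progress is observable between chunks; this is what feeds the progress bar.
        print('downloaded %d of %d bytes' % (download.bytes_downloaded, download.total_bytes))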
tcibinan authored and mzueva committed Jun 4, 2019
1 parent cf357d9 commit 74d3743
Showing 2 changed files with 79 additions and 59 deletions.
2 changes: 1 addition & 1 deletion pipe-cli/requirements.txt
@@ -16,7 +16,7 @@ PyJWT==1.6.1
pypac==0.8.1
beautifulsoup4==4.6.1
azure-storage-blob==1.5.0
-google-cloud-storage==1.14.0
+google-cloud-storage==1.15.0
pyasn1-modules==0.2.4
pyasn1==0.4.5
setuptools
136 changes: 78 additions & 58 deletions pipe-cli/src/utilities/storage/gs.py
@@ -1,6 +1,7 @@
import copy
import os
from datetime import datetime, timedelta
+from google.resumable_media.requests import Download, ChunkedDownload

try:
    from urllib.parse import urlparse  # Python 3
@@ -49,6 +50,19 @@ class GsManager:
    def __init__(self, client):
        self.client = client

+    def _chunked_blob(self, bucket, blob_name, progress_callback):
+        """
+        The method is a workaround for the absence of support for the uploading and downloading callbacks
+        in the official SDK.
+        See _ChunkedBlob documentation for more info.
+        """
+        return _ChunkedBlob(
+            name=blob_name,
+            bucket=bucket,
+            progress_callback=progress_callback
+        )


class GsListingManager(GsManager, AbstractListingManager):

@@ -152,7 +166,7 @@ def delete_items(self, relative_path, recursive=False, exclude=[], include=[], v
            check_file = False
        bucket = self.client.bucket(self.bucket.path)
        if not recursive and not hard_delete:
-            self._delete_blob(self._blob(bucket, prefix, version), exclude, include)
+            self._delete_blob(bucket.blob(prefix, generation=version), exclude, include)
        else:
            blobs_for_deletion = []
            listing_manager = self._get_listing_manager(show_versions=version is not None or hard_delete)
@@ -162,7 +176,7 @@ def delete_items(self, relative_path, recursive=False, exclude=[], include=[], v
                        matching_item_versions = [item_version for item_version in item.versions
                                                  if item_version.version == version]
                        if matching_item_versions:
-                            blobs_for_deletion = [self._blob(bucket, item.name, matching_item_versions[0].version)]
+                            blobs_for_deletion = [bucket.blob(item.name, generation=matching_item_versions[0].version)]
                    else:
                        blobs_for_deletion.extend(self._item_blobs_for_deletion(bucket, item, hard_delete))
                    break
@@ -173,27 +187,13 @@ def delete_items(self, relative_path, recursive=False, exclude=[], include=[], v

    def _item_blobs_for_deletion(self, bucket, item, hard_delete):
        if hard_delete:
-            return [self._blob(bucket, item.name, item_version.version) for item_version in item.versions]
+            return [bucket.blob(item.name, generation=item_version.version) for item_version in item.versions]
        else:
            return [bucket.blob(item.name)]

-    def _blob(self, bucket, blob_name, generation):
-        """
-        Returns a blob instance with the specified name and generation.
-        The current method is a workaround for the absence of support for the operation in the official SDK.
-        The support for such an operation was requested in #5781 issue and implemented in #7444 pull request that is
-        already merged. Therefore, now that google-cloud-storage==1.15.0 is released, the usage of the current
-        method should be replaced with the usage of the corresponding SDK method.
-        """
-        blob = bucket.blob(blob_name)
-        if generation:
-            blob._patch_property('generation', int(generation))
-        return blob
-
    def _delete_blob(self, blob, exclude, include, prefix=None):
        if self._is_matching_delete_filters(blob.name, exclude, include, prefix):
-            self.client._delete_blob_generation(blob)
+            blob.delete()

    def _is_matching_delete_filters(self, blob_name, exclude, include, prefix=None):
        if prefix:
@@ -276,11 +276,13 @@ def transfer(self, source_wrapper, destination_wrapper, path=None, relative_path
        source_bucket = source_client.bucket(source_wrapper.bucket.path)
        source_blob = source_bucket.blob(full_path)
        destination_bucket = self.client.bucket(destination_wrapper.bucket.path)
-        progress_callback = GsProgressPercentage.callback(full_path, size, quiet)
        source_bucket.copy_blob(source_blob, destination_bucket, destination_path, client=self.client)
        destination_blob = destination_bucket.blob(destination_path)
        destination_blob.metadata = self._destination_tags(source_wrapper, full_path, tags)
        destination_blob.patch()
+        # Transfer between buckets in GCP is almost an instant operation. Therefore the progress bar can be updated
+        # only once.
+        progress_callback = GsProgressPercentage.callback(full_path, size, quiet)
        if progress_callback is not None:
            progress_callback(size)
        if clean:
@@ -293,6 +295,58 @@ def _destination_tags(self, source_wrapper, full_path, raw_tags):
        return tags


+class _ChunkedBlob(Blob):
+    DEFAULT_CHUNK_SIZE = 4 * 1024 * 1024  # 4 MB
+
+    def __init__(self, progress_callback=None, *args, **kwargs):
+        """
+        The class is a workaround for the absence of support for the uploading and downloading callbacks
+        in the official SDK. There are no issues tracking support for such a feature. Nevertheless, if support
+        for uploading and downloading callbacks is ever provided, the usages of the _ChunkedBlob class should
+        be removed.
+        """
+        super(_ChunkedBlob, self).__init__(chunk_size=_ChunkedBlob.DEFAULT_CHUNK_SIZE, *args, **kwargs)
+        self.progress_callback = progress_callback
+
+    def _do_download(self, transport, file_obj, download_url, headers, start=None, end=None):
+        if self.chunk_size is None:
+            download = Download(
+                download_url, stream=file_obj, headers=headers, start=start, end=end
+            )
+            download.consume(transport)
+        else:
+            download = ChunkedDownload(
+                download_url,
+                self.chunk_size,
+                file_obj,
+                headers=headers,
+                start=start if start else 0,
+                end=end,
+            )
+
+            while not download.finished:
+                if self.progress_callback:
+                    self.progress_callback(download.bytes_downloaded)
+                download.consume_next_chunk(transport)
+
+    def _do_resumable_upload(self, client, stream, content_type, size, num_retries, predefined_acl):
+        upload, transport = self._initiate_resumable_upload(
+            client,
+            stream,
+            content_type,
+            size,
+            num_retries,
+            predefined_acl=predefined_acl,
+            chunk_size=self.chunk_size
+        )
+
+        while not upload.finished:
+            if self.progress_callback:
+                self.progress_callback(upload._bytes_uploaded)
+            response = upload.transmit_next_chunk(transport)
+
+        return response


class GsDownloadManager(GsManager, AbstractTransferManager):

    def transfer(self, source_wrapper, destination_wrapper, path=None, relative_path=None, clean=False, quiet=False,
@@ -315,9 +369,9 @@ def transfer(self, source_wrapper, destination_wrapper, path=None, relative_path
        folder = os.path.dirname(destination_key)
        if folder and not os.path.exists(folder):
            os.makedirs(folder)
-        bucket = self.client.bucket(source_wrapper.bucket.path)
-        blob = bucket.blob(source_key)
        progress_callback = GsProgressPercentage.callback(source_key, size, quiet)
+        bucket = self.client.bucket(source_wrapper.bucket.path)
+        blob = self._chunked_blob(bucket, source_key, progress_callback)
        blob.download_to_filename(destination_key)
        if progress_callback is not None:
            progress_callback(size)
@@ -343,7 +397,7 @@ def transfer(self, source_wrapper, destination_wrapper, path=None, relative_path
            return
        progress_callback = GsProgressPercentage.callback(relative_path, size, quiet)
        bucket = self.client.bucket(destination_wrapper.bucket.path)
-        blob = bucket.blob(destination_key)
+        blob = self._chunked_blob(bucket, destination_key, progress_callback)
        blob.metadata = StorageOperations.generate_tags(tags, source_key)
        blob.upload_from_filename(source_key)
        if progress_callback is not None:
@@ -391,7 +445,7 @@ def transfer(self, source_wrapper, destination_wrapper, path=None, relative_path
            return
        progress_callback = GsProgressPercentage.callback(relative_path, size, quiet)
        bucket = self.client.bucket(destination_wrapper.bucket.path)
-        blob = bucket.blob(destination_key)
+        blob = self._chunked_blob(bucket, destination_key, progress_callback)
        blob.metadata = StorageOperations.generate_tags(tags, source_key)
        blob.upload_from_file(_SourceUrlIO(urlopen(source_key)))
        if progress_callback is not None:
@@ -439,41 +493,7 @@ def request(self, method, url, data=None, headers=None, **kwargs):
        return super(_ProxySession, self).request(method, url, data, headers, **kwargs)


-class _DeleteBlobGenerationMixin:
-
-    def _delete_blob_generation(self, blob):
-        """
-        Deletes a specific blob generation.
-        If the given blob has a generation then it will be deleted, otherwise the latest blob generation will be.
-        The current method is a workaround for the absence of support for the operation in the official SDK.
-        The support for such an operation was requested in #5781 issue and implemented in #7444 pull request that is
-        already merged. Therefore, now that google-cloud-storage==1.15.0 is released, the usage of the current
-        method should be replaced with the usage of the corresponding SDK method.
-        See also:
-        https://github.com/googleapis/google-cloud-python/issues/5781
-        https://github.com/googleapis/google-cloud-python/pull/7444
-        """
-        bucket = blob.bucket
-        query_params = {}
-
-        if bucket.user_project is not None:
-            query_params["userProject"] = bucket.user_project
-        if blob.generation:
-            query_params['generation'] = blob.generation
-
-        blob_path = Blob.path_helper(bucket.path, blob.name)
-        bucket.client._connection.api_request(
-            method="DELETE",
-            path=blob_path,
-            query_params=query_params,
-            _target_object=None,
-        )
-
-
-class _RefreshingClient(Client, _DeleteBlobGenerationMixin):
+class _RefreshingClient(Client):
    MAX_REFRESH_ATTEMPTS = 100

    def __init__(self, bucket, read, write, refresh_credentials, versioning=False):
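Taken together, the new download path wires these pieces up roughly as follows. This is a sketch only, assuming an authenticated client and the classes from this diff; download_with_progress is a hypothetical helper, not part of the commit:

# Hypothetical wiring of the classes changed in this commit.
def download_with_progress(client, bucket_name, source_key, destination_key, size):
    progress_callback = GsProgressPercentage.callback(source_key, size, quiet=False)
    bucket = client.bucket(bucket_name)
    blob = _ChunkedBlob(name=source_key, bucket=bucket, progress_callback=progress_callback)
    blob.download_to_filename(destination_key)
    if progress_callback is not None:
        # Final update: the chunk loop reports progress before each chunk is consumed,
        # so the last chunk is accounted for here.
        progress_callback(size)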
