Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GCP: Support resumable download and custom buffering size in pipe cli #475

Merged
merged 3 commits into from
Jul 2, 2019
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
240 changes: 171 additions & 69 deletions pipe-cli/src/utilities/storage/gs.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
import base64
import copy
import hashlib
import os
from datetime import datetime, timedelta
from requests import RequestException

try:
from urllib.parse import urlparse # Python 3
Expand All @@ -14,6 +17,8 @@
from google.auth.transport.requests import AuthorizedSession
from google.cloud.storage import Client, Blob
from google.oauth2.credentials import Credentials
from google.resumable_media import DataCorruption
from google.resumable_media.requests import Download

from src.api.data_storage import DataStorage
from src.config import Config
Expand All @@ -25,6 +30,10 @@
AbstractDeleteManager, AbstractTransferManager


CP_CLI_DOWNLOAD_BUFFERING_SIZE = 'CP_CLI_DOWNLOAD_BUFFERING_SIZE'
CP_CLI_RESUMABLE_DOWNLOAD_ATTEMPTS = 'CP_CLI_RESUMABLE_DOWNLOAD_ATTEMPTS'


class GsProgressPercentage(ProgressPercentage):

def __init__(self, filename, size):
Expand All @@ -44,22 +53,158 @@ def callback(source_key, size, quiet):
return lambda current: progress(current)


class _OutputStreamMixin(object):

def __init__(self, stream):
self._stream = stream
self._bytes_transferred = 0

@property
def bytes_transferred(self):
return self._stream.bytes_transferred if hasattr(self._stream, 'bytes_transferred') else self._bytes_transferred


class _ProgressOutputStream(_OutputStreamMixin):

def __init__(self, stream, progress_callback, progress_chunk_size):
"""
Output stream wrapper that updates writing progress.

Progress callbacks are time-consuming actions. To improve overall performance callbacks have to be called
only a limited number of times. The amount of transferred data to cause a new progress callback is specified
in progress_chunk_size parameter.

:param stream: Wrapping stream.
:param progress_callback: Progress callback.
:param progress_chunk_size: Required amount of transferred data from the previous progress callback
to cause a new callback.
"""
super(_ProgressOutputStream, self).__init__(stream)
self._progress_callback = progress_callback
self._progress_chunk_size = progress_chunk_size
self._progress_chunk = 0

def write(self, data):
self._stream.write(data)
self._bytes_transferred += len(data)
current_chunk = self._bytes_transferred / self._progress_chunk_size
if current_chunk > self._progress_chunk:
self._progress_chunk = current_chunk
if self._progress_callback:
self._progress_callback(self._bytes_transferred)


class _CheckSumOutputStream(_OutputStreamMixin):

def __init__(self, stream):
"""
Output stream wrapper that collects md5 hash for the writing data. Once all the data is written
the resulting checksum can be validated.

:param stream: Wrapping stream.
"""
super(_CheckSumOutputStream, self).__init__(stream)
self._md5_hash = hashlib.md5()

def write(self, data):
self._stream.write(data)
self._md5_hash.update(data)

def validate_checksum(self, expected_md5_hash):
actual_md5_hash = base64.b64encode(self._md5_hash.digest())
actual_md5_hash = actual_md5_hash.decode(u'utf-8')
if actual_md5_hash != expected_md5_hash:
raise RuntimeError('Checksum mismatch after the resuming download.')


class _ResumableDownloadProgressMixin(Blob):
"""
Blob download mixin that checks downloading progress, validates downloading checksum and resumes the download in
case of any low-level error.
"""

def _do_download(self, transport, file_obj, download_url, headers, start=None, end=None):
stream = _CheckSumOutputStream(_ProgressOutputStream(file_obj, progress_callback=self._progress_callback,
progress_chunk_size=self._progress_chunk_size))
remaining_attempts = self._attempts
while stream.bytes_transferred < self._size and remaining_attempts:
try:
download = Download(
download_url, stream=stream, headers=headers,
start=stream.bytes_transferred, end=end
)
download.consume(transport)
except RequestException as e:
remaining_attempts -= 1
if not remaining_attempts:
raise RuntimeError('Resumable download has failed after %s sequential resumes. '
'You can alter the number of allowed resumes using %s environment variable. '
'Original error: %s.'
% (self._attempts, CP_CLI_RESUMABLE_DOWNLOAD_ATTEMPTS, e.message))
self.reload()
stream.validate_checksum(self.md5_hash)


class _UploadProgressMixin(Blob):

def _do_resumable_upload(self, client, stream, content_type, size, num_retries, predefined_acl):
upload, transport = self._initiate_resumable_upload(
client,
stream,
content_type,
size,
num_retries,
predefined_acl=predefined_acl,
chunk_size=self.chunk_size
)

response = None
while not upload.finished:
if self._progress_callback:
self._progress_callback(upload._bytes_uploaded)
response = upload.transmit_next_chunk(transport)

return response


class _ProgressBlob(_ResumableDownloadProgressMixin, _UploadProgressMixin, Blob):
PROGRESS_CHUNK_SIZE = 5 * 1024 * 1024 # 5 MB
DEFAULT_RESUME_ATTEMPTS = 5

def __init__(self, size, progress_callback, attempts=DEFAULT_RESUME_ATTEMPTS, *args, **kwargs):
"""
The class is a workaround for the absence of support for the uploading and downloading callbacks
in the official SDK. There are no issues for support of such a feature. Nevertheless if the support for
uploading and downloading callbacks will be provided the usage of _ProgressBlob class should be removed.

:param size: Total size of the downloading blob.
:param attempts: Maximum number of download sequential resumes before failing. Defaults to
DEFAULT_RESUME_ATTEMPTS and can be overridden with CP_CLI_RESUMABLE_DOWNLOAD_ATTEMPTS environment variable.
"""
self._size = size
self._progress_callback = progress_callback
self._attempts = int(os.environ.get(CP_CLI_RESUMABLE_DOWNLOAD_ATTEMPTS) or attempts)
self._progress_chunk_size = _ProgressBlob.PROGRESS_CHUNK_SIZE
super(_ProgressBlob, self).__init__(*args, **kwargs)


class GsManager:

def __init__(self, client):
self.client = client

def _progress_blob(self, bucket, blob_name, progress_callback):
def _progress_blob(self, bucket, blob_name, progress_callback, size):
"""
The method is a workaround for the absence of support for the uploading and downloading callbacks
in the official SDK.

See _ProgressBlob documentation for more info.
"""
return _ProgressBlob(
size=size,
progress_callback=progress_callback,
name=blob_name,
bucket=bucket,
progress_callback=progress_callback
bucket=bucket
)


Expand Down Expand Up @@ -294,73 +439,22 @@ def _destination_tags(self, source_wrapper, full_path, raw_tags):
return tags


class ProgressStream:

def __init__(self, stream, progress_callback, progress_chunk_size):
"""
Simple stream wrapper that supports progress callbacks.

Progress callbacks are time-consuming actions. To improve overall performance callbacks have to be called
only a limited number of times. The amount of transferred data to cause a new progress callback is specified
in progress_chunk_size parameter.
class GsDownloadManager(GsManager, AbstractTransferManager):
DEFAULT_BUFFERING_SIZE = 1024 * 1024 # 1MB

:param stream: Wrapping stream.
:param progress_callback: Progress callback.
:param progress_chunk_size: Required amount of transferred data from the previous progress callback
to cause a new callback.
def __init__(self, client, buffering=DEFAULT_BUFFERING_SIZE):
"""
self._stream = stream
self._progress_callback = progress_callback
self._progress_chunk_size = progress_chunk_size
self._bytes_transferred = 0
self._progress_chunk = 0

def write(self, data):
self._stream.write(data)
self._bytes_transferred += len(data)
current_chunk = self._bytes_transferred / self._progress_chunk_size
if current_chunk > self._progress_chunk:
self._progress_chunk = current_chunk
if self._progress_callback:
self._progress_callback(self._bytes_transferred)
Google cloud storage download manager that uses custom buffering size for destination files.

See the corresponding issue for more information on why the buffering size should be altered:
https://github.com/epam/cloud-pipeline/issues/435.

class _ProgressBlob(Blob):
DEFAULT_CHUNK_SIZE = 5 * 1024 * 1024 # 5 MB

def __init__(self, progress_callback=None, *args, **kwargs):
"""
The class is a workaround for the absence of support for the uploading and downloading callbacks
in the official SDK. There is no issues for support of such a feature. Nevertheless if the support for
uploading and downloading callbacks will be provided the usages of _ProgressBlob class should be removed.
:param client: Google cloud storage client.
:param buffering: Buffering size for file system flushing. Defaults to DEFAULT_BUFFERING_SIZE and
can be overridden with CP_CLI_DOWNLOAD_BUFFERING_SIZE environment variable.
"""
super(_ProgressBlob, self).__init__(*args, **kwargs)
self.progress_callback = progress_callback

def _do_download(self, transport, file_obj, download_url, headers, start=None, end=None):
stream = ProgressStream(file_obj, self.progress_callback, _ProgressBlob.DEFAULT_CHUNK_SIZE)
super(_ProgressBlob, self)._do_download(transport, stream, download_url, headers, start, end)

def _do_resumable_upload(self, client, stream, content_type, size, num_retries, predefined_acl):
upload, transport = self._initiate_resumable_upload(
client,
stream,
content_type,
size,
num_retries,
predefined_acl=predefined_acl,
chunk_size=self.chunk_size
)

while not upload.finished:
if self.progress_callback:
self.progress_callback(upload._bytes_uploaded)
response = upload.transmit_next_chunk(transport)

return response


class GsDownloadManager(GsManager, AbstractTransferManager):
GsManager.__init__(self, client)
self._buffering = int(os.environ.get(CP_CLI_DOWNLOAD_BUFFERING_SIZE) or buffering)

def transfer(self, source_wrapper, destination_wrapper, path=None, relative_path=None, clean=False, quiet=False,
size=None, tags=(), skip_existing=False):
Expand All @@ -387,13 +481,21 @@ def transfer(self, source_wrapper, destination_wrapper, path=None, relative_path
if StorageOperations.file_is_empty(size):
blob = bucket.blob(source_key)
else:
blob = self._progress_blob(bucket, source_key, progress_callback)
blob.download_to_filename(destination_key)
blob = self._progress_blob(bucket, source_key, progress_callback, size)
self._download_to_file(blob, destination_key)
if progress_callback is not None:
progress_callback(size)
if clean:
blob.delete()

def _download_to_file(self, blob, destination_key):
try:
with open(destination_key, "wb", buffering=self._buffering) as file_obj:
blob.download_to_file(file_obj)
except DataCorruption:
os.remove(destination_key)
raise


class GsUploadManager(GsManager, AbstractTransferManager):

Expand All @@ -413,7 +515,7 @@ def transfer(self, source_wrapper, destination_wrapper, path=None, relative_path
return
progress_callback = GsProgressPercentage.callback(relative_path, size, quiet)
bucket = self.client.bucket(destination_wrapper.bucket.path)
blob = self._progress_blob(bucket, destination_key, progress_callback)
blob = self._progress_blob(bucket, destination_key, progress_callback, size)
blob.metadata = StorageOperations.generate_tags(tags, source_key)
blob.upload_from_filename(source_key)
if progress_callback is not None:
Expand Down Expand Up @@ -464,7 +566,7 @@ def transfer(self, source_wrapper, destination_wrapper, path=None, relative_path
if StorageOperations.file_is_empty(size):
blob = bucket.blob(source_key)
else:
blob = self._progress_blob(bucket, source_key, progress_callback)
blob = self._progress_blob(bucket, source_key, progress_callback, size)
blob.metadata = StorageOperations.generate_tags(tags, source_key)
blob.upload_from_file(_SourceUrlIO(urlopen(source_key)))
if progress_callback is not None:
Expand Down