Skip to content

Commit

Permalink
feat: safely resume interrupted downloads (#294)
Browse files Browse the repository at this point in the history
* fix: add offset of last byte received to retry a streaming download

* add helper method _parse_generation_header

* add some object generation related logic

* revise object genertion helper method

* fix url variable scope

* handle special cases with decompressive transcoding

* fix helper method

* move to _helpers

* add support to safely resume interrupted raw downloads

* add unit tests

* add more tests

* update helper method per comments

* address comments on handling stream seek error

* address comments on moving transcoding feature
  • Loading branch information
cojenco authored Feb 11, 2022
1 parent 8b1b547 commit b363329
Show file tree
Hide file tree
Showing 5 changed files with 479 additions and 24 deletions.
4 changes: 4 additions & 0 deletions google/resumable_media/_download.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,10 @@ def __init__(
media_url, stream=stream, start=start, end=end, headers=headers
)
self.checksum = checksum
self._bytes_downloaded = 0
self._expected_checksum = None
self._checksum_object = None
self._object_generation = None

def _prepare_request(self):
"""Prepare the contents of an HTTP request.
Expand Down
67 changes: 67 additions & 0 deletions google/resumable_media/_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,11 @@
import random
import warnings

from urllib.parse import parse_qs
from urllib.parse import urlencode
from urllib.parse import urlsplit
from urllib.parse import urlunsplit

from google.resumable_media import common


Expand All @@ -33,6 +38,7 @@
"implementation. Python 3 has a faster implementation, `google-crc32c`, "
"which will be used if it is installed."
)
_GENERATION_HEADER = "x-goog-generation"
_HASH_HEADER = "x-goog-hash"
_MISSING_CHECKSUM = """\
No {checksum_type} checksum was returned from the service while downloading {}
Expand Down Expand Up @@ -302,6 +308,67 @@ def _get_checksum_object(checksum_type):
raise ValueError("checksum must be ``'md5'``, ``'crc32c'`` or ``None``")


def _parse_generation_header(response, get_headers):
"""Parses the generation header from an ``X-Goog-Generation`` value.
Args:
response (~requests.Response): The HTTP response object.
get_headers (callable: response->dict): returns response headers.
Returns:
Optional[long]: The object generation from the response, if it
can be detected from the ``X-Goog-Generation`` header; otherwise, None.
"""
headers = get_headers(response)
object_generation = headers.get(_GENERATION_HEADER, None)

if object_generation is None:
return None
else:
return int(object_generation)


def _get_generation_from_url(media_url):
"""Retrieve the object generation query param specified in the media url.
Args:
media_url (str): The URL containing the media to be downloaded.
Returns:
long: The object generation from the media url if exists; otherwise, None.
"""

_, _, _, query, _ = urlsplit(media_url)
query_params = parse_qs(query)
object_generation = query_params.get("generation", None)

if object_generation is None:
return None
else:
return int(object_generation[0])


def add_query_parameters(media_url, query_params):
"""Add query parameters to a base url.
Args:
media_url (str): The URL containing the media to be downloaded.
query_params (dict): Names and values of the query parameters to add.
Returns:
str: URL with additional query strings appended.
"""

if len(query_params) == 0:
return media_url

scheme, netloc, path, query, frag = urlsplit(media_url)
params = parse_qs(query)
new_params = {**params, **query_params}
query = urlencode(new_params, doseq=True)
return urlunsplit((scheme, netloc, path, query, frag))


class _DoNothingHash(object):
"""Do-nothing hash object.
Expand Down
126 changes: 102 additions & 24 deletions google/resumable_media/requests/download.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,12 +86,22 @@ def _write_to_stream(self, response):
checksum doesn't agree with server-computed checksum.
"""

# `_get_expected_checksum()` may return None even if a checksum was
# requested, in which case it will emit an info log _MISSING_CHECKSUM.
# If an invalid checksum type is specified, this will raise ValueError.
expected_checksum, checksum_object = _helpers._get_expected_checksum(
response, self._get_headers, self.media_url, checksum_type=self.checksum
)
# Retrieve the expected checksum only once for the download request,
# then compute and validate the checksum when the full download completes.
# Retried requests are range requests, and there's no way to detect
# data corruption for that byte range alone.
if self._expected_checksum is None and self._checksum_object is None:
# `_get_expected_checksum()` may return None even if a checksum was
# requested, in which case it will emit an info log _MISSING_CHECKSUM.
# If an invalid checksum type is specified, this will raise ValueError.
expected_checksum, checksum_object = _helpers._get_expected_checksum(
response, self._get_headers, self.media_url, checksum_type=self.checksum
)
self._expected_checksum = expected_checksum
self._checksum_object = checksum_object
else:
expected_checksum = self._expected_checksum
checksum_object = self._checksum_object

with response:
# NOTE: In order to handle compressed streams gracefully, we try
Expand All @@ -104,6 +114,7 @@ def _write_to_stream(self, response):
)
for chunk in body_iter:
self._stream.write(chunk)
self._bytes_downloaded += len(chunk)
local_checksum_object.update(chunk)

if expected_checksum is not None:
Expand Down Expand Up @@ -150,7 +161,7 @@ def consume(
ValueError: If the current :class:`Download` has already
finished.
"""
method, url, payload, headers = self._prepare_request()
method, _, payload, headers = self._prepare_request()
# NOTE: We assume "payload is None" but pass it along anyway.
request_kwargs = {
"data": payload,
Expand All @@ -160,10 +171,39 @@ def consume(
if self._stream is not None:
request_kwargs["stream"] = True

# Assign object generation if generation is specified in the media url.
if self._object_generation is None:
self._object_generation = _helpers._get_generation_from_url(self.media_url)

# Wrap the request business logic in a function to be retried.
def retriable_request():
url = self.media_url

# To restart an interrupted download, read from the offset of last byte
# received using a range request, and set object generation query param.
if self._bytes_downloaded > 0:
_download.add_bytes_range(
self._bytes_downloaded, self.end, self._headers
)
request_kwargs["headers"] = self._headers

# Set object generation query param to ensure the same object content is requested.
if (
self._object_generation is not None
and _helpers._get_generation_from_url(self.media_url) is None
):
query_param = {"generation": self._object_generation}
url = _helpers.add_query_parameters(self.media_url, query_param)

result = transport.request(method, url, **request_kwargs)

# If a generation hasn't been specified, and this is the first response we get, let's record the
# generation. In future requests we'll specify the generation query param to avoid data races.
if self._object_generation is None:
self._object_generation = _helpers._parse_generation_header(
result, self._get_headers
)

self._process_response(result)

if self._stream is not None:
Expand Down Expand Up @@ -223,20 +263,30 @@ def _write_to_stream(self, response):
~google.resumable_media.common.DataCorruption: If the download's
checksum doesn't agree with server-computed checksum.
"""

# `_get_expected_checksum()` may return None even if a checksum was
# requested, in which case it will emit an info log _MISSING_CHECKSUM.
# If an invalid checksum type is specified, this will raise ValueError.
expected_checksum, checksum_object = _helpers._get_expected_checksum(
response, self._get_headers, self.media_url, checksum_type=self.checksum
)
# Retrieve the expected checksum only once for the download request,
# then compute and validate the checksum when the full download completes.
# Retried requests are range requests, and there's no way to detect
# data corruption for that byte range alone.
if self._expected_checksum is None and self._checksum_object is None:
# `_get_expected_checksum()` may return None even if a checksum was
# requested, in which case it will emit an info log _MISSING_CHECKSUM.
# If an invalid checksum type is specified, this will raise ValueError.
expected_checksum, checksum_object = _helpers._get_expected_checksum(
response, self._get_headers, self.media_url, checksum_type=self.checksum
)
self._expected_checksum = expected_checksum
self._checksum_object = checksum_object
else:
expected_checksum = self._expected_checksum
checksum_object = self._checksum_object

with response:
body_iter = response.raw.stream(
_request_helpers._SINGLE_GET_CHUNK_SIZE, decode_content=False
)
for chunk in body_iter:
self._stream.write(chunk)
self._bytes_downloaded += len(chunk)
checksum_object.update(chunk)
response._content_consumed = True

Expand Down Expand Up @@ -285,19 +335,47 @@ def consume(
ValueError: If the current :class:`Download` has already
finished.
"""
method, url, payload, headers = self._prepare_request()
method, _, payload, headers = self._prepare_request()
# NOTE: We assume "payload is None" but pass it along anyway.
request_kwargs = {
"data": payload,
"headers": headers,
"timeout": timeout,
"stream": True,
}

# Assign object generation if generation is specified in the media url.
if self._object_generation is None:
self._object_generation = _helpers._get_generation_from_url(self.media_url)

# Wrap the request business logic in a function to be retried.
def retriable_request():
# NOTE: We assume "payload is None" but pass it along anyway.
result = transport.request(
method,
url,
data=payload,
headers=headers,
stream=True,
timeout=timeout,
)
url = self.media_url

# To restart an interrupted download, read from the offset of last byte
# received using a range request, and set object generation query param.
if self._bytes_downloaded > 0:
_download.add_bytes_range(
self._bytes_downloaded, self.end, self._headers
)
request_kwargs["headers"] = self._headers

# Set object generation query param to ensure the same object content is requested.
if (
self._object_generation is not None
and _helpers._get_generation_from_url(self.media_url) is None
):
query_param = {"generation": self._object_generation}
url = _helpers.add_query_parameters(self.media_url, query_param)

result = transport.request(method, url, **request_kwargs)

# If a generation hasn't been specified, and this is the first response we get, let's record the
# generation. In future requests we'll specify the generation query param to avoid data races.
if self._object_generation is None:
self._object_generation = _helpers._parse_generation_header(
result, self._get_headers
)

self._process_response(result)

Expand Down
Loading

0 comments on commit b363329

Please sign in to comment.