Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Using futures in batched requests #812

Merged
merged 6 commits into from
May 5, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 25 additions & 6 deletions gcloud/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ def build_api_url(cls, path, query_params=None,
return url

def _make_request(self, method, url, data=None, content_type=None,
headers=None):
headers=None, target_object=None):
"""A low level method to send a request to the API.

Typically, you shouldn't need to use this method.
Expand All @@ -180,6 +180,12 @@ def _make_request(self, method, url, data=None, content_type=None,
:type headers: dict
:param headers: A dictionary of HTTP headers to send with the request.

:type target_object: object or :class:`NoneType`
:param target_object: Argument to be used by library callers.
This can allow custom behavior, for example, to
defer an HTTP request and complete initialization
of the object at a later time.

:rtype: tuple of ``response`` (a dictionary of sorts)
and ``content`` (a string).
:returns: The HTTP response object and the content of the response,
Expand All @@ -200,9 +206,10 @@ def _make_request(self, method, url, data=None, content_type=None,

headers['User-Agent'] = self.USER_AGENT

return self._do_request(method, url, headers, data)
return self._do_request(method, url, headers, data, target_object)

def _do_request(self, method, url, headers, data):
def _do_request(self, method, url, headers, data,
target_object): # pylint: disable=unused-argument
"""Low-level helper: perform the actual API request over HTTP.

Allows batch context managers to override and defer a request.
Expand All @@ -219,6 +226,10 @@ def _do_request(self, method, url, headers, data):
:type data: string
:param data: The data to send as the body of the request.

:type target_object: object or :class:`NoneType`
:param target_object: Unused ``target_object`` here but may be used
by a superclass.

:rtype: tuple of ``response`` (a dictionary of sorts)
and ``content`` (a string).
:returns: The HTTP response object and the content of the response.
Expand All @@ -229,7 +240,7 @@ def _do_request(self, method, url, headers, data):
def api_request(self, method, path, query_params=None,
data=None, content_type=None,
api_base_url=None, api_version=None,
expect_json=True):
expect_json=True, _target_object=None):
"""Make a request over the HTTP transport to the API.

You shouldn't need to use this method, but if you plan to
Expand Down Expand Up @@ -274,6 +285,12 @@ def api_request(self, method, path, query_params=None,
response as JSON and raise an exception if
that cannot be done. Default is True.

:type _target_object: object or :class:`NoneType`
:param _target_object: Protected argument to be used by library
callers. This can allow custom behavior, for
example, to defer an HTTP request and complete
initialization of the object at a later time.

:raises: Exception if the response code is not 200 OK.
"""
url = self.build_api_url(path=path, query_params=query_params,
Expand All @@ -287,12 +304,14 @@ def api_request(self, method, path, query_params=None,
content_type = 'application/json'

response, content = self._make_request(
method=method, url=url, data=data, content_type=content_type)
method=method, url=url, data=data, content_type=content_type,
target_object=_target_object)

if not 200 <= response.status < 300:
raise make_exception(response, content)

if content and expect_json:
string_or_bytes = (six.binary_type, six.text_type)
if content and expect_json and isinstance(content, string_or_bytes):
content_type = response.get('content-type', '')
if not content_type.startswith('application/json'):
raise TypeError('Expected JSON, got %s' % content_type)
Expand Down
7 changes: 4 additions & 3 deletions gcloud/storage/_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,8 @@ def reload(self, connection=None):
# are handled via custom endpoints.
query_params = {'projection': 'noAcl'}
api_response = connection.api_request(
method='GET', path=self.path, query_params=query_params)
method='GET', path=self.path, query_params=query_params,
_target_object=self)
self._set_properties(api_response)

def _patch_property(self, name, value):
Expand All @@ -84,7 +85,7 @@ def _patch_property(self, name, value):
def _set_properties(self, value):
"""Set the properties for the current object.

:type value: dict
:type value: dict or :class:`gcloud.storage.batch._FutureDict`
:param value: The properties to be set.
"""
self._properties = value
Expand All @@ -108,7 +109,7 @@ def patch(self, connection=None):
for key in self._changes)
api_response = connection.api_request(
method='PATCH', path=self.path, data=update_properties,
query_params={'projection': 'full'})
query_params={'projection': 'full'}, _target_object=self)
self._set_properties(api_response)


Expand Down
153 changes: 128 additions & 25 deletions gcloud/storage/batch.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,14 @@
from email.mime.application import MIMEApplication
from email.mime.multipart import MIMEMultipart
from email.parser import Parser
import httplib2
import io
import json

import six

from gcloud._helpers import _LocalStack
from gcloud.exceptions import make_exception
from gcloud.storage import _implicit_environ
from gcloud.storage.connection import Connection

Expand Down Expand Up @@ -71,6 +73,54 @@ class NoContent(object):
status = 204


class _FutureDict(object):
"""Class to hold a future value for a deferred request.

Used by for requests that get sent in a :class:`Batch`.
"""

@staticmethod
def get(key, default=None):
"""Stand-in for dict.get.

:type key: object
:param key: Hashable dictionary key.

:type default: object
:param default: Fallback value to dict.get.

:raises: :class:`KeyError` always since the future is intended to fail
as a dictionary.
"""
raise KeyError('Cannot get(%r, default=%r) on a future' % (
key, default))

def __getitem__(self, key):
"""Stand-in for dict[key].

:type key: object
:param key: Hashable dictionary key.

:raises: :class:`KeyError` always since the future is intended to fail
as a dictionary.
"""
raise KeyError('Cannot get item %r from a future' % (key,))

def __setitem__(self, key, value):
"""Stand-in for dict[key] = value.

:type key: object
:param key: Hashable dictionary key.

:type value: object
:param value: Dictionary value.

:raises: :class:`KeyError` always since the future is intended to fail
as a dictionary.
"""
raise KeyError('Cannot set %r -> %r on a future' % (key, value))


class Batch(Connection):
"""Proxy an underlying connection, batching up change operations.

Expand All @@ -86,9 +136,9 @@ def __init__(self, connection=None):
super(Batch, self).__init__()
self._connection = connection
self._requests = []
self._responses = []
self._target_objects = []

def _do_request(self, method, url, headers, data):
def _do_request(self, method, url, headers, data, target_object):
"""Override Connection: defer actual HTTP request.

Only allow up to ``_MAX_BATCH_SIZE`` requests to be deferred.
Expand All @@ -109,22 +159,22 @@ def _do_request(self, method, url, headers, data):
and ``content`` (a string).
:returns: The HTTP response object and the content of the response.
"""
if method == 'GET':
_req = self._connection.http.request
return _req(method=method, uri=url, headers=headers, body=data)

if len(self._requests) >= self._MAX_BATCH_SIZE:
raise ValueError("Too many deferred requests (max %d)" %
self._MAX_BATCH_SIZE)
self._requests.append((method, url, headers, data))
return NoContent(), ''

def finish(self):
"""Submit a single `multipart/mixed` request w/ deferred requests.

:rtype: list of tuples
:returns: one ``(status, reason, payload)`` tuple per deferred request.
:raises: ValueError if no requests have been deferred.
result = _FutureDict()
self._target_objects.append(target_object)
if target_object is not None:
target_object._properties = result

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.

return NoContent(), result

def _prepare_batch_request(self):
"""Prepares headers and body for a batch request.

:rtype: tuple (dict, string)
:returns: The pair of headers and body of the batch request to be sent.
:raises: :class:`ValueError` if no requests have been deferred.
"""
if len(self._requests) == 0:
raise ValueError("No deferred requests")
Expand All @@ -146,14 +196,51 @@ def finish(self):

# Strip off redundant header text
_, body = payload.split('\n\n', 1)
headers = dict(multi._headers)
return dict(multi._headers), body

def _finish_futures(self, responses):
"""Apply all the batch responses to the futures created.

:type responses: list of (headers, payload) tuples.
:param responses: List of headers and payloads from each response in
the batch.

:raises: :class:`ValueError` if no requests have been deferred.
"""
# If a bad status occurs, we track it, but don't raise an exception
# until all futures have been populated.
exception_args = None

if len(self._target_objects) != len(responses):
raise ValueError('Expected a response for every request.')

for target_object, sub_response in zip(self._target_objects,
responses):
resp_headers, sub_payload = sub_response
if not 200 <= resp_headers.status < 300:
exception_args = exception_args or (resp_headers,
sub_payload)
elif target_object is not None:
target_object._properties = sub_payload

if exception_args is not None:
raise make_exception(*exception_args)

def finish(self):
"""Submit a single `multipart/mixed` request w/ deferred requests.

:rtype: list of tuples
:returns: one ``(headers, payload)`` tuple per deferred request.
"""
headers, body = self._prepare_batch_request()

url = '%s/batch' % self.API_BASE_URL

_req = self._connection._make_request
response, content = _req('POST', url, data=body, headers=headers)
self._responses = list(_unpack_batch_response(response, content))
return self._responses
response, content = self._connection._make_request(
'POST', url, data=body, headers=headers)
responses = list(_unpack_batch_response(response, content))
self._finish_futures(responses)
return responses

@staticmethod
def current():
Expand Down Expand Up @@ -199,7 +286,20 @@ def _generate_faux_mime_message(parser, response, content):


def _unpack_batch_response(response, content):
"""Convert response, content -> [(status, reason, payload)]."""
"""Convert response, content -> [(headers, payload)].

Creates a generator of tuples of emulating the responses to
:meth:`httplib2.Http.request` (a pair of headers and payload).

:type response: :class:`httplib2.Response`
:param response: HTTP response / headers from a request.

:type content: string
:param content: Response payload with a batch response.

:rtype: generator
:returns: A generator of header, payload pairs.
"""
parser = Parser()
message = _generate_faux_mime_message(parser, response, content)

Expand All @@ -208,10 +308,13 @@ def _unpack_batch_response(response, content):

for subrequest in message._payload:
status_line, rest = subrequest._payload.split('\n', 1)
_, status, reason = status_line.split(' ', 2)
message = parser.parsestr(rest)
payload = message._payload
ctype = message['Content-Type']
_, status, _ = status_line.split(' ', 2)
sub_message = parser.parsestr(rest)
payload = sub_message._payload
ctype = sub_message['Content-Type']
msg_headers = dict(sub_message._headers)
msg_headers['status'] = status
headers = httplib2.Response(msg_headers)
if ctype and ctype.startswith('application/json'):
payload = json.loads(payload)
yield status, reason, payload
yield headers, payload
3 changes: 2 additions & 1 deletion gcloud/storage/blob.py
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,8 @@ def exists(self, connection=None):
# minimize the returned payload.
query_params = {'fields': 'name'}
connection.api_request(method='GET', path=self.path,
query_params=query_params)
query_params=query_params,
_target_object=self)
return True
except NotFound:
return False
Expand Down
Loading