Skip to content

Commit

Permalink
Merge pull request #3138 from dhermes/ds-move-lookup-to-GAPIC
Browse files Browse the repository at this point in the history
Using GAPIC datastore object (and an HTTP equivalent) for lookup.
  • Loading branch information
dhermes authored Mar 13, 2017
2 parents d9f2dda + 39bbf25 commit 0a6dc31
Show file tree
Hide file tree
Showing 9 changed files with 274 additions and 563 deletions.
47 changes: 0 additions & 47 deletions datastore/google/cloud/datastore/_gax.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,13 @@
import sys

from google.cloud.gapic.datastore.v1 import datastore_client
from google.cloud.proto.datastore.v1 import datastore_pb2_grpc
from google.gax.errors import GaxError
from google.gax.grpc import exc_to_code
from google.gax.utils import metrics
from grpc import StatusCode
import six

from google.cloud._helpers import make_insecure_stub
from google.cloud._helpers import make_secure_channel
from google.cloud._helpers import make_secure_stub
from google.cloud._http import DEFAULT_USER_AGENT
from google.cloud import exceptions

Expand Down Expand Up @@ -92,50 +89,6 @@ def _grpc_catch_rendezvous():
six.reraise(error_class, new_exc, sys.exc_info()[2])


class _DatastoreAPIOverGRPC(object):
"""Helper mapping datastore API methods.
Makes requests to send / receive protobuf content over gRPC.
Methods make bare API requests without any helpers for constructing
the requests or parsing the responses.
:type connection: :class:`Connection`
:param connection: A connection object that contains helpful
information for making requests.
"""

def __init__(self, connection):
parse_result = six.moves.urllib_parse.urlparse(
connection.api_base_url)
host = parse_result.hostname
if parse_result.scheme == 'https':
self._stub = make_secure_stub(
connection.credentials, DEFAULT_USER_AGENT,
datastore_pb2_grpc.DatastoreStub, host,
extra_options=_GRPC_EXTRA_OPTIONS)
else:
self._stub = make_insecure_stub(
datastore_pb2_grpc.DatastoreStub, host)

def lookup(self, project, request_pb):
"""Perform a ``lookup`` request.
:type project: str
:param project: The project to connect to. This is
usually your project name in the cloud console.
:type request_pb: :class:`.datastore_pb2.LookupRequest`
:param request_pb: The request protobuf object.
:rtype: :class:`.datastore_pb2.LookupResponse`
:returns: The returned protobuf response object.
"""
request_pb.project_id = project
with _grpc_catch_rendezvous():
return self._stub.Lookup(request_pb)


class GAPICDatastoreAPI(datastore_client.DatastoreClient):
"""An API object that sends proto-over-gRPC requests.
Expand Down
147 changes: 21 additions & 126 deletions datastore/google/cloud/datastore/_http.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,22 +14,13 @@

"""Connections to Google Cloud Datastore API servers."""

import os

from google.rpc import status_pb2

from google.cloud import _http as connection_module
from google.cloud.environment_vars import DISABLE_GRPC
from google.cloud import exceptions
from google.cloud.proto.datastore.v1 import datastore_pb2 as _datastore_pb2

from google.cloud.datastore import __version__
try:
from google.cloud.datastore._gax import _DatastoreAPIOverGRPC
_HAVE_GRPC = True
except ImportError: # pragma: NO COVER
_DatastoreAPIOverGRPC = None
_HAVE_GRPC = False


DATASTORE_API_HOST = 'datastore.googleapis.com'
Expand All @@ -42,8 +33,6 @@
'/{project}:{method}')
"""A template for the URL of a particular API call."""

_DISABLE_GRPC = os.getenv(DISABLE_GRPC, False)
_USE_GRPC = _HAVE_GRPC and not _DISABLE_GRPC
_CLIENT_INFO = connection_module.CLIENT_INFO_TEMPLATE.format(__version__)


Expand Down Expand Up @@ -148,121 +137,45 @@ def build_api_url(project, method, base_url):
project=project, method=method)


class _DatastoreAPIOverHttp(object):
"""Helper mapping datastore API methods.
Makes requests to send / receive protobuf content over HTTP/1.1.
class HTTPDatastoreAPI(object):
"""An API object that sends proto-over-HTTP requests.
Methods make bare API requests without any helpers for constructing
the requests or parsing the responses.
Intended to provide the same methods as the GAPIC ``DatastoreClient``.
:type connection: :class:`Connection`
:param connection: A connection object that contains helpful
information for making requests.
:type client: :class:`~google.cloud.datastore.client.Client`
:param client: The client that provides configuration.
"""

def __init__(self, connection):
self.connection = connection
def __init__(self, client):
self.client = client

def lookup(self, project, request_pb):
def lookup(self, project, read_options, key_pbs):
"""Perform a ``lookup`` request.
:type project: str
:param project: The project to connect to. This is
usually your project name in the cloud console.
:type request_pb: :class:`.datastore_pb2.LookupRequest`
:param request_pb: The request protobuf object.
:rtype: :class:`.datastore_pb2.LookupResponse`
:returns: The returned protobuf response object.
"""
return _rpc(self.connection.http, project, 'lookup',
self.connection.api_base_url,
request_pb, _datastore_pb2.LookupResponse)


class Connection(connection_module.Connection):
"""A connection to the Google Cloud Datastore via the Protobuf API.
This class should understand only the basic types (and protobufs)
in method arguments, however it should be capable of returning advanced
types.
:type client: :class:`~google.cloud.datastore.client.Client`
:param client: The client that owns the current connection.
"""

def __init__(self, client):
super(Connection, self).__init__(client)
self.api_base_url = client._base_url
if _USE_GRPC:
self._datastore_api = _DatastoreAPIOverGRPC(self)
else:
self._datastore_api = _DatastoreAPIOverHttp(self)

def lookup(self, project, key_pbs,
eventual=False, transaction_id=None):
"""Lookup keys from a project in the Cloud Datastore.
Maps the ``DatastoreService.Lookup`` protobuf RPC.
This uses mostly protobufs
(:class:`.entity_pb2.Key` as input and :class:`.entity_pb2.Entity`
as output). It is used under the hood in
:meth:`Client.get() <.datastore.client.Client.get>`:
.. code-block:: python
>>> from google.cloud import datastore
>>> client = datastore.Client(project='project')
>>> key = client.key('MyKind', 1234)
>>> client.get(key)
[<Entity object>]
Using a :class:`Connection` directly:
.. code-block:: python
>>> connection.lookup('project', [key.to_protobuf()])
[<Entity protobuf>]
:type project: str
:param project: The project to look up the keys in.
:type read_options: :class:`.datastore_pb2.ReadOptions`
:param read_options: The options for this lookup. Contains a
either the transaction for the read or
``STRONG`` or ``EVENTUAL`` read consistency.
:type key_pbs: list of
:class:`.entity_pb2.Key`
:param key_pbs: The keys to retrieve from the datastore.
:type eventual: bool
:param eventual: If False (the default), request ``STRONG`` read
consistency. If True, request ``EVENTUAL`` read
consistency.
:type transaction_id: str
:param transaction_id: If passed, make the request in the scope of
the given transaction. Incompatible with
``eventual==True``.
:rtype: :class:`.datastore_pb2.LookupResponse`
:returns: The returned protobuf for the lookup request.
:returns: The returned protobuf response object.
"""
lookup_request = _datastore_pb2.LookupRequest(keys=key_pbs)
_set_read_options(lookup_request, eventual, transaction_id)
return self._datastore_api.lookup(project, lookup_request)


class HTTPDatastoreAPI(object):
"""An API object that sends proto-over-HTTP requests.
Intended to provide the same methods as the GAPIC ``DatastoreClient``.
:type client: :class:`~google.cloud.datastore.client.Client`
:param client: The client that provides configuration.
"""

def __init__(self, client):
self.client = client
request_pb = _datastore_pb2.LookupRequest(
project_id=project,
read_options=read_options,
keys=key_pbs,
)
return _rpc(self.client._http, project, 'lookup',
self.client._base_url,
request_pb, _datastore_pb2.LookupResponse)

def run_query(self, project, partition_id, read_options,
query=None, gql_query=None):
Expand Down Expand Up @@ -390,21 +303,3 @@ def allocate_ids(self, project, key_pbs):
return _rpc(self.client._http, project, 'allocateIds',
self.client._base_url,
request_pb, _datastore_pb2.AllocateIdsResponse)


def _set_read_options(request, eventual, transaction_id):
"""Validate rules for read options, and assign to the request.
Helper method for ``lookup()`` and ``run_query``.
:raises: :class:`ValueError` if ``eventual`` is ``True`` and the
``transaction_id`` is not ``None``.
"""
if eventual and (transaction_id is not None):
raise ValueError('eventual must be False when in a transaction')

opts = request.read_options
if eventual:
opts.read_consistency = _datastore_pb2.ReadOptions.EVENTUAL
elif transaction_id:
opts.transaction = transaction_id
2 changes: 1 addition & 1 deletion datastore/google/cloud/datastore/batch.py
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,7 @@ def _commit(self):
self.project, mode, self._mutations, transaction=self._id)
_, updated_keys = _parse_commit_response(commit_response_pb)
# If the back-end returns without error, we are guaranteed that
# :meth:`Connection.commit` will return keys that match (length and
# ``commit`` will return keys that match (length and
# order) directly ``_partial_key_entities``.
for new_key_pb, entity in zip(updated_keys,
self._partial_key_entities):
Expand Down
58 changes: 43 additions & 15 deletions datastore/google/cloud/datastore/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@

import os

from google.cloud.proto.datastore.v1 import datastore_pb2 as _datastore_pb2

from google.cloud._helpers import _LocalStack
from google.cloud._helpers import (
_determine_default_project as _base_default_project)
Expand All @@ -23,7 +25,6 @@
from google.cloud.environment_vars import GCD_DATASET
from google.cloud.environment_vars import GCD_HOST

from google.cloud.datastore._http import Connection
from google.cloud.datastore._http import HTTPDatastoreAPI
from google.cloud.datastore import helpers
from google.cloud.datastore.batch import Batch
Expand Down Expand Up @@ -78,15 +79,18 @@ def _determine_default_project(project=None):
return project


def _extended_lookup(connection, project, key_pbs,
def _extended_lookup(datastore_api, project, key_pbs,
missing=None, deferred=None,
eventual=False, transaction_id=None):
"""Repeat lookup until all keys found (unless stop requested).
Helper function for :meth:`Client.get_multi`.
:type connection: :class:`google.cloud.datastore._http.Connection`
:param connection: The connection used to connect to datastore.
:type datastore_api:
:class:`google.cloud.datastore._http.HTTPDatastoreAPI`
or :class:`google.cloud.datastore._gax.GAPICDatastoreAPI`
:param datastore_api: The datastore API object used to connect
to datastore.
:type project: str
:param project: The project to make the request for.
Expand Down Expand Up @@ -127,15 +131,11 @@ def _extended_lookup(connection, project, key_pbs,
results = []

loop_num = 0
read_options = _get_read_options(eventual, transaction_id)
while loop_num < _MAX_LOOPS: # loop against possible deferred.
loop_num += 1

lookup_response = connection.lookup(
project=project,
key_pbs=key_pbs,
eventual=eventual,
transaction_id=transaction_id,
)
lookup_response = datastore_api.lookup(
project, read_options, key_pbs)

# Accumulate the new results.
results.extend(result.entity for result in lookup_response.found)
Expand Down Expand Up @@ -210,9 +210,6 @@ def __init__(self, project=None, namespace=None,
self._base_url = 'http://' + host
except KeyError:
self._base_url = _DATASTORE_BASE_URL
# NOTE: Make sure all properties are set before passing to
# ``Connection`` (e.g. ``_base_url``).
self._connection = Connection(self)

@staticmethod
def _determine_default(project):
Expand Down Expand Up @@ -347,7 +344,7 @@ def get_multi(self, keys, missing=None, deferred=None, transaction=None):
transaction = self.current_transaction

entity_pbs = _extended_lookup(
connection=self._connection,
datastore_api=self._datastore_api,
project=self.project,
key_pbs=[k.to_protobuf() for k in keys],
missing=missing,
Expand Down Expand Up @@ -569,3 +566,34 @@ def do_something(entity):
if 'namespace' not in kwargs:
kwargs['namespace'] = self.namespace
return Query(self, **kwargs)


def _get_read_options(eventual, transaction_id):
"""Validate rules for read options, and assign to the request.
Helper method for ``lookup()`` and ``run_query``.
:type eventual: bool
:param eventual: Flag indicating if ``EVENTUAL`` or ``STRONG``
consistency should be used.
:type transaction_id: bytes
:param transaction_id: A transaction identifier (may be null).
:rtype: :class:`.datastore_pb2.ReadOptions`
:returns: The read options corresponding to the inputs.
:raises: :class:`ValueError` if ``eventual`` is ``True`` and the
``transaction_id`` is not ``None``.
"""
if transaction_id is None:
if eventual:
return _datastore_pb2.ReadOptions(
read_consistency=_datastore_pb2.ReadOptions.EVENTUAL)
else:
return _datastore_pb2.ReadOptions()
else:
if eventual:
raise ValueError('eventual must be False when in a transaction')
else:
return _datastore_pb2.ReadOptions(
transaction=transaction_id)
2 changes: 0 additions & 2 deletions datastore/google/cloud/datastore/query.py
Original file line number Diff line number Diff line change
Expand Up @@ -362,8 +362,6 @@ def fetch(self, limit=None, offset=0, start_cursor=None, end_cursor=None,
:rtype: :class:`Iterator`
:returns: The iterator for the query.
:raises: ValueError if ``connection`` is not passed and no implicit
default has been set.
"""
if client is None:
client = self._client
Expand Down
Loading

0 comments on commit 0a6dc31

Please sign in to comment.