Skip to content

Commit

Permalink
Merge pull request #3137 from dhermes/ds-move-run_query-to-GAPIC
Browse files Browse the repository at this point in the history
Using GAPIC datastore object (and an HTTP equivalent) for run_query.
  • Loading branch information
dhermes authored Mar 13, 2017
2 parents fee3f2f + 4e509db commit d9f2dda
Show file tree
Hide file tree
Showing 6 changed files with 216 additions and 301 deletions.
17 changes: 0 additions & 17 deletions datastore/google/cloud/datastore/_gax.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,23 +135,6 @@ def lookup(self, project, request_pb):
with _grpc_catch_rendezvous():
return self._stub.Lookup(request_pb)

def run_query(self, project, request_pb):
"""Perform a ``runQuery`` request.
:type project: str
:param project: The project to connect to. This is
usually your project name in the cloud console.
:type request_pb: :class:`.datastore_pb2.RunQueryRequest`
:param request_pb: The request protobuf object.
:rtype: :class:`.datastore_pb2.RunQueryResponse`
:returns: The returned protobuf response object.
"""
request_pb.project_id = project
with _grpc_catch_rendezvous():
return self._stub.RunQuery(request_pb)


class GAPICDatastoreAPI(datastore_client.DatastoreClient):
"""An API object that sends proto-over-gRPC requests.
Expand Down
127 changes: 44 additions & 83 deletions datastore/google/cloud/datastore/_http.py
Original file line number Diff line number Diff line change
Expand Up @@ -181,23 +181,6 @@ def lookup(self, project, request_pb):
self.connection.api_base_url,
request_pb, _datastore_pb2.LookupResponse)

def run_query(self, project, request_pb):
"""Perform a ``runQuery`` request.
:type project: str
:param project: The project to connect to. This is
usually your project name in the cloud console.
:type request_pb: :class:`.datastore_pb2.RunQueryRequest`
:param request_pb: The request protobuf object.
:rtype: :class:`.datastore_pb2.RunQueryResponse`
:returns: The returned protobuf response object.
"""
return _rpc(self.connection.http, project, 'runQuery',
self.connection.api_base_url,
request_pb, _datastore_pb2.RunQueryResponse)


class Connection(connection_module.Connection):
"""A connection to the Google Cloud Datastore via the Protobuf API.
Expand Down Expand Up @@ -264,71 +247,61 @@ def lookup(self, project, key_pbs,
:rtype: :class:`.datastore_pb2.LookupResponse`
:returns: The returned protobuf for the lookup request.
"""
lookup_request = _datastore_pb2.LookupRequest()
lookup_request = _datastore_pb2.LookupRequest(keys=key_pbs)
_set_read_options(lookup_request, eventual, transaction_id)
_add_keys_to_request(lookup_request.keys, key_pbs)

return self._datastore_api.lookup(project, lookup_request)

def run_query(self, project, query_pb, namespace=None,
eventual=False, transaction_id=None):
"""Run a query on the Cloud Datastore.

Maps the ``DatastoreService.RunQuery`` protobuf RPC.
class HTTPDatastoreAPI(object):
"""An API object that sends proto-over-HTTP requests.
Given a Query protobuf, sends a ``runQuery`` request to the
Cloud Datastore API and returns a list of entity protobufs
matching the query.
Intended to provide the same methods as the GAPIC ``DatastoreClient``.
You typically wouldn't use this method directly, in favor of the
:meth:`google.cloud.datastore.query.Query.fetch` method.
:type client: :class:`~google.cloud.datastore.client.Client`
:param client: The client that provides configuration.
"""

Under the hood, the :class:`google.cloud.datastore.query.Query` class
uses this method to fetch data.
def __init__(self, client):
self.client = client

def run_query(self, project, partition_id, read_options,
query=None, gql_query=None):
"""Perform a ``runQuery`` request.
:type project: str
:param project: The project over which to run the query.
:param project: The project to connect to. This is
usually your project name in the cloud console.
:type query_pb: :class:`.query_pb2.Query`
:param query_pb: The Protobuf representing the query to run.
:type partition_id: :class:`.entity_pb2.PartitionId`
:param partition_id: Partition ID corresponding to an optional
namespace and project ID.
:type namespace: str
:param namespace: The namespace over which to run the query.
:type read_options: :class:`.datastore_pb2.ReadOptions`
:param read_options: The options for this query. Contains a
either the transaction for the read or
``STRONG`` or ``EVENTUAL`` read consistency.
:type eventual: bool
:param eventual: If False (the default), request ``STRONG`` read
consistency. If True, request ``EVENTUAL`` read
consistency.
:type query: :class:`.query_pb2.Query`
:param query: (Optional) The query protobuf to run. At most one of
``query`` and ``gql_query`` can be specified.
:type transaction_id: str
:param transaction_id: If passed, make the request in the scope of
the given transaction. Incompatible with
``eventual==True``.
:type gql_query: :class:`.query_pb2.GqlQuery`
:param gql_query: (Optional) The GQL query to run. At most one of
``query`` and ``gql_query`` can be specified.
:rtype: :class:`.datastore_pb2.RunQueryResponse`
:returns: The protobuf response from a ``runQuery`` request.
:returns: The returned protobuf response object.
"""
request = _datastore_pb2.RunQueryRequest()
_set_read_options(request, eventual, transaction_id)

if namespace:
request.partition_id.namespace_id = namespace

request.query.CopyFrom(query_pb)
return self._datastore_api.run_query(project, request)


class HTTPDatastoreAPI(object):
"""An API object that sends proto-over-HTTP requests.
Intended to provide the same methods as the GAPIC ``DatastoreClient``.
:type client: :class:`~google.cloud.datastore.client.Client`
:param client: The client that provides configuration.
"""

def __init__(self, client):
self.client = client
request_pb = _datastore_pb2.RunQueryRequest(
project_id=project,
partition_id=partition_id,
read_options=read_options,
query=query,
gql_query=gql_query,
)
return _rpc(self.client._http, project, 'runQuery',
self.client._base_url,
request_pb, _datastore_pb2.RunQueryResponse)

def begin_transaction(self, project):
"""Perform a ``beginTransaction`` request.
Expand Down Expand Up @@ -391,8 +364,10 @@ def rollback(self, project, transaction_id):
:rtype: :class:`.datastore_pb2.RollbackResponse`
:returns: The returned protobuf response object.
"""
request_pb = _datastore_pb2.RollbackRequest()
request_pb.transaction = transaction_id
request_pb = _datastore_pb2.RollbackRequest(
project_id=project,
transaction=transaction_id,
)
# Response is empty (i.e. no fields) but we return it anyway.
return _rpc(self.client._http, project, 'rollback',
self.client._base_url,
Expand All @@ -411,8 +386,7 @@ def allocate_ids(self, project, key_pbs):
:rtype: :class:`.datastore_pb2.AllocateIdsResponse`
:returns: The returned protobuf response object.
"""
request_pb = _datastore_pb2.AllocateIdsRequest()
_add_keys_to_request(request_pb.keys, key_pbs)
request_pb = _datastore_pb2.AllocateIdsRequest(keys=key_pbs)
return _rpc(self.client._http, project, 'allocateIds',
self.client._base_url,
request_pb, _datastore_pb2.AllocateIdsResponse)
Expand All @@ -434,16 +408,3 @@ def _set_read_options(request, eventual, transaction_id):
opts.read_consistency = _datastore_pb2.ReadOptions.EVENTUAL
elif transaction_id:
opts.transaction = transaction_id


def _add_keys_to_request(request_field_pb, key_pbs):
"""Add protobuf keys to a request object.
:type request_field_pb: `RepeatedCompositeFieldContainer`
:param request_field_pb: A repeated proto field that contains keys.
:type key_pbs: list of :class:`.entity_pb2.Key`
:param key_pbs: The keys to add to a request.
"""
for key_pb in key_pbs:
request_field_pb.add().CopyFrom(key_pb)
24 changes: 17 additions & 7 deletions datastore/google/cloud/datastore/query.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
from google.cloud.iterator import Iterator as BaseIterator
from google.cloud.iterator import Page

from google.cloud.proto.datastore.v1 import datastore_pb2 as _datastore_pb2
from google.cloud.proto.datastore.v1 import entity_pb2 as _entity_pb2
from google.cloud.proto.datastore.v1 import query_pb2 as _query_pb2
from google.cloud.datastore import helpers
from google.cloud.datastore.key import Key
Expand Down Expand Up @@ -479,14 +481,22 @@ def _next_page(self):
if not self._more_results:
return None

pb = self._build_protobuf()
query_pb = self._build_protobuf()
transaction = self.client.current_transaction

response_pb = self.client._connection.run_query(
query_pb=pb,
project=self._query.project,
namespace=self._query.namespace,
transaction_id=transaction and transaction.id,
if transaction is None:
read_options = _datastore_pb2.ReadOptions()
else:
read_options = _datastore_pb2.ReadOptions(
transaction=transaction.id)

partition_id = _entity_pb2.PartitionId(
project_id=self._query.project,
namespace_id=self._query.namespace)
response_pb = self.client._datastore_api.run_query(
self._query.project,
partition_id,
read_options,
query=query_pb,
)
entity_pbs = self._process_query_results(response_pb)
return Page(self, entity_pbs, self._item_to_value)
Expand Down
51 changes: 2 additions & 49 deletions datastore/unit_tests/test__gax.py
Original file line number Diff line number Diff line change
Expand Up @@ -200,46 +200,6 @@ def test_lookup(self):
self.assertEqual(stub.method_calls,
[(request_pb, 'Lookup')])

def test_run_query(self):
return_val = object()
stub = _GRPCStub(return_val)
datastore_api, _ = self._make_one(stub=stub)

request_pb = mock.Mock(project_id=None, spec=['project_id'])
project = 'PROJECT'
result = datastore_api.run_query(project, request_pb)
self.assertIs(result, return_val)
self.assertEqual(request_pb.project_id, project)
self.assertEqual(stub.method_calls,
[(request_pb, 'RunQuery')])

def _run_query_failure_helper(self, exc, err_class):
stub = _GRPCStub(side_effect=exc)
datastore_api, _ = self._make_one(stub=stub)

request_pb = mock.Mock(project_id=None, spec=['project_id'])
project = 'PROJECT'
with self.assertRaises(err_class):
datastore_api.run_query(project, request_pb)

self.assertEqual(request_pb.project_id, project)
self.assertEqual(stub.method_calls,
[(request_pb, 'RunQuery')])

@unittest.skipUnless(_HAVE_GRPC, 'No gRPC')
def test_run_query_invalid_argument(self):
from grpc import StatusCode
from grpc._channel import _RPCState
from google.cloud.exceptions import BadRequest
from google.cloud.exceptions import GrpcRendezvous

details = ('Cannot have inequality filters on multiple '
'properties: [created, priority]')
exc_state = _RPCState((), None, None,
StatusCode.INVALID_ARGUMENT, details)
exc = GrpcRendezvous(exc_state, None, None, None)
self._run_query_failure_helper(exc, BadRequest)


@unittest.skipUnless(_HAVE_GRPC, 'No gRPC')
class TestGAPICDatastoreAPI(unittest.TestCase):
Expand Down Expand Up @@ -307,20 +267,13 @@ def test_it(self, make_chan, mock_klass):

class _GRPCStub(object):

def __init__(self, return_val=None, side_effect=Exception):
def __init__(self, return_val=None):
self.return_val = return_val
self.side_effect = side_effect
self.method_calls = []

def _method(self, request_pb, name):
self.method_calls.append((request_pb, name))
if self.side_effect is Exception:
return self.return_val
else:
raise self.side_effect
return self.return_val

def Lookup(self, request_pb):
return self._method(request_pb, 'Lookup')

def RunQuery(self, request_pb):
return self._method(request_pb, 'RunQuery')
Loading

0 comments on commit d9f2dda

Please sign in to comment.