From dd856df267d20464899531de8a291586c4203641 Mon Sep 17 00:00:00 2001 From: Danny Hermes Date: Sun, 4 Sep 2016 10:19:55 -0700 Subject: [PATCH] Updating Pub / Sub GAX calls to work with emulator. In the process, removing the api_base_url from the Pub / Sub Connection class. --- google/cloud/pubsub/_gax.py | 44 +++++++++++ google/cloud/pubsub/client.py | 16 ++-- google/cloud/pubsub/connection.py | 28 ++++--- unit_tests/pubsub/test__gax.py | 113 +++++++++++++++++++++++++++ unit_tests/pubsub/test_client.py | 8 +- unit_tests/pubsub/test_connection.py | 28 +------ 6 files changed, 188 insertions(+), 49 deletions(-) diff --git a/google/cloud/pubsub/_gax.py b/google/cloud/pubsub/_gax.py index 0d6c6acb7109..0ef09a3d2a6b 100644 --- a/google/cloud/pubsub/_gax.py +++ b/google/cloud/pubsub/_gax.py @@ -14,11 +14,14 @@ """GAX wrapper for Pubsub API requests.""" +from google.cloud.pubsub.v1.publisher_api import PublisherApi +from google.cloud.pubsub.v1.subscriber_api import SubscriberApi from google.gax import CallOptions from google.gax import INITIAL_PAGE from google.gax.errors import GaxError from google.pubsub.v1.pubsub_pb2 import PubsubMessage from google.pubsub.v1.pubsub_pb2 import PushConfig +from grpc.beta.implementations import insecure_channel from grpc import StatusCode # pylint: disable=ungrouped-imports @@ -498,3 +501,44 @@ def _received_message_pb_to_mapping(received_message_pb): 'message': _message_pb_to_mapping( received_message_pb.message), } + + +def make_gax_publisher_api(connection): + """Create an instance of the GAX Publisher API. + + If the ``connection`` is intended for a local emulator, then + an insecure ``channel`` is created pointing at the local + Pub / Sub server. + + :type connection: :class:`~google.cloud.pubsub.connection.Connection` + :param connection: The connection that holds configuration details. + + :rtype: :class:`~google.cloud.pubsub.v1.publisher_api.PublisherApi` + :returns: A publisher API instance with the proper connection + configuration. + :rtype: :class:`~google.cloud.pubsub.v1.subscriber_api.SubscriberApi` + """ + channel = None + if connection.in_emulator: + channel = insecure_channel(connection.host, None) + return PublisherApi(channel=channel) + + +def make_gax_subscriber_api(connection): + """Create an instance of the GAX Subscriber API. + + If the ``connection`` is intended for a local emulator, then + an insecure ``channel`` is created pointing at the local + Pub / Sub server. + + :type connection: :class:`~google.cloud.pubsub.connection.Connection` + :param connection: The connection that holds configuration details. + + :rtype: :class:`~google.cloud.pubsub.v1.subscriber_api.SubscriberApi` + :returns: A subscriber API instance with the proper connection + configuration. + """ + channel = None + if connection.in_emulator: + channel = insecure_channel(connection.host, None) + return SubscriberApi(channel=channel) diff --git a/google/cloud/pubsub/client.py b/google/cloud/pubsub/client.py index aed596e3d002..ae77eb14b389 100644 --- a/google/cloud/pubsub/client.py +++ b/google/cloud/pubsub/client.py @@ -27,16 +27,16 @@ # pylint: disable=ungrouped-imports try: - from google.cloud.pubsub.v1.publisher_api import ( - PublisherApi as GeneratedPublisherAPI) - from google.cloud.pubsub.v1.subscriber_api import ( - SubscriberApi as GeneratedSubscriberAPI) from google.cloud.pubsub._gax import _PublisherAPI as GAXPublisherAPI from google.cloud.pubsub._gax import _SubscriberAPI as GAXSubscriberAPI + from google.cloud.pubsub._gax import make_gax_publisher_api + from google.cloud.pubsub._gax import make_gax_subscriber_api except ImportError: # pragma: NO COVER _HAVE_GAX = False - GeneratedPublisherAPI = GAXPublisherAPI = None - GeneratedSubscriberAPI = GAXSubscriberAPI = None + GAXPublisherAPI = None + GAXSubscriberAPI = None + make_gax_publisher_api = None + make_gax_subscriber_api = None else: _HAVE_GAX = True # pylint: enable=ungrouped-imports @@ -75,7 +75,7 @@ def publisher_api(self): """Helper for publisher-related API calls.""" if self._publisher_api is None: if _USE_GAX: - generated = GeneratedPublisherAPI() + generated = make_gax_publisher_api(self.connection) self._publisher_api = GAXPublisherAPI(generated) else: self._publisher_api = JSONPublisherAPI(self.connection) @@ -86,7 +86,7 @@ def subscriber_api(self): """Helper for subscriber-related API calls.""" if self._subscriber_api is None: if _USE_GAX: - generated = GeneratedSubscriberAPI() + generated = make_gax_subscriber_api(self.connection) self._subscriber_api = GAXSubscriberAPI(generated) else: self._subscriber_api = JSONSubscriberAPI(self.connection) diff --git a/google/cloud/pubsub/connection.py b/google/cloud/pubsub/connection.py index df00ea8031bd..41db9ba7f80f 100644 --- a/google/cloud/pubsub/connection.py +++ b/google/cloud/pubsub/connection.py @@ -20,6 +20,10 @@ from google.cloud.environment_vars import PUBSUB_EMULATOR +PUBSUB_API_HOST = 'pubsub.googleapis.com' +"""Pub / Sub API request host.""" + + class Connection(base_connection.JSONConnection): """A connection to Google Cloud Pub/Sub via the JSON REST API. @@ -29,13 +33,9 @@ class Connection(base_connection.JSONConnection): :type http: :class:`httplib2.Http` or class that defines ``request()``. :param http: (Optional) HTTP object to make requests. - - :type api_base_url: string - :param api_base_url: The base of the API call URL. Defaults to the value - :attr:`Connection.API_BASE_URL`. """ - API_BASE_URL = 'https://pubsub.googleapis.com' + API_BASE_URL = 'https://' + PUBSUB_API_HOST """The base of the API call URL.""" API_VERSION = 'v1' @@ -48,15 +48,17 @@ class Connection(base_connection.JSONConnection): 'https://www.googleapis.com/auth/cloud-platform') """The scopes required for authenticating as a Cloud Pub/Sub consumer.""" - def __init__(self, credentials=None, http=None, api_base_url=None): + def __init__(self, credentials=None, http=None): super(Connection, self).__init__(credentials=credentials, http=http) - if api_base_url is None: - emulator_host = os.getenv(PUBSUB_EMULATOR) - if emulator_host is None: - api_base_url = self.__class__.API_BASE_URL - else: - api_base_url = 'http://' + emulator_host - self.api_base_url = api_base_url + emulator_host = os.getenv(PUBSUB_EMULATOR) + if emulator_host is None: + self.host = self.__class__.API_BASE_URL + self.api_base_url = self.__class__.API_BASE_URL + self.in_emulator = False + else: + self.host = emulator_host + self.api_base_url = 'http://' + emulator_host + self.in_emulator = True def build_api_url(self, path, query_params=None, api_base_url=None, api_version=None): diff --git a/unit_tests/pubsub/test__gax.py b/unit_tests/pubsub/test__gax.py index f830cf0d29e4..a49af1b6f46a 100644 --- a/unit_tests/pubsub/test__gax.py +++ b/unit_tests/pubsub/test__gax.py @@ -749,6 +749,112 @@ def test_subscription_modify_ack_deadline_error(self): self.assertEqual(options, None) +@unittest.skipUnless(_HAVE_GAX, 'No gax-python') +class Test_make_gax_publisher_api(_Base, unittest.TestCase): + + def _callFUT(self, connection): + from google.cloud.pubsub._gax import make_gax_publisher_api + return make_gax_publisher_api(connection) + + def test_live_api(self): + from unit_tests._testing import _Monkey + from google.cloud.pubsub import _gax as MUT + + channels = [] + mock_result = object() + + def mock_publisher_api(channel): + channels.append(channel) + return mock_result + + connection = _Connection(in_emulator=False) + with _Monkey(MUT, PublisherApi=mock_publisher_api): + result = self._callFUT(connection) + + self.assertIs(result, mock_result) + self.assertEqual(channels, [None]) + + def test_emulator(self): + from unit_tests._testing import _Monkey + from google.cloud.pubsub import _gax as MUT + + channels = [] + mock_result = object() + insecure_args = [] + mock_channel = object() + + def mock_publisher_api(channel): + channels.append(channel) + return mock_result + + def mock_insecure_channel(host, port): + insecure_args.append((host, port)) + return mock_channel + + host = 'CURR_HOST:1234' + connection = _Connection(in_emulator=True, host=host) + with _Monkey(MUT, PublisherApi=mock_publisher_api, + insecure_channel=mock_insecure_channel): + result = self._callFUT(connection) + + self.assertIs(result, mock_result) + self.assertEqual(channels, [mock_channel]) + self.assertEqual(insecure_args, [(host, None)]) + + +@unittest.skipUnless(_HAVE_GAX, 'No gax-python') +class Test_make_gax_subscriber_api(_Base, unittest.TestCase): + + def _callFUT(self, connection): + from google.cloud.pubsub._gax import make_gax_subscriber_api + return make_gax_subscriber_api(connection) + + def test_live_api(self): + from unit_tests._testing import _Monkey + from google.cloud.pubsub import _gax as MUT + + channels = [] + mock_result = object() + + def mock_subscriber_api(channel): + channels.append(channel) + return mock_result + + connection = _Connection(in_emulator=False) + with _Monkey(MUT, SubscriberApi=mock_subscriber_api): + result = self._callFUT(connection) + + self.assertIs(result, mock_result) + self.assertEqual(channels, [None]) + + def test_emulator(self): + from unit_tests._testing import _Monkey + from google.cloud.pubsub import _gax as MUT + + channels = [] + mock_result = object() + insecure_args = [] + mock_channel = object() + + def mock_subscriber_api(channel): + channels.append(channel) + return mock_result + + def mock_insecure_channel(host, port): + insecure_args.append((host, port)) + return mock_channel + + host = 'CURR_HOST:1234' + connection = _Connection(in_emulator=True, host=host) + with _Monkey(MUT, SubscriberApi=mock_subscriber_api, + insecure_channel=mock_insecure_channel): + result = self._callFUT(connection) + + self.assertIs(result, mock_result) + self.assertEqual(channels, [mock_channel]) + self.assertEqual(insecure_args, [(host, None)]) + + class _GAXPublisherAPI(_GAXBaseAPI): _create_topic_conflict = False @@ -930,3 +1036,10 @@ def __init__(self, name, topic, push_endpoint, ack_deadline_seconds): self.topic = topic self.push_config = _PushConfigPB(push_endpoint) self.ack_deadline_seconds = ack_deadline_seconds + + +class _Connection(object): + + def __init__(self, in_emulator=False, host=None): + self.in_emulator = in_emulator + self.host = host diff --git a/unit_tests/pubsub/test_client.py b/unit_tests/pubsub/test_client.py index 810ac406f858..69f6ccbed330 100644 --- a/unit_tests/pubsub/test_client.py +++ b/unit_tests/pubsub/test_client.py @@ -67,7 +67,7 @@ def __init__(self, _wrapped): with _Monkey(MUT, _USE_GAX=True, - GeneratedPublisherAPI=_generated_api, + make_gax_publisher_api=_generated_api, GAXPublisherAPI=_GaxPublisherAPI): api = client.publisher_api @@ -76,6 +76,8 @@ def __init__(self, _wrapped): # API instance is cached again = client.publisher_api self.assertTrue(again is api) + args = (client.connection,) + self.assertEqual(_called_with, [(args, {})]) def test_subscriber_api_wo_gax(self): from google.cloud.pubsub.connection import _SubscriberAPI @@ -115,7 +117,7 @@ def __init__(self, _wrapped): with _Monkey(MUT, _USE_GAX=True, - GeneratedSubscriberAPI=_generated_api, + make_gax_subscriber_api=_generated_api, GAXSubscriberAPI=_GaxSubscriberAPI): api = client.subscriber_api @@ -124,6 +126,8 @@ def __init__(self, _wrapped): # API instance is cached again = client.subscriber_api self.assertTrue(again is api) + args = (client.connection,) + self.assertEqual(_called_with, [(args, {})]) def test_iam_policy_api(self): from google.cloud.pubsub.connection import _IAMPolicyAPI diff --git a/unit_tests/pubsub/test_connection.py b/unit_tests/pubsub/test_connection.py index 0e8cedfadf40..a7f3a49ac188 100644 --- a/unit_tests/pubsub/test_connection.py +++ b/unit_tests/pubsub/test_connection.py @@ -55,31 +55,6 @@ def test_custom_url_from_env(self): self.assertNotEqual(conn.api_base_url, klass.API_BASE_URL) self.assertEqual(conn.api_base_url, 'http://' + HOST) - def test_custom_url_from_constructor(self): - HOST = object() - conn = self._makeOne(api_base_url=HOST) - - klass = self._getTargetClass() - self.assertNotEqual(conn.api_base_url, klass.API_BASE_URL) - self.assertEqual(conn.api_base_url, HOST) - - def test_custom_url_constructor_and_env(self): - import os - from unit_tests._testing import _Monkey - from google.cloud.environment_vars import PUBSUB_EMULATOR - - HOST1 = object() - HOST2 = object() - fake_environ = {PUBSUB_EMULATOR: HOST1} - - with _Monkey(os, getenv=fake_environ.get): - conn = self._makeOne(api_base_url=HOST2) - - klass = self._getTargetClass() - self.assertNotEqual(conn.api_base_url, klass.API_BASE_URL) - self.assertNotEqual(conn.api_base_url, HOST1) - self.assertEqual(conn.api_base_url, HOST2) - def test_build_api_url_no_extra_query_params(self): conn = self._makeOne() URI = '/'.join([ @@ -104,7 +79,8 @@ def test_build_api_url_w_extra_query_params(self): def test_build_api_url_w_base_url_override(self): base_url1 = 'api-base-url1' base_url2 = 'api-base-url2' - conn = self._makeOne(api_base_url=base_url1) + conn = self._makeOne() + conn.api_base_url = base_url1 URI = '/'.join([ base_url2, conn.API_VERSION,