Skip to content

Commit

Permalink
Updating Pub / Sub GAX calls to work with emulator.
Browse files Browse the repository at this point in the history
In the process, removing the api_base_url from the
Pub / Sub Connection class.
  • Loading branch information
dhermes committed Sep 4, 2016
1 parent 189c852 commit 31e3369
Show file tree
Hide file tree
Showing 6 changed files with 188 additions and 49 deletions.
44 changes: 44 additions & 0 deletions gcloud/pubsub/_gax.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,14 @@

"""GAX wrapper for Pubsub API requests."""

from google.cloud.pubsub.v1.publisher_api import PublisherApi
from google.cloud.pubsub.v1.subscriber_api import SubscriberApi
from google.gax import CallOptions
from google.gax import INITIAL_PAGE
from google.gax.errors import GaxError
from google.pubsub.v1.pubsub_pb2 import PubsubMessage
from google.pubsub.v1.pubsub_pb2 import PushConfig
from grpc.beta.implementations import insecure_channel
from grpc import StatusCode

from gcloud._helpers import _to_bytes
Expand Down Expand Up @@ -494,3 +497,44 @@ def _received_message_pb_to_mapping(received_message_pb):
'message': _message_pb_to_mapping(
received_message_pb.message),
}


def make_gax_publisher_api(connection):
"""Create an instance of the GAX Publisher API.
If the ``connection`` is intended for a local emulator, then
an insecure ``channel`` is created pointing at the local
Pub / Sub server.
:type connection: :class:`~gcloud.pubsub.connection.Connection`
:param connection: The connection that holds configuration details.
:rtype: :class:`~google.cloud.pubsub.v1.publisher_api.PublisherApi`
:returns: A publisher API instance with the proper connection
configuration.
:rtype: :class:`~google.cloud.pubsub.v1.subscriber_api.SubscriberApi`
"""
channel = None
if connection.in_emulator:
channel = insecure_channel(connection.host, None)
return PublisherApi(channel=channel)


def make_gax_subscriber_api(connection):
"""Create an instance of the GAX Subscriber API.
If the ``connection`` is intended for a local emulator, then
an insecure ``channel`` is created pointing at the local
Pub / Sub server.
:type connection: :class:`~gcloud.pubsub.connection.Connection`
:param connection: The connection that holds configuration details.
:rtype: :class:`~google.cloud.pubsub.v1.subscriber_api.SubscriberApi`
:returns: A subscriber API instance with the proper connection
configuration.
"""
channel = None
if connection.in_emulator:
channel = insecure_channel(connection.host, None)
return SubscriberApi(channel=channel)
16 changes: 8 additions & 8 deletions gcloud/pubsub/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,16 +27,16 @@

# pylint: disable=ungrouped-imports
try:
from google.cloud.pubsub.v1.publisher_api import (
PublisherApi as GeneratedPublisherAPI)
from google.cloud.pubsub.v1.subscriber_api import (
SubscriberApi as GeneratedSubscriberAPI)
from gcloud.pubsub._gax import _PublisherAPI as GAXPublisherAPI
from gcloud.pubsub._gax import _SubscriberAPI as GAXSubscriberAPI
from gcloud.pubsub._gax import make_gax_publisher_api
from gcloud.pubsub._gax import make_gax_subscriber_api
except ImportError: # pragma: NO COVER
_HAVE_GAX = False
GeneratedPublisherAPI = GAXPublisherAPI = None
GeneratedSubscriberAPI = GAXSubscriberAPI = None
GAXPublisherAPI = None
GAXSubscriberAPI = None
make_gax_publisher_api = None
make_gax_subscriber_api = None
else:
_HAVE_GAX = True
# pylint: enable=ungrouped-imports
Expand Down Expand Up @@ -75,7 +75,7 @@ def publisher_api(self):
"""Helper for publisher-related API calls."""
if self._publisher_api is None:
if _USE_GAX:
generated = GeneratedPublisherAPI()
generated = make_gax_publisher_api(self.connection)
self._publisher_api = GAXPublisherAPI(generated)
else:
self._publisher_api = JSONPublisherAPI(self.connection)
Expand All @@ -86,7 +86,7 @@ def subscriber_api(self):
"""Helper for subscriber-related API calls."""
if self._subscriber_api is None:
if _USE_GAX:
generated = GeneratedSubscriberAPI()
generated = make_gax_subscriber_api(self.connection)
self._subscriber_api = GAXSubscriberAPI(generated)
else:
self._subscriber_api = JSONSubscriberAPI(self.connection)
Expand Down
28 changes: 15 additions & 13 deletions gcloud/pubsub/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@
from gcloud.environment_vars import PUBSUB_EMULATOR


PUBSUB_API_HOST = 'pubsub.googleapis.com'
"""Pub / Sub API request host."""


class Connection(base_connection.JSONConnection):
"""A connection to Google Cloud Pub/Sub via the JSON REST API.
Expand All @@ -29,13 +33,9 @@ class Connection(base_connection.JSONConnection):
:type http: :class:`httplib2.Http` or class that defines ``request()``.
:param http: (Optional) HTTP object to make requests.
:type api_base_url: string
:param api_base_url: The base of the API call URL. Defaults to the value
:attr:`Connection.API_BASE_URL`.
"""

API_BASE_URL = 'https://pubsub.googleapis.com'
API_BASE_URL = 'https://' + PUBSUB_API_HOST
"""The base of the API call URL."""

API_VERSION = 'v1'
Expand All @@ -48,15 +48,17 @@ class Connection(base_connection.JSONConnection):
'https://www.googleapis.com/auth/cloud-platform')
"""The scopes required for authenticating as a Cloud Pub/Sub consumer."""

def __init__(self, credentials=None, http=None, api_base_url=None):
def __init__(self, credentials=None, http=None):
super(Connection, self).__init__(credentials=credentials, http=http)
if api_base_url is None:
emulator_host = os.getenv(PUBSUB_EMULATOR)
if emulator_host is None:
api_base_url = self.__class__.API_BASE_URL
else:
api_base_url = 'http://' + emulator_host
self.api_base_url = api_base_url
emulator_host = os.getenv(PUBSUB_EMULATOR)
if emulator_host is None:
self.host = self.__class__.API_BASE_URL
self.api_base_url = self.__class__.API_BASE_URL
self.in_emulator = False
else:
self.host = emulator_host
self.api_base_url = 'http://' + emulator_host
self.in_emulator = True

def build_api_url(self, path, query_params=None,
api_base_url=None, api_version=None):
Expand Down
113 changes: 113 additions & 0 deletions unit_tests/pubsub/test__gax.py
Original file line number Diff line number Diff line change
Expand Up @@ -737,6 +737,112 @@ def test_subscription_modify_ack_deadline_error(self):
self.assertEqual(options, None)


@unittest.skipUnless(_HAVE_GAX, 'No gax-python')
class Test_make_gax_publisher_api(_Base, unittest.TestCase):

def _callFUT(self, connection):
from gcloud.pubsub._gax import make_gax_publisher_api
return make_gax_publisher_api(connection)

def test_live_api(self):
from unit_tests._testing import _Monkey
from gcloud.pubsub import _gax as MUT

channels = []
mock_result = object()

def mock_publisher_api(channel):
channels.append(channel)
return mock_result

connection = _Connection(in_emulator=False)
with _Monkey(MUT, PublisherApi=mock_publisher_api):
result = self._callFUT(connection)

self.assertIs(result, mock_result)
self.assertEqual(channels, [None])

def test_emulator(self):
from unit_tests._testing import _Monkey
from gcloud.pubsub import _gax as MUT

channels = []
mock_result = object()
insecure_args = []
mock_channel = object()

def mock_publisher_api(channel):
channels.append(channel)
return mock_result

def mock_insecure_channel(host, port):
insecure_args.append((host, port))
return mock_channel

host = 'CURR_HOST:1234'
connection = _Connection(in_emulator=True, host=host)
with _Monkey(MUT, PublisherApi=mock_publisher_api,
insecure_channel=mock_insecure_channel):
result = self._callFUT(connection)

self.assertIs(result, mock_result)
self.assertEqual(channels, [mock_channel])
self.assertEqual(insecure_args, [(host, None)])


@unittest.skipUnless(_HAVE_GAX, 'No gax-python')
class Test_make_gax_subscriber_api(_Base, unittest.TestCase):

def _callFUT(self, connection):
from gcloud.pubsub._gax import make_gax_subscriber_api
return make_gax_subscriber_api(connection)

def test_live_api(self):
from unit_tests._testing import _Monkey
from gcloud.pubsub import _gax as MUT

channels = []
mock_result = object()

def mock_subscriber_api(channel):
channels.append(channel)
return mock_result

connection = _Connection(in_emulator=False)
with _Monkey(MUT, SubscriberApi=mock_subscriber_api):
result = self._callFUT(connection)

self.assertIs(result, mock_result)
self.assertEqual(channels, [None])

def test_emulator(self):
from unit_tests._testing import _Monkey
from gcloud.pubsub import _gax as MUT

channels = []
mock_result = object()
insecure_args = []
mock_channel = object()

def mock_subscriber_api(channel):
channels.append(channel)
return mock_result

def mock_insecure_channel(host, port):
insecure_args.append((host, port))
return mock_channel

host = 'CURR_HOST:1234'
connection = _Connection(in_emulator=True, host=host)
with _Monkey(MUT, SubscriberApi=mock_subscriber_api,
insecure_channel=mock_insecure_channel):
result = self._callFUT(connection)

self.assertIs(result, mock_result)
self.assertEqual(channels, [mock_channel])
self.assertEqual(insecure_args, [(host, None)])


class _GAXPublisherAPI(_GAXBaseAPI):

_create_topic_conflict = False
Expand Down Expand Up @@ -917,3 +1023,10 @@ def __init__(self, name, topic, push_endpoint, ack_deadline_seconds):
self.topic = topic
self.push_config = _PushConfigPB(push_endpoint)
self.ack_deadline_seconds = ack_deadline_seconds


class _Connection(object):

def __init__(self, in_emulator=False, host=None):
self.in_emulator = in_emulator
self.host = host
8 changes: 6 additions & 2 deletions unit_tests/pubsub/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ def __init__(self, _wrapped):

with _Monkey(MUT,
_USE_GAX=True,
GeneratedPublisherAPI=_generated_api,
make_gax_publisher_api=_generated_api,
GAXPublisherAPI=_GaxPublisherAPI):
api = client.publisher_api

Expand All @@ -76,6 +76,8 @@ def __init__(self, _wrapped):
# API instance is cached
again = client.publisher_api
self.assertTrue(again is api)
args = (client.connection,)
self.assertEqual(_called_with, [(args, {})])

def test_subscriber_api_wo_gax(self):
from gcloud.pubsub.connection import _SubscriberAPI
Expand Down Expand Up @@ -115,7 +117,7 @@ def __init__(self, _wrapped):

with _Monkey(MUT,
_USE_GAX=True,
GeneratedSubscriberAPI=_generated_api,
make_gax_subscriber_api=_generated_api,
GAXSubscriberAPI=_GaxSubscriberAPI):
api = client.subscriber_api

Expand All @@ -124,6 +126,8 @@ def __init__(self, _wrapped):
# API instance is cached
again = client.subscriber_api
self.assertTrue(again is api)
args = (client.connection,)
self.assertEqual(_called_with, [(args, {})])

def test_iam_policy_api(self):
from gcloud.pubsub.connection import _IAMPolicyAPI
Expand Down
28 changes: 2 additions & 26 deletions unit_tests/pubsub/test_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,31 +55,6 @@ def test_custom_url_from_env(self):
self.assertNotEqual(conn.api_base_url, klass.API_BASE_URL)
self.assertEqual(conn.api_base_url, 'http://' + HOST)

def test_custom_url_from_constructor(self):
HOST = object()
conn = self._makeOne(api_base_url=HOST)

klass = self._getTargetClass()
self.assertNotEqual(conn.api_base_url, klass.API_BASE_URL)
self.assertEqual(conn.api_base_url, HOST)

def test_custom_url_constructor_and_env(self):
import os
from unit_tests._testing import _Monkey
from gcloud.environment_vars import PUBSUB_EMULATOR

HOST1 = object()
HOST2 = object()
fake_environ = {PUBSUB_EMULATOR: HOST1}

with _Monkey(os, getenv=fake_environ.get):
conn = self._makeOne(api_base_url=HOST2)

klass = self._getTargetClass()
self.assertNotEqual(conn.api_base_url, klass.API_BASE_URL)
self.assertNotEqual(conn.api_base_url, HOST1)
self.assertEqual(conn.api_base_url, HOST2)

def test_build_api_url_no_extra_query_params(self):
conn = self._makeOne()
URI = '/'.join([
Expand All @@ -104,7 +79,8 @@ def test_build_api_url_w_extra_query_params(self):
def test_build_api_url_w_base_url_override(self):
base_url1 = 'api-base-url1'
base_url2 = 'api-base-url2'
conn = self._makeOne(api_base_url=base_url1)
conn = self._makeOne()
conn.api_base_url = base_url1
URI = '/'.join([
base_url2,
conn.API_VERSION,
Expand Down

0 comments on commit 31e3369

Please sign in to comment.