Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Updating Pub / Sub GAX calls to work with emulator. #2245

Merged
merged 1 commit into from
Sep 8, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 44 additions & 0 deletions google/cloud/pubsub/_gax.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,14 @@

"""GAX wrapper for Pubsub API requests."""

from google.cloud.pubsub.v1.publisher_api import PublisherApi
from google.cloud.pubsub.v1.subscriber_api import SubscriberApi
from google.gax import CallOptions
from google.gax import INITIAL_PAGE
from google.gax.errors import GaxError
from google.pubsub.v1.pubsub_pb2 import PubsubMessage
from google.pubsub.v1.pubsub_pb2 import PushConfig
from grpc.beta.implementations import insecure_channel
from grpc import StatusCode

# pylint: disable=ungrouped-imports
Expand Down Expand Up @@ -498,3 +501,44 @@ def _received_message_pb_to_mapping(received_message_pb):
'message': _message_pb_to_mapping(
received_message_pb.message),
}


def make_gax_publisher_api(connection):
"""Create an instance of the GAX Publisher API.
If the ``connection`` is intended for a local emulator, then
an insecure ``channel`` is created pointing at the local
Pub / Sub server.
:type connection: :class:`~google.cloud.pubsub.connection.Connection`
:param connection: The connection that holds configuration details.
:rtype: :class:`~google.cloud.pubsub.v1.publisher_api.PublisherApi`
:returns: A publisher API instance with the proper connection
configuration.
:rtype: :class:`~google.cloud.pubsub.v1.subscriber_api.SubscriberApi`
"""
channel = None
if connection.in_emulator:
channel = insecure_channel(connection.host, None)
return PublisherApi(channel=channel)


def make_gax_subscriber_api(connection):
"""Create an instance of the GAX Subscriber API.
If the ``connection`` is intended for a local emulator, then
an insecure ``channel`` is created pointing at the local
Pub / Sub server.
:type connection: :class:`~google.cloud.pubsub.connection.Connection`
:param connection: The connection that holds configuration details.
:rtype: :class:`~google.cloud.pubsub.v1.subscriber_api.SubscriberApi`
:returns: A subscriber API instance with the proper connection
configuration.
"""
channel = None
if connection.in_emulator:
channel = insecure_channel(connection.host, None)
return SubscriberApi(channel=channel)
16 changes: 8 additions & 8 deletions google/cloud/pubsub/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,16 +27,16 @@

# pylint: disable=ungrouped-imports
try:
from google.cloud.pubsub.v1.publisher_api import (
PublisherApi as GeneratedPublisherAPI)
from google.cloud.pubsub.v1.subscriber_api import (
SubscriberApi as GeneratedSubscriberAPI)
from google.cloud.pubsub._gax import _PublisherAPI as GAXPublisherAPI
from google.cloud.pubsub._gax import _SubscriberAPI as GAXSubscriberAPI
from google.cloud.pubsub._gax import make_gax_publisher_api
from google.cloud.pubsub._gax import make_gax_subscriber_api
except ImportError: # pragma: NO COVER
_HAVE_GAX = False
GeneratedPublisherAPI = GAXPublisherAPI = None
GeneratedSubscriberAPI = GAXSubscriberAPI = None
GAXPublisherAPI = None
GAXSubscriberAPI = None
make_gax_publisher_api = None
make_gax_subscriber_api = None
else:
_HAVE_GAX = True
# pylint: enable=ungrouped-imports
Expand Down Expand Up @@ -75,7 +75,7 @@ def publisher_api(self):
"""Helper for publisher-related API calls."""
if self._publisher_api is None:
if _USE_GAX:
generated = GeneratedPublisherAPI()
generated = make_gax_publisher_api(self.connection)
self._publisher_api = GAXPublisherAPI(generated)
else:
self._publisher_api = JSONPublisherAPI(self.connection)
Expand All @@ -86,7 +86,7 @@ def subscriber_api(self):
"""Helper for subscriber-related API calls."""
if self._subscriber_api is None:
if _USE_GAX:
generated = GeneratedSubscriberAPI()
generated = make_gax_subscriber_api(self.connection)
self._subscriber_api = GAXSubscriberAPI(generated)
else:
self._subscriber_api = JSONSubscriberAPI(self.connection)
Expand Down
28 changes: 15 additions & 13 deletions google/cloud/pubsub/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@
from google.cloud.environment_vars import PUBSUB_EMULATOR


PUBSUB_API_HOST = 'pubsub.googleapis.com'
"""Pub / Sub API request host."""


class Connection(base_connection.JSONConnection):
"""A connection to Google Cloud Pub/Sub via the JSON REST API.
Expand All @@ -29,13 +33,9 @@ class Connection(base_connection.JSONConnection):
:type http: :class:`httplib2.Http` or class that defines ``request()``.
:param http: (Optional) HTTP object to make requests.
:type api_base_url: string
:param api_base_url: The base of the API call URL. Defaults to the value
:attr:`Connection.API_BASE_URL`.
"""

API_BASE_URL = 'https://pubsub.googleapis.com'
API_BASE_URL = 'https://' + PUBSUB_API_HOST
"""The base of the API call URL."""

API_VERSION = 'v1'
Expand All @@ -48,15 +48,17 @@ class Connection(base_connection.JSONConnection):
'https://www.googleapis.com/auth/cloud-platform')
"""The scopes required for authenticating as a Cloud Pub/Sub consumer."""

def __init__(self, credentials=None, http=None, api_base_url=None):
def __init__(self, credentials=None, http=None):
super(Connection, self).__init__(credentials=credentials, http=http)
if api_base_url is None:
emulator_host = os.getenv(PUBSUB_EMULATOR)
if emulator_host is None:
api_base_url = self.__class__.API_BASE_URL
else:
api_base_url = 'http://' + emulator_host
self.api_base_url = api_base_url
emulator_host = os.getenv(PUBSUB_EMULATOR)
if emulator_host is None:
self.host = self.__class__.API_BASE_URL
self.api_base_url = self.__class__.API_BASE_URL
self.in_emulator = False
else:
self.host = emulator_host
self.api_base_url = 'http://' + emulator_host
self.in_emulator = True

def build_api_url(self, path, query_params=None,
api_base_url=None, api_version=None):
Expand Down
113 changes: 113 additions & 0 deletions unit_tests/pubsub/test__gax.py
Original file line number Diff line number Diff line change
Expand Up @@ -749,6 +749,112 @@ def test_subscription_modify_ack_deadline_error(self):
self.assertEqual(options, None)


@unittest.skipUnless(_HAVE_GAX, 'No gax-python')
class Test_make_gax_publisher_api(_Base, unittest.TestCase):

def _callFUT(self, connection):
from google.cloud.pubsub._gax import make_gax_publisher_api
return make_gax_publisher_api(connection)

def test_live_api(self):
from unit_tests._testing import _Monkey
from google.cloud.pubsub import _gax as MUT

channels = []
mock_result = object()

def mock_publisher_api(channel):
channels.append(channel)
return mock_result

connection = _Connection(in_emulator=False)
with _Monkey(MUT, PublisherApi=mock_publisher_api):
result = self._callFUT(connection)

self.assertIs(result, mock_result)
self.assertEqual(channels, [None])

def test_emulator(self):
from unit_tests._testing import _Monkey
from google.cloud.pubsub import _gax as MUT

channels = []
mock_result = object()
insecure_args = []
mock_channel = object()

def mock_publisher_api(channel):
channels.append(channel)
return mock_result

def mock_insecure_channel(host, port):
insecure_args.append((host, port))
return mock_channel

host = 'CURR_HOST:1234'
connection = _Connection(in_emulator=True, host=host)
with _Monkey(MUT, PublisherApi=mock_publisher_api,
insecure_channel=mock_insecure_channel):
result = self._callFUT(connection)

self.assertIs(result, mock_result)
self.assertEqual(channels, [mock_channel])
self.assertEqual(insecure_args, [(host, None)])


@unittest.skipUnless(_HAVE_GAX, 'No gax-python')
class Test_make_gax_subscriber_api(_Base, unittest.TestCase):

def _callFUT(self, connection):
from google.cloud.pubsub._gax import make_gax_subscriber_api
return make_gax_subscriber_api(connection)

def test_live_api(self):
from unit_tests._testing import _Monkey
from google.cloud.pubsub import _gax as MUT

channels = []
mock_result = object()

def mock_subscriber_api(channel):
channels.append(channel)
return mock_result

connection = _Connection(in_emulator=False)
with _Monkey(MUT, SubscriberApi=mock_subscriber_api):
result = self._callFUT(connection)

self.assertIs(result, mock_result)
self.assertEqual(channels, [None])

def test_emulator(self):
from unit_tests._testing import _Monkey
from google.cloud.pubsub import _gax as MUT

channels = []
mock_result = object()
insecure_args = []
mock_channel = object()

def mock_subscriber_api(channel):
channels.append(channel)
return mock_result

def mock_insecure_channel(host, port):
insecure_args.append((host, port))
return mock_channel

host = 'CURR_HOST:1234'
connection = _Connection(in_emulator=True, host=host)
with _Monkey(MUT, SubscriberApi=mock_subscriber_api,
insecure_channel=mock_insecure_channel):
result = self._callFUT(connection)

self.assertIs(result, mock_result)
self.assertEqual(channels, [mock_channel])
self.assertEqual(insecure_args, [(host, None)])


class _GAXPublisherAPI(_GAXBaseAPI):

_create_topic_conflict = False
Expand Down Expand Up @@ -930,3 +1036,10 @@ def __init__(self, name, topic, push_endpoint, ack_deadline_seconds):
self.topic = topic
self.push_config = _PushConfigPB(push_endpoint)
self.ack_deadline_seconds = ack_deadline_seconds


class _Connection(object):

def __init__(self, in_emulator=False, host=None):
self.in_emulator = in_emulator
self.host = host
8 changes: 6 additions & 2 deletions unit_tests/pubsub/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ def __init__(self, _wrapped):

with _Monkey(MUT,
_USE_GAX=True,
GeneratedPublisherAPI=_generated_api,
make_gax_publisher_api=_generated_api,
GAXPublisherAPI=_GaxPublisherAPI):
api = client.publisher_api

Expand All @@ -76,6 +76,8 @@ def __init__(self, _wrapped):
# API instance is cached
again = client.publisher_api
self.assertTrue(again is api)
args = (client.connection,)
self.assertEqual(_called_with, [(args, {})])

def test_subscriber_api_wo_gax(self):
from google.cloud.pubsub.connection import _SubscriberAPI
Expand Down Expand Up @@ -115,7 +117,7 @@ def __init__(self, _wrapped):

with _Monkey(MUT,
_USE_GAX=True,
GeneratedSubscriberAPI=_generated_api,
make_gax_subscriber_api=_generated_api,
GAXSubscriberAPI=_GaxSubscriberAPI):
api = client.subscriber_api

Expand All @@ -124,6 +126,8 @@ def __init__(self, _wrapped):
# API instance is cached
again = client.subscriber_api
self.assertTrue(again is api)
args = (client.connection,)
self.assertEqual(_called_with, [(args, {})])

def test_iam_policy_api(self):
from google.cloud.pubsub.connection import _IAMPolicyAPI
Expand Down
28 changes: 2 additions & 26 deletions unit_tests/pubsub/test_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,31 +55,6 @@ def test_custom_url_from_env(self):
self.assertNotEqual(conn.api_base_url, klass.API_BASE_URL)
self.assertEqual(conn.api_base_url, 'http://' + HOST)

def test_custom_url_from_constructor(self):
HOST = object()
conn = self._makeOne(api_base_url=HOST)

klass = self._getTargetClass()
self.assertNotEqual(conn.api_base_url, klass.API_BASE_URL)
self.assertEqual(conn.api_base_url, HOST)

def test_custom_url_constructor_and_env(self):
import os
from unit_tests._testing import _Monkey
from google.cloud.environment_vars import PUBSUB_EMULATOR

HOST1 = object()
HOST2 = object()
fake_environ = {PUBSUB_EMULATOR: HOST1}

with _Monkey(os, getenv=fake_environ.get):
conn = self._makeOne(api_base_url=HOST2)

klass = self._getTargetClass()
self.assertNotEqual(conn.api_base_url, klass.API_BASE_URL)
self.assertNotEqual(conn.api_base_url, HOST1)
self.assertEqual(conn.api_base_url, HOST2)

def test_build_api_url_no_extra_query_params(self):
conn = self._makeOne()
URI = '/'.join([
Expand All @@ -104,7 +79,8 @@ def test_build_api_url_w_extra_query_params(self):
def test_build_api_url_w_base_url_override(self):
base_url1 = 'api-base-url1'
base_url2 = 'api-base-url2'
conn = self._makeOne(api_base_url=base_url1)
conn = self._makeOne()
conn.api_base_url = base_url1
URI = '/'.join([
base_url2,
conn.API_VERSION,
Expand Down