Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remap new Gax conflict error code #3443

Merged
merged 3 commits into from
Jun 7, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions core/google/cloud/_testing.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,10 @@ def _make_grpc_failed_precondition(self):
from grpc import StatusCode
return self._make_grpc_error(StatusCode.FAILED_PRECONDITION)

def _make_grpc_already_exists(self):
from grpc import StatusCode
return self._make_grpc_error(StatusCode.ALREADY_EXISTS)

def _make_grpc_deadline_exceeded(self):
from grpc import StatusCode
return self._make_grpc_error(StatusCode.DEADLINE_EXCEEDED)
Expand Down
9 changes: 6 additions & 3 deletions pubsub/google/cloud/pubsub/_gax.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@
from google.cloud.pubsub.subscription import Subscription
from google.cloud.pubsub.topic import Topic

_CONFLICT_ERROR_CODES = (
StatusCode.FAILED_PRECONDITION, StatusCode.ALREADY_EXISTS)


class _PublisherAPI(object):
"""Helper mapping publisher-related APIs.
Expand Down Expand Up @@ -105,7 +108,7 @@ def topic_create(self, topic_path):
try:
topic_pb = self._gax_api.create_topic(topic_path)
except GaxError as exc:
if exc_to_code(exc.cause) == StatusCode.FAILED_PRECONDITION:
if exc_to_code(exc.cause) in _CONFLICT_ERROR_CODES:
raise Conflict(topic_path)
raise
return {'name': topic_pb.name}
Expand Down Expand Up @@ -337,7 +340,7 @@ def subscription_create(self, subscription_path, topic_path,
retain_acked_messages=retain_acked_messages,
message_retention_duration=message_retention_duration)
except GaxError as exc:
if exc_to_code(exc.cause) == StatusCode.FAILED_PRECONDITION:
if exc_to_code(exc.cause) in _CONFLICT_ERROR_CODES:
raise Conflict(topic_path)
raise
return MessageToDict(sub_pb)
Expand Down Expand Up @@ -584,7 +587,7 @@ def snapshot_create(self, snapshot_path, subscription_path):
snapshot_pb = self._gax_api.create_snapshot(
snapshot_path, subscription_path)
except GaxError as exc:
if exc_to_code(exc.cause) == StatusCode.FAILED_PRECONDITION:
if exc_to_code(exc.cause) in _CONFLICT_ERROR_CODES:
raise Conflict(snapshot_path)
elif exc_to_code(exc.cause) == StatusCode.NOT_FOUND:
raise NotFound(subscription_path)
Expand Down
10 changes: 10 additions & 0 deletions pubsub/tests/system.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import httplib2

from google.cloud.environment_vars import PUBSUB_EMULATOR
from google.cloud.exceptions import Conflict
from google.cloud.pubsub import client

from test_utils.retry import RetryInstanceState
Expand Down Expand Up @@ -113,6 +114,9 @@ def test_create_topic(self):
self.assertTrue(topic.exists())
self.assertEqual(topic.name, topic_name)

with self.assertRaises(Conflict):
topic.create()

def test_list_topics(self):
before = _consume_topics(Config.CLIENT)
topics_to_create = [
Expand Down Expand Up @@ -152,6 +156,9 @@ def test_create_subscription_defaults(self):
self.assertEqual(subscription.name, SUBSCRIPTION_NAME)
self.assertIs(subscription.topic, topic)

with self.assertRaises(Conflict):
subscription.create()

def test_create_subscription_w_ack_deadline(self):
TOPIC_NAME = 'create-sub-ack' + unique_resource_id('-')
topic = Config.CLIENT.topic(TOPIC_NAME)
Expand Down Expand Up @@ -350,6 +357,9 @@ def full_name(obj):
self.assertIn(snapshot.full_name, map(full_name, after_snapshots))
self.assertNotIn(snapshot.full_name, map(full_name, before_snapshots))

with self.assertRaises(Conflict):
snapshot.create()


def test_seek(self):
TOPIC_NAME = 'seek-e2e' + unique_resource_id('-')
Expand Down
81 changes: 72 additions & 9 deletions pubsub/tests/unit/test__gax.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,10 +141,24 @@ def test_topic_create(self):
self.assertEqual(topic_path, self.TOPIC_PATH)
self.assertIsNone(options)

def test_topic_create_failed_precondition(self):
from google.cloud.exceptions import Conflict

gax_api = _GAXPublisherAPI(_create_topic_failed_precondition=True)
client = _Client(self.PROJECT)
api = self._make_one(gax_api, client)

with self.assertRaises(Conflict):
api.topic_create(self.TOPIC_PATH)

topic_path, options = gax_api._create_topic_called_with
self.assertEqual(topic_path, self.TOPIC_PATH)
self.assertIsNone(options)

def test_topic_create_already_exists(self):
from google.cloud.exceptions import Conflict

gax_api = _GAXPublisherAPI(_create_topic_conflict=True)
gax_api = _GAXPublisherAPI(_create_topic_already_exists=True)
client = _Client(self.PROJECT)
api = self._make_one(gax_api, client)

Expand Down Expand Up @@ -597,11 +611,35 @@ def test_subscription_create_optional_params(self):
expected_message_retention_duration.total_seconds())
self.assertIsNone(options)

def test_subscription_create_failed_precondition(self):
from google.cloud.exceptions import Conflict

DEADLINE = 600
gax_api = _GAXSubscriberAPI(
_create_subscription_failed_precondition=True)
client = _Client(self.PROJECT)
api = self._make_one(gax_api, client)

with self.assertRaises(Conflict):
api.subscription_create(
self.SUB_PATH, self.TOPIC_PATH, DEADLINE, self.PUSH_ENDPOINT)

(name, topic, push_config, ack_deadline, retain_acked_messages,
message_retention_duration, options) = (
gax_api._create_subscription_called_with)
self.assertEqual(name, self.SUB_PATH)
self.assertEqual(topic, self.TOPIC_PATH)
self.assertEqual(push_config.push_endpoint, self.PUSH_ENDPOINT)
self.assertEqual(ack_deadline, DEADLINE)
self.assertIsNone(retain_acked_messages)
self.assertIsNone(message_retention_duration)
self.assertIsNone(options)

def test_subscription_create_already_exists(self):
from google.cloud.exceptions import Conflict

DEADLINE = 600
gax_api = _GAXSubscriberAPI(_create_subscription_conflict=True)
gax_api = _GAXSubscriberAPI(_create_subscription_already_exists=True)
client = _Client(self.PROJECT)
api = self._make_one(gax_api, client)

Expand Down Expand Up @@ -1121,10 +1159,26 @@ def test_snapshot_create(self):
self.assertEqual(subscription, self.SUB_PATH)
self.assertIsNone(options)

def test_snapshot_create_failed_precondition(self):
from google.cloud.exceptions import Conflict

gax_api = _GAXSubscriberAPI(_create_snapshot_failed_precondition=True)
client = _Client(self.PROJECT)
api = self._make_one(gax_api, client)

with self.assertRaises(Conflict):
api.snapshot_create(self.SNAPSHOT_PATH, self.SUB_PATH)

name, subscription, options = (
gax_api._create_snapshot_called_with)
self.assertEqual(name, self.SNAPSHOT_PATH)
self.assertEqual(subscription, self.SUB_PATH)
self.assertIsNone(options)

def test_snapshot_create_already_exists(self):
from google.cloud.exceptions import Conflict

gax_api = _GAXSubscriberAPI(_create_snapshot_conflict=True)
gax_api = _GAXSubscriberAPI(_create_snapshot_already_exists=True)
client = _Client(self.PROJECT)
api = self._make_one(gax_api, client)

Expand Down Expand Up @@ -1371,7 +1425,8 @@ def mock_insecure_channel(host):

class _GAXPublisherAPI(_GAXBaseAPI):

_create_topic_conflict = False
_create_topic_failed_precondition = False
_create_topic_already_exists = False

def list_topics(self, name, page_size, options):
self._list_topics_called_with = name, page_size, options
Expand All @@ -1383,8 +1438,10 @@ def create_topic(self, name, options=None):
self._create_topic_called_with = name, options
if self._random_gax_error:
raise GaxError('error')
if self._create_topic_conflict:
if self._create_topic_failed_precondition:
raise GaxError('conflict', self._make_grpc_failed_precondition())
if self._create_topic_already_exists:
raise GaxError('conflict', self._make_grpc_already_exists())
return self._create_topic_response

def get_topic(self, name, options=None):
Expand Down Expand Up @@ -1432,8 +1489,10 @@ def list_topic_subscriptions(self, topic, page_size, options=None):

class _GAXSubscriberAPI(_GAXBaseAPI):

_create_snapshot_conflict = False
_create_subscription_conflict = False
_create_snapshot_already_exists = False
_create_snapshot_failed_precondition = False
_create_subscription_already_exists = False
_create_subscription_failed_precondition = False
_modify_push_config_ok = False
_acknowledge_ok = False
_modify_ack_deadline_ok = False
Expand All @@ -1456,8 +1515,10 @@ def create_subscription(self, name, topic, push_config=None,
retain_acked_messages, message_retention_duration, options)
if self._random_gax_error:
raise GaxError('error')
if self._create_subscription_conflict:
if self._create_subscription_failed_precondition:
raise GaxError('conflict', self._make_grpc_failed_precondition())
if self._create_subscription_already_exists:
raise GaxError('conflict', self._make_grpc_already_exists())
return self._create_subscription_response

def get_subscription(self, name, options=None):
Expand Down Expand Up @@ -1533,7 +1594,9 @@ def create_snapshot(self, name, subscription, options=None):
self._create_snapshot_called_with = (name, subscription, options)
if self._random_gax_error:
raise GaxError('error')
if self._create_snapshot_conflict:
if self._create_snapshot_already_exists:
raise GaxError('conflict', self._make_grpc_already_exists())
if self._create_snapshot_failed_precondition:
raise GaxError('conflict', self._make_grpc_failed_precondition())
if self._snapshot_create_subscription_miss:
raise GaxError('miss', self._make_grpc_not_found())
Expand Down