diff --git a/core/google/cloud/_testing.py b/core/google/cloud/_testing.py index f9d2b57fda52a..a544fffc5fe4a 100644 --- a/core/google/cloud/_testing.py +++ b/core/google/cloud/_testing.py @@ -95,6 +95,10 @@ def _make_grpc_failed_precondition(self): from grpc import StatusCode return self._make_grpc_error(StatusCode.FAILED_PRECONDITION) + def _make_grpc_already_exists(self): + from grpc import StatusCode + return self._make_grpc_error(StatusCode.ALREADY_EXISTS) + def _make_grpc_deadline_exceeded(self): from grpc import StatusCode return self._make_grpc_error(StatusCode.DEADLINE_EXCEEDED) diff --git a/pubsub/google/cloud/pubsub/_gax.py b/pubsub/google/cloud/pubsub/_gax.py index d32f8eb069a78..94dc639178ef9 100644 --- a/pubsub/google/cloud/pubsub/_gax.py +++ b/pubsub/google/cloud/pubsub/_gax.py @@ -42,6 +42,9 @@ from google.cloud.pubsub.subscription import Subscription from google.cloud.pubsub.topic import Topic +_CONFLICT_ERROR_CODES = ( + StatusCode.FAILED_PRECONDITION, StatusCode.ALREADY_EXISTS) + class _PublisherAPI(object): """Helper mapping publisher-related APIs. @@ -105,7 +108,7 @@ def topic_create(self, topic_path): try: topic_pb = self._gax_api.create_topic(topic_path) except GaxError as exc: - if exc_to_code(exc.cause) == StatusCode.FAILED_PRECONDITION: + if exc_to_code(exc.cause) in _CONFLICT_ERROR_CODES: raise Conflict(topic_path) raise return {'name': topic_pb.name} @@ -337,7 +340,7 @@ def subscription_create(self, subscription_path, topic_path, retain_acked_messages=retain_acked_messages, message_retention_duration=message_retention_duration) except GaxError as exc: - if exc_to_code(exc.cause) == StatusCode.FAILED_PRECONDITION: + if exc_to_code(exc.cause) in _CONFLICT_ERROR_CODES: raise Conflict(topic_path) raise return MessageToDict(sub_pb) @@ -584,7 +587,7 @@ def snapshot_create(self, snapshot_path, subscription_path): snapshot_pb = self._gax_api.create_snapshot( snapshot_path, subscription_path) except GaxError as exc: - if exc_to_code(exc.cause) == StatusCode.FAILED_PRECONDITION: + if exc_to_code(exc.cause) in _CONFLICT_ERROR_CODES: raise Conflict(snapshot_path) elif exc_to_code(exc.cause) == StatusCode.NOT_FOUND: raise NotFound(subscription_path) diff --git a/pubsub/tests/system.py b/pubsub/tests/system.py index acdbde0dffca4..d55011a5254ec 100644 --- a/pubsub/tests/system.py +++ b/pubsub/tests/system.py @@ -22,6 +22,7 @@ import httplib2 from google.cloud.environment_vars import PUBSUB_EMULATOR +from google.cloud.exceptions import Conflict from google.cloud.pubsub import client from test_utils.retry import RetryInstanceState @@ -113,6 +114,9 @@ def test_create_topic(self): self.assertTrue(topic.exists()) self.assertEqual(topic.name, topic_name) + with self.assertRaises(Conflict): + topic.create() + def test_list_topics(self): before = _consume_topics(Config.CLIENT) topics_to_create = [ @@ -152,6 +156,9 @@ def test_create_subscription_defaults(self): self.assertEqual(subscription.name, SUBSCRIPTION_NAME) self.assertIs(subscription.topic, topic) + with self.assertRaises(Conflict): + subscription.create() + def test_create_subscription_w_ack_deadline(self): TOPIC_NAME = 'create-sub-ack' + unique_resource_id('-') topic = Config.CLIENT.topic(TOPIC_NAME) @@ -350,6 +357,9 @@ def full_name(obj): self.assertIn(snapshot.full_name, map(full_name, after_snapshots)) self.assertNotIn(snapshot.full_name, map(full_name, before_snapshots)) + with self.assertRaises(Conflict): + snapshot.create() + def test_seek(self): TOPIC_NAME = 'seek-e2e' + unique_resource_id('-') diff --git a/pubsub/tests/unit/test__gax.py b/pubsub/tests/unit/test__gax.py index 2bd7983b40afa..dd2ea8077f84b 100644 --- a/pubsub/tests/unit/test__gax.py +++ b/pubsub/tests/unit/test__gax.py @@ -141,10 +141,24 @@ def test_topic_create(self): self.assertEqual(topic_path, self.TOPIC_PATH) self.assertIsNone(options) + def test_topic_create_failed_precondition(self): + from google.cloud.exceptions import Conflict + + gax_api = _GAXPublisherAPI(_create_topic_failed_precondition=True) + client = _Client(self.PROJECT) + api = self._make_one(gax_api, client) + + with self.assertRaises(Conflict): + api.topic_create(self.TOPIC_PATH) + + topic_path, options = gax_api._create_topic_called_with + self.assertEqual(topic_path, self.TOPIC_PATH) + self.assertIsNone(options) + def test_topic_create_already_exists(self): from google.cloud.exceptions import Conflict - gax_api = _GAXPublisherAPI(_create_topic_conflict=True) + gax_api = _GAXPublisherAPI(_create_topic_already_exists=True) client = _Client(self.PROJECT) api = self._make_one(gax_api, client) @@ -597,11 +611,35 @@ def test_subscription_create_optional_params(self): expected_message_retention_duration.total_seconds()) self.assertIsNone(options) + def test_subscription_create_failed_precondition(self): + from google.cloud.exceptions import Conflict + + DEADLINE = 600 + gax_api = _GAXSubscriberAPI( + _create_subscription_failed_precondition=True) + client = _Client(self.PROJECT) + api = self._make_one(gax_api, client) + + with self.assertRaises(Conflict): + api.subscription_create( + self.SUB_PATH, self.TOPIC_PATH, DEADLINE, self.PUSH_ENDPOINT) + + (name, topic, push_config, ack_deadline, retain_acked_messages, + message_retention_duration, options) = ( + gax_api._create_subscription_called_with) + self.assertEqual(name, self.SUB_PATH) + self.assertEqual(topic, self.TOPIC_PATH) + self.assertEqual(push_config.push_endpoint, self.PUSH_ENDPOINT) + self.assertEqual(ack_deadline, DEADLINE) + self.assertIsNone(retain_acked_messages) + self.assertIsNone(message_retention_duration) + self.assertIsNone(options) + def test_subscription_create_already_exists(self): from google.cloud.exceptions import Conflict DEADLINE = 600 - gax_api = _GAXSubscriberAPI(_create_subscription_conflict=True) + gax_api = _GAXSubscriberAPI(_create_subscription_already_exists=True) client = _Client(self.PROJECT) api = self._make_one(gax_api, client) @@ -1121,10 +1159,26 @@ def test_snapshot_create(self): self.assertEqual(subscription, self.SUB_PATH) self.assertIsNone(options) + def test_snapshot_create_failed_precondition(self): + from google.cloud.exceptions import Conflict + + gax_api = _GAXSubscriberAPI(_create_snapshot_failed_precondition=True) + client = _Client(self.PROJECT) + api = self._make_one(gax_api, client) + + with self.assertRaises(Conflict): + api.snapshot_create(self.SNAPSHOT_PATH, self.SUB_PATH) + + name, subscription, options = ( + gax_api._create_snapshot_called_with) + self.assertEqual(name, self.SNAPSHOT_PATH) + self.assertEqual(subscription, self.SUB_PATH) + self.assertIsNone(options) + def test_snapshot_create_already_exists(self): from google.cloud.exceptions import Conflict - gax_api = _GAXSubscriberAPI(_create_snapshot_conflict=True) + gax_api = _GAXSubscriberAPI(_create_snapshot_already_exists=True) client = _Client(self.PROJECT) api = self._make_one(gax_api, client) @@ -1371,7 +1425,8 @@ def mock_insecure_channel(host): class _GAXPublisherAPI(_GAXBaseAPI): - _create_topic_conflict = False + _create_topic_failed_precondition = False + _create_topic_already_exists = False def list_topics(self, name, page_size, options): self._list_topics_called_with = name, page_size, options @@ -1383,8 +1438,10 @@ def create_topic(self, name, options=None): self._create_topic_called_with = name, options if self._random_gax_error: raise GaxError('error') - if self._create_topic_conflict: + if self._create_topic_failed_precondition: raise GaxError('conflict', self._make_grpc_failed_precondition()) + if self._create_topic_already_exists: + raise GaxError('conflict', self._make_grpc_already_exists()) return self._create_topic_response def get_topic(self, name, options=None): @@ -1432,8 +1489,10 @@ def list_topic_subscriptions(self, topic, page_size, options=None): class _GAXSubscriberAPI(_GAXBaseAPI): - _create_snapshot_conflict = False - _create_subscription_conflict = False + _create_snapshot_already_exists = False + _create_snapshot_failed_precondition = False + _create_subscription_already_exists = False + _create_subscription_failed_precondition = False _modify_push_config_ok = False _acknowledge_ok = False _modify_ack_deadline_ok = False @@ -1456,8 +1515,10 @@ def create_subscription(self, name, topic, push_config=None, retain_acked_messages, message_retention_duration, options) if self._random_gax_error: raise GaxError('error') - if self._create_subscription_conflict: + if self._create_subscription_failed_precondition: raise GaxError('conflict', self._make_grpc_failed_precondition()) + if self._create_subscription_already_exists: + raise GaxError('conflict', self._make_grpc_already_exists()) return self._create_subscription_response def get_subscription(self, name, options=None): @@ -1533,7 +1594,9 @@ def create_snapshot(self, name, subscription, options=None): self._create_snapshot_called_with = (name, subscription, options) if self._random_gax_error: raise GaxError('error') - if self._create_snapshot_conflict: + if self._create_snapshot_already_exists: + raise GaxError('conflict', self._make_grpc_already_exists()) + if self._create_snapshot_failed_precondition: raise GaxError('conflict', self._make_grpc_failed_precondition()) if self._snapshot_create_subscription_miss: raise GaxError('miss', self._make_grpc_not_found())