diff --git a/pubsub/google/cloud/pubsub_v1/futures.py b/pubsub/google/cloud/pubsub_v1/futures.py
index 0d7ba7f9bf52..ba861e40c653 100644
--- a/pubsub/google/cloud/pubsub_v1/futures.py
+++ b/pubsub/google/cloud/pubsub_v1/futures.py
@@ -74,9 +74,7 @@ def running(self):
             bool: ``True`` if this method has not yet completed, or
                 ``False`` if it has completed.
         """
-        if self.done():
-            return False
-        return True
+        return not self.done()
 
     def done(self):
         """Return True the future is done, False otherwise.
diff --git a/pubsub/google/cloud/pubsub_v1/publisher/_batch/thread.py b/pubsub/google/cloud/pubsub_v1/publisher/_batch/thread.py
index 117ee12b8463..726e93166cda 100644
--- a/pubsub/google/cloud/pubsub_v1/publisher/_batch/thread.py
+++ b/pubsub/google/cloud/pubsub_v1/publisher/_batch/thread.py
@@ -74,6 +74,9 @@ def __init__(self, client, topic, settings, autocommit=True):
         self._state_lock = threading.Lock()
         # These members are all communicated between threads; ensure that
         # any writes to them use the "state lock" to remain atomic.
+        # The _futures list should remain unchanged after the batch's status
+        # changes from ACCEPTING_MESSAGES to any other status, in order to
+        # avoid race conditions.
         self._futures = []
         self._messages = []
         self._size = 0
diff --git a/pubsub/google/cloud/pubsub_v1/publisher/client.py b/pubsub/google/cloud/pubsub_v1/publisher/client.py
index 05a4161e889a..60a03bb652ab 100644
--- a/pubsub/google/cloud/pubsub_v1/publisher/client.py
+++ b/pubsub/google/cloud/pubsub_v1/publisher/client.py
@@ -134,6 +134,7 @@ def __init__(self, batch_settings=(), **kwargs):
         # messages. One batch exists for each topic.
         self._batch_lock = self._batch_class.make_lock()
         self._batches = {}
+        self._is_stopped = False
 
     @classmethod
     def from_service_account_file(cls, filename, batch_settings=(), **kwargs):
@@ -187,20 +188,19 @@ def _batch(self, topic, create=False, autocommit=True):
         """
         # If there is no matching batch yet, then potentially create one
         # and place it on the batches dictionary.
-        with self._batch_lock:
-            if not create:
-                batch = self._batches.get(topic)
-                if batch is None:
-                    create = True
-
-            if create:
-                batch = self._batch_class(
-                    autocommit=autocommit,
-                    client=self,
-                    settings=self.batch_settings,
-                    topic=topic,
-                )
-                self._batches[topic] = batch
+        if not create:
+            batch = self._batches.get(topic)
+            if batch is None:
+                create = True
+
+        if create:
+            batch = self._batch_class(
+                autocommit=autocommit,
+                client=self,
+                settings=self.batch_settings,
+                topic=topic,
+            )
+            self._batches[topic] = batch
 
         return batch
 
@@ -242,12 +242,17 @@ def publish(self, topic, data, **attrs):
             instance that conforms to Python Standard library's
             :class:`~concurrent.futures.Future` interface (but not an
             instance of that class).
+
+        Raises:
+            RuntimeError:
+                If called after the publisher has been stopped
+                by a `stop()` method call.
         """
         # Sanity check: Is the data being sent as a bytestring?
         # If it is literally anything else, complain loudly about it.
         if not isinstance(data, six.binary_type):
             raise TypeError(
-                "Data being published to Pub/Sub must be sent " "as a bytestring."
+                "Data being published to Pub/Sub must be sent as a bytestring."
             )
 
         # Coerce all attributes to text strings.
@@ -266,11 +271,38 @@ def publish(self, topic, data, **attrs):
         message = types.PubsubMessage(data=data, attributes=attrs)
 
         # Delegate the publishing to the batch.
-        batch = self._batch(topic)
-        future = None
-        while future is None:
-            future = batch.publish(message)
-            if future is None:
-                batch = self._batch(topic, create=True)
+        with self._batch_lock:
+            if self._is_stopped:
+                raise RuntimeError("Cannot publish on a stopped publisher.")
+
+            batch = self._batch(topic)
+            future = None
+            while future is None:
+                future = batch.publish(message)
+                if future is None:
+                    batch = self._batch(topic, create=True)
 
         return future
+
+    def stop(self):
+        """Immediately publish all outstanding messages.
+
+        Asynchronously sends all outstanding messages and
+        prevents future calls to `publish()`. This method should
+        be invoked prior to deleting this `Client()` object
+        in order to ensure that no pending messages are lost.
+
+        .. note::
+
+            This method is non-blocking. Use `Future()` objects
+            returned by `publish()` to make sure all publish
+            requests have completed, either in success or error.
+        """
+        with self._batch_lock:
+            if self._is_stopped:
+                raise RuntimeError("Cannot stop a publisher already stopped.")
+
+            self._is_stopped = True
+
+            for batch in self._batches.values():
+                batch.commit()
diff --git a/pubsub/tests/unit/pubsub_v1/publisher/test_publisher_client.py b/pubsub/tests/unit/pubsub_v1/publisher/test_publisher_client.py
index 05e4c8c67209..6519b2b23149 100644
--- a/pubsub/tests/unit/pubsub_v1/publisher/test_publisher_client.py
+++ b/pubsub/tests/unit/pubsub_v1/publisher/test_publisher_client.py
@@ -201,6 +201,36 @@ def test_publish_attrs_type_error():
         client.publish(topic, b"foo", answer=42)
 
 
+def test_stop():
+    creds = mock.Mock(spec=credentials.Credentials)
+    client = publisher.Client(credentials=creds)
+
+    batch = client._batch("topic1", autocommit=False)
+    batch2 = client._batch("topic2", autocommit=False)
+
+    pubsub_msg = types.PubsubMessage(data=b"msg")
+
+    patch = mock.patch.object(batch, "commit")
+    patch2 = mock.patch.object(batch2, "commit")
+
+    with patch as commit_mock, patch2 as commit_mock2:
+        batch.publish(pubsub_msg)
+        batch2.publish(pubsub_msg)
+
+        client.stop()
+
+        # check that commit() was called on both batches
+        commit_mock.assert_called()
+        commit_mock2.assert_called()
+
+    # check that a stopped publisher doesn't accept new messages
+    with pytest.raises(RuntimeError):
+        client.publish("topic1", b"msg2")
+
+    with pytest.raises(RuntimeError):
+        client.stop()
+
+
 def test_gapic_instance_method():
     creds = mock.Mock(spec=credentials.Credentials)
     client = publisher.Client(credentials=creds)
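
For context, a minimal usage sketch of the publish/stop lifecycle introduced above, assuming placeholder project and topic names ("my-project", "my-topic"); per the docstring, stop() only flushes the outstanding batches and is non-blocking, so the caller still waits on the futures returned by publish():

    from google.cloud import pubsub_v1

    publisher = pubsub_v1.PublisherClient()
    # Placeholder resource names; substitute a real project and topic.
    topic_path = publisher.topic_path("my-project", "my-topic")

    # publish() is asynchronous; each call returns a future for one message.
    futures = [publisher.publish(topic_path, b"payload") for _ in range(3)]

    # stop() commits any outstanding batches and rejects further publish()
    # calls, but does not block; wait on the futures to confirm delivery.
    publisher.stop()
    for future in futures:
        future.result()

    # After this point, publish() (or a second stop()) raises RuntimeError.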