Skip to content

Commit

Permalink
feat(pubsub): add stop method (#9365)
Browse files Browse the repository at this point in the history
  • Loading branch information
Gurov Ilya authored and pradn committed Nov 7, 2019
1 parent 879d9a1 commit 1e7c0a6
Show file tree
Hide file tree
Showing 4 changed files with 87 additions and 24 deletions.
4 changes: 1 addition & 3 deletions pubsub/google/cloud/pubsub_v1/futures.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,9 +74,7 @@ def running(self):
bool: ``True`` if this method has not yet completed, or
``False`` if it has completed.
"""
if self.done():
return False
return True
return not self.done()

def done(self):
"""Return True the future is done, False otherwise.
Expand Down
3 changes: 3 additions & 0 deletions pubsub/google/cloud/pubsub_v1/publisher/_batch/thread.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,9 @@ def __init__(self, client, topic, settings, autocommit=True):
self._state_lock = threading.Lock()
# These members are all communicated between threads; ensure that
# any writes to them use the "state lock" to remain atomic.
# _futures list should remain unchanged after batch
# status changed from ACCEPTING_MESSAGES to any other
# in order to avoid race conditions
self._futures = []
self._messages = []
self._size = 0
Expand Down
74 changes: 53 additions & 21 deletions pubsub/google/cloud/pubsub_v1/publisher/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ def __init__(self, batch_settings=(), **kwargs):
# messages. One batch exists for each topic.
self._batch_lock = self._batch_class.make_lock()
self._batches = {}
self._is_stopped = False

@classmethod
def from_service_account_file(cls, filename, batch_settings=(), **kwargs):
Expand Down Expand Up @@ -187,20 +188,19 @@ def _batch(self, topic, create=False, autocommit=True):
"""
# If there is no matching batch yet, then potentially create one
# and place it on the batches dictionary.
with self._batch_lock:
if not create:
batch = self._batches.get(topic)
if batch is None:
create = True

if create:
batch = self._batch_class(
autocommit=autocommit,
client=self,
settings=self.batch_settings,
topic=topic,
)
self._batches[topic] = batch
if not create:
batch = self._batches.get(topic)
if batch is None:
create = True

if create:
batch = self._batch_class(
autocommit=autocommit,
client=self,
settings=self.batch_settings,
topic=topic,
)
self._batches[topic] = batch

return batch

Expand Down Expand Up @@ -242,12 +242,17 @@ def publish(self, topic, data, **attrs):
instance that conforms to Python Standard library's
:class:`~concurrent.futures.Future` interface (but not an
instance of that class).
Raises:
RuntimeError:
If called after publisher has been stopped
by a `stop()` method call.
"""
# Sanity check: Is the data being sent as a bytestring?
# If it is literally anything else, complain loudly about it.
if not isinstance(data, six.binary_type):
raise TypeError(
"Data being published to Pub/Sub must be sent " "as a bytestring."
"Data being published to Pub/Sub must be sent as a bytestring."
)

# Coerce all attributes to text strings.
Expand All @@ -266,11 +271,38 @@ def publish(self, topic, data, **attrs):
message = types.PubsubMessage(data=data, attributes=attrs)

# Delegate the publishing to the batch.
batch = self._batch(topic)
future = None
while future is None:
future = batch.publish(message)
if future is None:
batch = self._batch(topic, create=True)
with self._batch_lock:
if self._is_stopped:
raise RuntimeError("Cannot publish on a stopped publisher.")

batch = self._batch(topic)
future = None
while future is None:
future = batch.publish(message)
if future is None:
batch = self._batch(topic, create=True)

return future

def stop(self):
"""Immediately publish all outstanding messages.
Asynchronously sends all outstanding messages and
prevents future calls to `publish()`. Method should
be invoked prior to deleting this `Client()` object
in order to ensure that no pending messages are lost.
.. note::
This method is non-blocking. Use `Future()` objects
returned by `publish()` to make sure all publish
requests completed, either in success or error.
"""
with self._batch_lock:
if self._is_stopped:
raise RuntimeError("Cannot stop a publisher already stopped.")

self._is_stopped = True

for batch in self._batches.values():
batch.commit()
30 changes: 30 additions & 0 deletions pubsub/tests/unit/pubsub_v1/publisher/test_publisher_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,36 @@ def test_publish_attrs_type_error():
client.publish(topic, b"foo", answer=42)


def test_stop():
creds = mock.Mock(spec=credentials.Credentials)
client = publisher.Client(credentials=creds)

batch = client._batch("topic1", autocommit=False)
batch2 = client._batch("topic2", autocommit=False)

pubsub_msg = types.PubsubMessage(data=b"msg")

patch = mock.patch.object(batch, "commit")
patch2 = mock.patch.object(batch2, "commit")

with patch as commit_mock, patch2 as commit_mock2:
batch.publish(pubsub_msg)
batch2.publish(pubsub_msg)

client.stop()

# check if commit() called
commit_mock.assert_called()
commit_mock2.assert_called()

# check that closed publisher doesn't accept new messages
with pytest.raises(RuntimeError):
client.publish("topic1", b"msg2")

with pytest.raises(RuntimeError):
client.stop()


def test_gapic_instance_method():
creds = mock.Mock(spec=credentials.Credentials)
client = publisher.Client(credentials=creds)
Expand Down

0 comments on commit 1e7c0a6

Please sign in to comment.