diff --git a/pubsub/google/cloud/pubsub/topic.py b/pubsub/google/cloud/pubsub/topic.py index 12c6c3a68450..0dd5b4fda038 100644 --- a/pubsub/google/cloud/pubsub/topic.py +++ b/pubsub/google/cloud/pubsub/topic.py @@ -14,6 +14,8 @@ """Define API Topics.""" +import time + from google.cloud._helpers import _datetime_to_rfc3339 from google.cloud._helpers import _NOW from google.cloud.exceptions import NotFound @@ -408,15 +410,40 @@ class Batch(object): :type topic: :class:`google.cloud.pubsub.topic.Topic` :param topic: the topic being published - :type client: :class:`google.cloud.pubsub.client.Client` :param client: The client to use. + :type client: :class:`google.cloud.pubsub.client.Client` + + :param max_interval: The maximum interval, in seconds, before the batch + will automatically commit. Note that this does not + run a background loop; it just checks when each + message is published. Therefore, this is intended + for situations where messages are published at + reasonably regular intervals. Defaults to infinity + (off). + :type max_interval: float + + :param max_messages: The maximum number of messages to hold in the batch + before automatically committing. Defaults to infinity + (off). + :type max_messages: float """ - def __init__(self, topic, client): + _INFINITY = float('inf') + + def __init__(self, topic, client, max_interval=_INFINITY, + max_messages=_INFINITY): self.topic = topic self.messages = [] self.message_ids = [] self.client = client + # Set the autocommit rules. If the interval or number of messages + # is exceeded, then the .publish() method will imply a commit. + self._max_interval = max_interval + self._max_messages = max_messages + + # Set the initial starting timestamp (used against the interval). 
+ self._start_timestamp = time.time() + def __enter__(self): return self @@ -441,6 +468,20 @@ def publish(self, message, **attrs): {'data': message, 'attributes': attrs}) + # If too much time has elapsed since the batch was created (or + # since the last interval-triggered autocommit), autocommit. + now = time.time() + if now - self._start_timestamp > self._max_interval: + self.commit() + self._start_timestamp = now + return + + # If the number of messages on the list has reached the + # maximum allowed, autocommit (with the batch's client). + if len(self.messages) >= self._max_messages: + self.commit() + return + def commit(self, client=None): """Send saved messages as a single API call. diff --git a/pubsub/unit_tests/test_topic.py b/pubsub/unit_tests/test_topic.py index 5009e53a0a89..f264b4dcd036 100644 --- a/pubsub/unit_tests/test_topic.py +++ b/pubsub/unit_tests/test_topic.py @@ -779,6 +779,77 @@ def test_context_mgr_failure(self): self.assertEqual(list(batch.messages), [MESSAGE1, MESSAGE2]) self.assertEqual(getattr(api, '_topic_published', self), self) + def test_message_count_autocommit(self): + """Establish that if the batch is assigned to take a maximum + number of messages, that it commits when it reaches that maximum. + """ + client = _Client(project='PROJECT') + topic = _Topic(name='TOPIC') + + # Track commits, but do not perform them. + Batch = self._get_target_class() + with mock.patch.object(Batch, 'commit') as commit: + with self._make_one(topic, client=client, max_messages=5) as batch: + self.assertIsInstance(batch, Batch) + + # Publish four messages and establish that the batch does + # not commit. + for i in range(0, 4): + batch.publish({ + 'attributes': {}, + 'data': 'Batch message %d.' % (i,), + }) + commit.assert_not_called() + + # Publish a fifth message and observe the commit. + batch.publish({ + 'attributes': {}, + 'data': 'The final call to trigger a commit!', + }) + commit.assert_called_once_with() + + # There should be a second commit after the context manager + # exits. 
+ self.assertEqual(commit.call_count, 2) + + @mock.patch('time.time') + def test_message_time_autocommit(self, mock_time): + """Establish that if the batch is sufficiently old, that it commits + the next time it receives a publish. + """ + client = _Client(project='PROJECT') + topic = _Topic(name='TOPIC') + + # Track commits, but do not perform them. + Batch = self._get_target_class() + with mock.patch.object(Batch, 'commit') as commit: + mock_time.return_value = 0.0 + with self._make_one(topic, client=client, max_interval=5) as batch: + self.assertIsInstance(batch, Batch) + + # Publish some messages and establish that the batch does + # not commit. + for i in range(0, 10): + batch.publish({ + 'attributes': {}, + 'data': 'Batch message %d.' % (i,), + }) + commit.assert_not_called() + + # Move time ahead so that this batch is too old. + mock_time.return_value = 10.0 + + # Publish another message and observe the commit. + batch.publish({ + 'attributes': {}, + 'data': 'The final call to trigger a commit!', + }) + commit.assert_called_once_with() + + # There should be a second commit after the context manager + # exits. + self.assertEqual(commit.call_count, 2) + class _FauxPublisherAPI(object): _api_called = 0