Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pubsub batch autocommitting. #2966

Merged
merged 4 commits into from
Jan 27, 2017
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 44 additions & 2 deletions pubsub/google/cloud/pubsub/topic.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@

"""Define API Topics."""

import time

from google.cloud._helpers import _datetime_to_rfc3339
from google.cloud._helpers import _NOW
from google.cloud.exceptions import NotFound
Expand Down Expand Up @@ -408,15 +410,40 @@ class Batch(object):
:type topic: :class:`google.cloud.pubsub.topic.Topic`
:param topic: the topic being published

:type client: :class:`google.cloud.pubsub.client.Client`
:param client: The client to use.
:type client: :class:`google.cloud.pubsub.client.Client`

:param max_interval: The maximum interval, in seconds, before the batch
will automatically commit. Note that this does not
run a background loop; it just checks when each
message is published. Therefore, this is intended
for situations where messages are published at
reasonably regular intervals. Defaults to infinity
(off).
:type max_interval: float

:param max_messages: The maximum number of messages to hold in the batch
before automatically commiting. Defaults to infinity
(off).
:type max_messages: float
"""
def __init__(self, topic, client):
INFINITY = float('inf')

This comment was marked as spam.


def __init__(self, topic, client, max_interval=INFINITY,

This comment was marked as spam.

This comment was marked as spam.

max_messages=INFINITY):
self.topic = topic
self.messages = []
self.message_ids = []
self.client = client

# Set the autocommit rules. If the interval or number of messages
# is exceeded, then the .publish() method will imply a commit.
self._max_interval = max_interval
self._max_messages = max_messages

# Set the initial starting timestamp (used against the interval).
self._start_timestamp = float(time.time())

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.


def __enter__(self):
return self

Expand All @@ -435,12 +462,27 @@ def publish(self, message, **attrs):

:type attrs: dict (string -> string)
:param attrs: key-value pairs to send as message attributes

:rtype: None

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.

:returns: None
"""
self.topic._timestamp_message(attrs)
self.messages.append(
{'data': message,
'attributes': attrs})

# If too much time has elapsed since the first message
# was added, autocommit.
if self._max_interval < self.INFINITY:

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.

if float(time.time()) - self._start_timestamp > self._max_interval:
self._start_timestamp = float(time.time())

This comment was marked as spam.

This comment was marked as spam.

return self.commit()

This comment was marked as spam.

This comment was marked as spam.


# If the number of messages on the list is greater than the
# maximum allowed, autocommit (with the batch's client).
if len(self.messages) >= self._max_messages:

This comment was marked as spam.

return self.commit()

This comment was marked as spam.

This comment was marked as spam.


def commit(self, client=None):
"""Send saved messages as a single API call.

Expand Down
71 changes: 71 additions & 0 deletions pubsub/unit_tests/test_topic.py
Original file line number Diff line number Diff line change
Expand Up @@ -779,6 +779,77 @@ def test_context_mgr_failure(self):
self.assertEqual(list(batch.messages), [MESSAGE1, MESSAGE2])
self.assertEqual(getattr(api, '_topic_published', self), self)

def test_message_count_autocommit(self):
"""Establish that if the batch is assigned to take a maximum
number of messages, that it commits when it reaches that maximum.
"""

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.

client = _Client(project='PROJECT')
topic = _Topic(name='TOPIC')

# Track commits, but do not perform them.
Batch = self._get_target_class()

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.

with mock.patch.object(Batch, 'commit') as commit:
with self._make_one(topic, client=client, max_messages=5) as batch:
self.assertIsInstance(batch, self._get_target_class())

This comment was marked as spam.

This comment was marked as spam.


# Publish four messages and establish that the batch does
# not commit.
for i in range(0, 4):
batch.publish({
'attributes': {},
'data': 'Batch message %d.' % i,

This comment was marked as spam.

})
commit.assert_not_called()

# Publish a fifth message and observe the commit.
batch.publish({
'attributes': {},
'data': 'The final call to trigger a commit!',
})
commit.assert_called_once_with()

# There should be a second commit after the context manager
# exits.
self.assertEqual(commit.call_count, 2)

@mock.patch('time.time')
def test_message_time_autocommit(self, mock_time):
"""Establish that if the batch is sufficiently old, that it commits
the next time it receives a publish.
"""
client = _Client(project='PROJECT')
topic = _Topic(name='TOPIC')

# Track commits, but do not perform them.
Batch = self._get_target_class()
with mock.patch.object(Batch, 'commit') as commit:
mock_time.return_value = 0.0

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.

with self._make_one(topic, client=client, max_interval=5) as batch:
self.assertIsInstance(batch, self._get_target_class())

# Publish some messages and establish that the batch does
# not commit.
for i in range(0, 10):
batch.publish({
'attributes': {},
'data': 'Batch message %d.' % i,
})
commit.assert_not_called()

# Move time ahead so that this batch is too old.
mock_time.return_value = 10.0

# Publish another message and observe the commit.
batch.publish({
'attributes': {},
'data': 'The final call to trigger a commit!',
})
commit.assert_called_once_with()

# There should be a second commit after the context manager
# exits.
self.assertEqual(commit.call_count, 2)


class _FauxPublisherAPI(object):
_api_called = 0
Expand Down