-
Notifications
You must be signed in to change notification settings - Fork 1.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Pubsub batch autocommitting. #2966
Pubsub batch autocommitting. #2966
Conversation
This PR adds some functionality to the Batch object: * The ability to specify `max_messages` and have the batch automatically call `commit` when the number of messages gets that high. * The ability to specify `max_interval` and have the batch automatically commit when a publish occurs and the batch is at least as old as the specified interval. This is one of two changes requested by the PubSub team.
def __init__(self, topic, client): | ||
INFINITY = float('inf') | ||
|
||
def __init__(self, topic, client, max_interval=INFINITY, |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Mostly LG, ping me once addressed / we discuss?
""" | ||
def __init__(self, topic, client): | ||
INFINITY = float('inf') |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
self._max_messages = max_messages | ||
|
||
# Set the initial starting timestamp (used against the interval). | ||
self._start_timestamp = float(time.time()) |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
@@ -435,12 +462,27 @@ def publish(self, message, **attrs): | |||
|
|||
:type attrs: dict (string -> string) | |||
:param attrs: key-value pairs to send as message attributes | |||
|
|||
:rtype: None |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
if self._max_interval < self.INFINITY: | ||
if float(time.time()) - self._start_timestamp > self._max_interval: | ||
self._start_timestamp = float(time.time()) | ||
return self.commit() |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
# If the number of messages on the list is greater than the | ||
# maximum allowed, autocommit (with the batch's client). | ||
if len(self.messages) >= self._max_messages: | ||
return self.commit() |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
def test_message_count_autocommit(self): | ||
"""Establish that if the batch is assigned to take a maximum | ||
number of messages, that it commits when it reaches that maximum. | ||
""" |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
topic = _Topic(name='TOPIC') | ||
|
||
# Track commits, but do not perform them. | ||
Batch = self._get_target_class() |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
for i in range(0, 4): | ||
batch.publish({ | ||
'attributes': {}, | ||
'data': 'Batch message %d.' % i, |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
Batch = self._get_target_class() | ||
with mock.patch.object(Batch, 'commit') as commit: | ||
with self._make_one(topic, client=client, max_messages=5) as batch: | ||
self.assertIsInstance(batch, self._get_target_class()) |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
# Track commits, but do not perform them. | ||
Batch = self._get_target_class() | ||
with mock.patch.object(Batch, 'commit') as commit: | ||
mock_time.return_value = 0.0 |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
""" | ||
self.topic._timestamp_message(attrs) | ||
self.messages.append( | ||
{'data': message, | ||
'attributes': attrs}) | ||
|
||
# If too much time has elapsed since the first message | ||
# was added, autocommit. | ||
if self._max_interval < self.INFINITY: |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
|
||
# If the number of messages on the list is greater than the | ||
# maximum allowed, autocommit (with the batch's client). | ||
if len(self.messages) >= self._max_messages: |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
# maximum allowed, autocommit (with the batch's client). | ||
if len(self.messages) >= self._max_messages: | ||
self.commit() | ||
return |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
topic = _Topic(name='TOPIC') | ||
|
||
# Track commits, but do not perform them. | ||
Batch = self._get_target_class() |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
# Track commits, but do not perform them. | ||
Batch = self._get_target_class() | ||
with mock.patch.object(Batch, 'commit') as commit: | ||
mock_time.return_value = 0.0 |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM once CI goes green
* Pubsub batch autocommitting. This PR adds some functionality to the Batch object: * The ability to specify `max_messages` and have the batch automatically call `commit` when the number of messages gets that high. * The ability to specify `max_interval` and have the batch automatically commit when a publish occurs and the batch is at least as old as the specified interval. This is one of two changes requested by the PubSub team. * Addressing comments from @dhermes. * Remove unneeded -lt check @dhermes. * Make INFINITY have a leading underscore. @dhermes
This PR adds some functionality to the Batch object:
max_messages
and have the batchautomatically call
commit
when the number of messagesgets that high.
max_interval
and have the batchautomatically commit when a publish occurs and the batch
is at least as old as the specified interval.
This is one of two changes requested by the PubSub team.