fix(pubsub): include request overhead when computing publish batch size overflow (#9911)

* Clarify the description of BatchSettings.max_bytes

* Include overhead in batch overflow calculation

The wire size of a PublishRequest sent to the backend is larger than the
mere sum of the byte sizes of the individual messages it contains.

This commit adjusts the batch size overflow calculation to account for
this overhead. It also caps the effective maximum BatchSettings.max_bytes
value at 10_000_000 bytes (the limit on the backend).

(credit also to GitHub @relud for outlining the main idea first in the
issue description)

* Access settings inside Batch in a consistent way.

* Cleanup and refactor a few code snippets

* Raise more specific error if message too large
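
For illustration, a minimal sketch (assuming google-cloud-pubsub 1.x, where
``types`` exposes the raw protobuf classes, and a placeholder topic name) of
why the overhead matters: wrapping messages in a ``PublishRequest`` adds
per-field tag and length bytes, so the serialized request is larger than the
sum of the message sizes alone.

from google.cloud.pubsub_v1 import types

message = types.PubsubMessage(data=b"x" * 100)
request = types.PublishRequest(topic="projects/p/topics/t", messages=[message])

# The request is slightly larger than the bare message: it also carries the
# topic string plus tag/length framing for the repeated ``messages`` field.
print(message.ByteSize())  # size of the message alone
print(request.ByteSize())  # larger, by the request overhead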
plamut authored Dec 5, 2019
1 parent 6838a4f commit 0699ba6
Showing 6 changed files with 137 additions and 31 deletions.
5 changes: 4 additions & 1 deletion pubsub/google/cloud/pubsub_v1/publisher/_batch/base.py
@@ -75,9 +75,12 @@ def messages(self):
def size(self):
"""Return the total size of all of the messages currently in the batch.
+        The size includes any overhead of the actual ``PublishRequest`` that is
+        sent to the backend.
         Returns:
             int: The total size of all of the messages currently
-            in the batch, in bytes.
+            in the batch (including the request overhead), in bytes.
"""
raise NotImplementedError

41 changes: 31 additions & 10 deletions pubsub/google/cloud/pubsub_v1/publisher/_batch/thread.py
@@ -29,6 +29,7 @@

_LOGGER = logging.getLogger(__name__)
_CAN_COMMIT = (base.BatchStatus.ACCEPTING_MESSAGES, base.BatchStatus.STARTING)
_SERVER_PUBLISH_MAX_BYTES = 10 * 1000 * 1000 # max accepted size of PublishRequest


class Batch(base.Batch):
@@ -79,13 +80,17 @@ def __init__(self, client, topic, settings, autocommit=True):
# in order to avoid race conditions
self._futures = []
self._messages = []
-        self._size = 0
         self._status = base.BatchStatus.ACCEPTING_MESSAGES

+        # The initial size is not zero, we need to account for the size overhead
+        # of the PublishRequest message itself.
+        self._base_request_size = types.PublishRequest(topic=topic).ByteSize()
+        self._size = self._base_request_size

# If max latency is specified, start a thread to monitor the batch and
# commit when the max latency is reached.
self._thread = None
-        if autocommit and self._settings.max_latency < float("inf"):
+        if autocommit and self.settings.max_latency < float("inf"):
self._thread = threading.Thread(
name="Thread-MonitorBatchPublisher", target=self.monitor
)
@@ -124,9 +129,12 @@ def settings(self):
def size(self):
"""Return the total size of all of the messages currently in the batch.
+        The size includes any overhead of the actual ``PublishRequest`` that is
+        sent to the backend.
         Returns:
             int: The total size of all of the messages currently
-            in the batch, in bytes.
+            in the batch (including the request overhead), in bytes.
"""
return self._size

@@ -251,14 +259,14 @@ def _commit(self):
def monitor(self):
"""Commit this batch after sufficient time has elapsed.
-        This simply sleeps for ``self._settings.max_latency`` seconds,
+        This simply sleeps for ``self.settings.max_latency`` seconds,
and then calls commit unless the batch has already been committed.
"""
# NOTE: This blocks; it is up to the calling code to call it
# in a separate thread.

# Sleep for however long we should be waiting.
-        time.sleep(self._settings.max_latency)
+        time.sleep(self.settings.max_latency)

_LOGGER.debug("Monitor is waking up")
return self._commit()
@@ -281,6 +289,10 @@ def publish(self, message):
the :class:`~concurrent.futures.Future` interface or :data:`None`.
If :data:`None` is returned, that signals that the batch cannot
accept a message.
Raises:
pubsub_v1.publisher.exceptions.MessageTooLargeError: If publishing
the ``message`` would exceed the max size limit on the backend.
"""
# Coerce the type, just in case.
if not isinstance(message, types.PubsubMessage):
@@ -292,12 +304,21 @@
if not self.will_accept(message):
return future

-        new_size = self._size + message.ByteSize()
+        size_increase = types.PublishRequest(messages=[message]).ByteSize()
+
+        if (self._base_request_size + size_increase) > _SERVER_PUBLISH_MAX_BYTES:
+            err_msg = (
+                "The message being published would produce too large a publish "
+                "request that would exceed the maximum allowed size on the "
+                "backend ({} bytes).".format(_SERVER_PUBLISH_MAX_BYTES)
+            )
+            raise exceptions.MessageTooLargeError(err_msg)
+
+        new_size = self._size + size_increase
         new_count = len(self._messages) + 1
-        overflow = (
-            new_size > self.settings.max_bytes
-            or new_count >= self._settings.max_messages
-        )
+
+        size_limit = min(self.settings.max_bytes, _SERVER_PUBLISH_MAX_BYTES)
+        overflow = new_size > size_limit or new_count >= self.settings.max_messages

if not self._messages or not overflow:

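To summarize the new thread.py logic, here is a condensed, hedged sketch (not
the module's exact code; ``would_overflow`` is a hypothetical helper): the size
contribution of a prospective message is measured as a full ``PublishRequest``,
and the effective limit is capped by the server-side maximum.

from google.cloud.pubsub_v1 import types

_SERVER_PUBLISH_MAX_BYTES = 10 * 1000 * 1000  # backend limit on a PublishRequest

def would_overflow(current_size, message, max_bytes):
    """Return True if adding ``message`` should trigger a batch commit."""
    # Overhead-aware increase: wrapping the message in a PublishRequest adds
    # field tag/length bytes on top of message.ByteSize().
    size_increase = types.PublishRequest(messages=[message]).ByteSize()
    new_size = current_size + size_increase
    # BatchSettings.max_bytes can never effectively exceed the backend limit.
    size_limit = min(max_bytes, _SERVER_PUBLISH_MAX_BYTES)
    return new_size > size_limit

# A batch already holding roughly 9.8 MB overflows even with a 20 MB max_bytes.
message = types.PubsubMessage(data=b"x" * (500 * 1000))
print(would_overflow(9800000, message, max_bytes=20 * 1000 * 1000))  # True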
6 changes: 5 additions & 1 deletion pubsub/google/cloud/pubsub_v1/publisher/exceptions.py
@@ -22,4 +22,8 @@ class PublishError(GoogleAPICallError):
pass


__all__ = ("PublishError", "TimeoutError")
class MessageTooLargeError(ValueError):
"""Attempt to publish a message that would exceed the server max size limit."""


__all__ = ("MessageTooLargeError", "PublishError", "TimeoutError")
4 changes: 3 additions & 1 deletion pubsub/google/cloud/pubsub_v1/types.py
@@ -48,7 +48,9 @@
BatchSettings.__doc__ = "The settings for batch publishing the messages."
BatchSettings.max_bytes.__doc__ = (
"The maximum total size of the messages to collect before automatically "
"publishing the batch."
"publishing the batch, including any byte size overhead of the publish "
"request itself. The maximum value is bound by the server-side limit of "
"10_000_000 bytes."
)
BatchSettings.max_latency.__doc__ = (
"The maximum number of seconds to wait for additional messages before "
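A small configuration sketch tied to the clarified docstring (values mirror the
system test below; the exact cap behavior is applied by the publisher): a
max_bytes above the 10_000_000-byte server limit is effectively capped.

from google.cloud import pubsub_v1
from google.cloud.pubsub_v1 import types

batch_settings = types.BatchSettings(
    max_bytes=11 * 1000 * 1000,  # above the server cap; 10 ** 7 applies in practice
    max_latency=1.0,  # seconds to wait before an automatic commit
    max_messages=100,
)
publisher = pubsub_v1.PublisherClient(batch_settings=batch_settings)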
44 changes: 36 additions & 8 deletions pubsub/tests/system.py
@@ -74,24 +74,52 @@ def cleanup():


def test_publish_messages(publisher, topic_path, cleanup):
-    futures = []
     # Make sure the topic gets deleted.
     cleanup.append((publisher.delete_topic, topic_path))

     publisher.create_topic(topic_path)
-    for index in six.moves.range(500):
-        futures.append(
-            publisher.publish(
-                topic_path,
-                b"The hail in Wales falls mainly on the snails.",
-                num=str(index),
-            )
-        )
+    futures = [
+        publisher.publish(
+            topic_path, b"The hail in Wales falls mainly on the snails.", num=str(i)
+        )
+        for i in six.moves.range(500)
+    ]

for future in futures:
result = future.result()
assert isinstance(result, six.string_types)


def test_publish_large_messages(publisher, topic_path, cleanup):
# Make sure the topic gets deleted.
cleanup.append((publisher.delete_topic, topic_path))

# Each message should be smaller than 10**7 bytes (the server side limit for
# PublishRequest), but all messages combined in a PublishRequest should
# slightly exceed that threshold to make sure the publish code handles these
# cases well.
# Mind that the total PublishRequest size must still be smaller than
# 10 * 1024 * 1024 bytes in order to not exceed the max request body size limit.
msg_data = b"x" * (2 * 10 ** 6)

publisher.batch_settings = types.BatchSettings(
max_bytes=11 * 1000 * 1000, # more than the server limit of 10 ** 7
max_latency=2.0, # so that autocommit happens after publishing all messages
max_messages=100,
)
publisher.create_topic(topic_path)

futures = [publisher.publish(topic_path, msg_data, num=str(i)) for i in range(5)]

# If the publishing logic correctly split all messages into more than a
# single batch despite a high BatchSettings.max_bytes limit, there should
# be no "InvalidArgument: request_size is too large" error.
for future in futures:
result = future.result(timeout=10)
assert isinstance(result, six.string_types) # the message ID


def test_subscribe_to_messages(
publisher, topic_path, subscriber, subscription_path, cleanup
):
68 changes: 58 additions & 10 deletions pubsub/tests/unit/pubsub_v1/publisher/batch/test_thread.py
@@ -34,13 +34,15 @@ def create_client():
return publisher.Client(credentials=creds)


-def create_batch(autocommit=False, **batch_settings):
+def create_batch(autocommit=False, topic="topic_name", **batch_settings):
"""Return a batch object suitable for testing.
Args:
autocommit (bool): Whether the batch should commit after
``max_latency`` seconds. By default, this is ``False``
for unit testing.
topic (str): The name of the topic the batch should publish
the messages to.
batch_settings (dict): Arguments passed on to the
:class:``~.pubsub_v1.types.BatchSettings`` constructor.
@@ -49,7 +51,7 @@ def create_batch(autocommit=False, **batch_settings):
"""
client = create_client()
settings = types.BatchSettings(**batch_settings)
return Batch(client, "topic_name", settings, autocommit=autocommit)
return Batch(client, topic, settings, autocommit=autocommit)


def test_init():
@@ -299,8 +301,8 @@ def test_monitor_already_committed():
assert batch._status == status


-def test_publish():
-    batch = create_batch()
+def test_publish_updating_batch_size():
+    batch = create_batch(topic="topic_foo")
messages = (
types.PubsubMessage(data=b"foobarbaz"),
types.PubsubMessage(data=b"spameggs"),
@@ -314,22 +316,27 @@
assert len(batch.messages) == 3
assert batch._futures == futures

-    # The size should have been incremented by the sum of the size of the
-    # messages.
-    expected_size = sum([message_pb.ByteSize() for message_pb in messages])
-    assert batch.size == expected_size
+    # The size should have been incremented by the sum of the size
+    # contributions of each message to the PublishRequest.
+    base_request_size = types.PublishRequest(topic="topic_foo").ByteSize()
+    expected_request_size = base_request_size + sum(
+        types.PublishRequest(messages=[msg]).ByteSize() for msg in messages
+    )
+
+    assert batch.size == expected_request_size
     assert batch.size > 0  # I do not always trust protobuf.


def test_publish_not_will_accept():
-    batch = create_batch(max_messages=0)
+    batch = create_batch(topic="topic_foo", max_messages=0)
+    base_request_size = types.PublishRequest(topic="topic_foo").ByteSize()

# Publish the message.
message = types.PubsubMessage(data=b"foobarbaz")
future = batch.publish(message)

assert future is None
-    assert batch.size == 0
+    assert batch.size == base_request_size
assert batch.messages == []
assert batch._futures == []

@@ -361,6 +368,47 @@ def test_publish_exceed_max_messages():
assert batch._futures == futures


@mock.patch.object(thread, "_SERVER_PUBLISH_MAX_BYTES", 1000)
def test_publish_single_message_size_exceeds_server_size_limit():
batch = create_batch(
topic="topic_foo",
max_messages=1000,
max_bytes=1000 * 1000, # way larger than (mocked) server side limit
)

big_message = types.PubsubMessage(data=b"x" * 984)

request_size = types.PublishRequest(
topic="topic_foo", messages=[big_message]
).ByteSize()
assert request_size == 1001 # sanity check, just above the (mocked) server limit

with pytest.raises(exceptions.MessageTooLargeError):
batch.publish(big_message)


@mock.patch.object(thread, "_SERVER_PUBLISH_MAX_BYTES", 1000)
def test_publish_total_messages_size_exceeds_server_size_limit():
batch = create_batch(topic="topic_foo", max_messages=10, max_bytes=1500)

messages = (
types.PubsubMessage(data=b"x" * 500),
types.PubsubMessage(data=b"x" * 600),
)

# Sanity check - request size is still below BatchSettings.max_bytes,
# but it exceeds the server-side size limit.
request_size = types.PublishRequest(topic="topic_foo", messages=messages).ByteSize()
assert 1000 < request_size < 1500

with mock.patch.object(batch, "commit") as fake_commit:
batch.publish(messages[0])
batch.publish(messages[1])

# The server side limit should kick in and cause a commit.
fake_commit.assert_called_once()


def test_publish_dict():
batch = create_batch()
future = batch.publish({"data": b"foobarbaz", "attributes": {"spam": "eggs"}})
