diff --git a/pubsub/google/cloud/pubsub_v1/publisher/_batch/base.py b/pubsub/google/cloud/pubsub_v1/publisher/_batch/base.py index 4dc6ceec6a80..75f430b09421 100644 --- a/pubsub/google/cloud/pubsub_v1/publisher/_batch/base.py +++ b/pubsub/google/cloud/pubsub_v1/publisher/_batch/base.py @@ -75,9 +75,12 @@ def messages(self): def size(self): """Return the total size of all of the messages currently in the batch. + The size includes any overhead of the actual ``PublishRequest`` that is + sent to the backend. + Returns: int: The total size of all of the messages currently - in the batch, in bytes. + in the batch (including the request overhead), in bytes. """ raise NotImplementedError diff --git a/pubsub/google/cloud/pubsub_v1/publisher/_batch/thread.py b/pubsub/google/cloud/pubsub_v1/publisher/_batch/thread.py index 726e93166cda..4101bc518b0a 100644 --- a/pubsub/google/cloud/pubsub_v1/publisher/_batch/thread.py +++ b/pubsub/google/cloud/pubsub_v1/publisher/_batch/thread.py @@ -29,6 +29,7 @@ _LOGGER = logging.getLogger(__name__) _CAN_COMMIT = (base.BatchStatus.ACCEPTING_MESSAGES, base.BatchStatus.STARTING) +_SERVER_PUBLISH_MAX_BYTES = 10 * 1000 * 1000 # max accepted size of PublishRequest class Batch(base.Batch): @@ -79,13 +80,17 @@ def __init__(self, client, topic, settings, autocommit=True): # in order to avoid race conditions self._futures = [] self._messages = [] - self._size = 0 self._status = base.BatchStatus.ACCEPTING_MESSAGES + # The initial size is not zero, we need to account for the size overhead + # of the PublishRequest message itself. + self._base_request_size = types.PublishRequest(topic=topic).ByteSize() + self._size = self._base_request_size + # If max latency is specified, start a thread to monitor the batch and # commit when the max latency is reached. self._thread = None - if autocommit and self._settings.max_latency < float("inf"): + if autocommit and self.settings.max_latency < float("inf"): self._thread = threading.Thread( name="Thread-MonitorBatchPublisher", target=self.monitor ) @@ -124,9 +129,12 @@ def settings(self): def size(self): """Return the total size of all of the messages currently in the batch. + The size includes any overhead of the actual ``PublishRequest`` that is + sent to the backend. + Returns: int: The total size of all of the messages currently - in the batch, in bytes. + in the batch (including the request overhead), in bytes. """ return self._size @@ -251,14 +259,14 @@ def _commit(self): def monitor(self): """Commit this batch after sufficient time has elapsed. - This simply sleeps for ``self._settings.max_latency`` seconds, + This simply sleeps for ``self.settings.max_latency`` seconds, and then calls commit unless the batch has already been committed. """ # NOTE: This blocks; it is up to the calling code to call it # in a separate thread. # Sleep for however long we should be waiting. - time.sleep(self._settings.max_latency) + time.sleep(self.settings.max_latency) _LOGGER.debug("Monitor is waking up") return self._commit() @@ -281,6 +289,10 @@ def publish(self, message): the :class:`~concurrent.futures.Future` interface or :data:`None`. If :data:`None` is returned, that signals that the batch cannot accept a message. + + Raises: + pubsub_v1.publisher.exceptions.MessageTooLargeError: If publishing + the ``message`` would exceed the max size limit on the backend. """ # Coerce the type, just in case. if not isinstance(message, types.PubsubMessage): @@ -292,12 +304,21 @@ def publish(self, message): if not self.will_accept(message): return future - new_size = self._size + message.ByteSize() + size_increase = types.PublishRequest(messages=[message]).ByteSize() + + if (self._base_request_size + size_increase) > _SERVER_PUBLISH_MAX_BYTES: + err_msg = ( + "The message being published would produce too large a publish " + "request that would exceed the maximum allowed size on the " + "backend ({} bytes).".format(_SERVER_PUBLISH_MAX_BYTES) + ) + raise exceptions.MessageTooLargeError(err_msg) + + new_size = self._size + size_increase new_count = len(self._messages) + 1 - overflow = ( - new_size > self.settings.max_bytes - or new_count >= self._settings.max_messages - ) + + size_limit = min(self.settings.max_bytes, _SERVER_PUBLISH_MAX_BYTES) + overflow = new_size > size_limit or new_count >= self.settings.max_messages if not self._messages or not overflow: diff --git a/pubsub/google/cloud/pubsub_v1/publisher/exceptions.py b/pubsub/google/cloud/pubsub_v1/publisher/exceptions.py index adbfaaaa1ee1..be176bac2dba 100644 --- a/pubsub/google/cloud/pubsub_v1/publisher/exceptions.py +++ b/pubsub/google/cloud/pubsub_v1/publisher/exceptions.py @@ -22,4 +22,8 @@ class PublishError(GoogleAPICallError): pass -__all__ = ("PublishError", "TimeoutError") +class MessageTooLargeError(ValueError): + """Attempt to publish a message that would exceed the server max size limit.""" + + +__all__ = ("MessageTooLargeError", "PublishError", "TimeoutError") diff --git a/pubsub/google/cloud/pubsub_v1/types.py b/pubsub/google/cloud/pubsub_v1/types.py index 7f833660f6e2..2d238b42f797 100644 --- a/pubsub/google/cloud/pubsub_v1/types.py +++ b/pubsub/google/cloud/pubsub_v1/types.py @@ -48,7 +48,9 @@ BatchSettings.__doc__ = "The settings for batch publishing the messages." BatchSettings.max_bytes.__doc__ = ( "The maximum total size of the messages to collect before automatically " - "publishing the batch." + "publishing the batch, including any byte size overhead of the publish " + "request itself. The maximum value is bound by the server-side limit of " + "10_000_000 bytes." ) BatchSettings.max_latency.__doc__ = ( "The maximum number of seconds to wait for additional messages before " diff --git a/pubsub/tests/system.py b/pubsub/tests/system.py index 59e5e3fe83a4..65baaf016407 100644 --- a/pubsub/tests/system.py +++ b/pubsub/tests/system.py @@ -74,24 +74,52 @@ def cleanup(): def test_publish_messages(publisher, topic_path, cleanup): - futures = [] # Make sure the topic gets deleted. cleanup.append((publisher.delete_topic, topic_path)) publisher.create_topic(topic_path) - for index in six.moves.range(500): - futures.append( - publisher.publish( - topic_path, - b"The hail in Wales falls mainly on the snails.", - num=str(index), - ) + + futures = [ + publisher.publish( + topic_path, b"The hail in Wales falls mainly on the snails.", num=str(i) ) + for i in six.moves.range(500) + ] + for future in futures: result = future.result() assert isinstance(result, six.string_types) +def test_publish_large_messages(publisher, topic_path, cleanup): + # Make sure the topic gets deleted. + cleanup.append((publisher.delete_topic, topic_path)) + + # Each message should be smaller than 10**7 bytes (the server side limit for + # PublishRequest), but all messages combined in a PublishRequest should + # slightly exceed that threshold to make sure the publish code handles these + # cases well. + # Mind that the total PublishRequest size must still be smaller than + # 10 * 1024 * 1024 bytes in order to not exceed the max request body size limit. + msg_data = b"x" * (2 * 10 ** 6) + + publisher.batch_settings = types.BatchSettings( + max_bytes=11 * 1000 * 1000, # more than the server limit of 10 ** 7 + max_latency=2.0, # so that autocommit happens after publishing all messages + max_messages=100, + ) + publisher.create_topic(topic_path) + + futures = [publisher.publish(topic_path, msg_data, num=str(i)) for i in range(5)] + + # If the publishing logic correctly split all messages into more than a + # single batch despite a high BatchSettings.max_bytes limit, there should + # be no "InvalidArgument: request_size is too large" error. + for future in futures: + result = future.result(timeout=10) + assert isinstance(result, six.string_types) # the message ID + + def test_subscribe_to_messages( publisher, topic_path, subscriber, subscription_path, cleanup ): diff --git a/pubsub/tests/unit/pubsub_v1/publisher/batch/test_thread.py b/pubsub/tests/unit/pubsub_v1/publisher/batch/test_thread.py index 60425e748043..f51b314af6df 100644 --- a/pubsub/tests/unit/pubsub_v1/publisher/batch/test_thread.py +++ b/pubsub/tests/unit/pubsub_v1/publisher/batch/test_thread.py @@ -34,13 +34,15 @@ def create_client(): return publisher.Client(credentials=creds) -def create_batch(autocommit=False, **batch_settings): +def create_batch(autocommit=False, topic="topic_name", **batch_settings): """Return a batch object suitable for testing. Args: autocommit (bool): Whether the batch should commit after ``max_latency`` seconds. By default, this is ``False`` for unit testing. + topic (str): The name of the topic the batch should publish + the messages to. batch_settings (dict): Arguments passed on to the :class:``~.pubsub_v1.types.BatchSettings`` constructor. @@ -49,7 +51,7 @@ def create_batch(autocommit=False, **batch_settings): """ client = create_client() settings = types.BatchSettings(**batch_settings) - return Batch(client, "topic_name", settings, autocommit=autocommit) + return Batch(client, topic, settings, autocommit=autocommit) def test_init(): @@ -299,8 +301,8 @@ def test_monitor_already_committed(): assert batch._status == status -def test_publish(): - batch = create_batch() +def test_publish_updating_batch_size(): + batch = create_batch(topic="topic_foo") messages = ( types.PubsubMessage(data=b"foobarbaz"), types.PubsubMessage(data=b"spameggs"), @@ -314,22 +316,27 @@ def test_publish(): assert len(batch.messages) == 3 assert batch._futures == futures - # The size should have been incremented by the sum of the size of the - # messages. - expected_size = sum([message_pb.ByteSize() for message_pb in messages]) - assert batch.size == expected_size + # The size should have been incremented by the sum of the size + # contributions of each message to the PublishRequest. + base_request_size = types.PublishRequest(topic="topic_foo").ByteSize() + expected_request_size = base_request_size + sum( + types.PublishRequest(messages=[msg]).ByteSize() for msg in messages + ) + + assert batch.size == expected_request_size assert batch.size > 0 # I do not always trust protobuf. def test_publish_not_will_accept(): - batch = create_batch(max_messages=0) + batch = create_batch(topic="topic_foo", max_messages=0) + base_request_size = types.PublishRequest(topic="topic_foo").ByteSize() # Publish the message. message = types.PubsubMessage(data=b"foobarbaz") future = batch.publish(message) assert future is None - assert batch.size == 0 + assert batch.size == base_request_size assert batch.messages == [] assert batch._futures == [] @@ -361,6 +368,47 @@ def test_publish_exceed_max_messages(): assert batch._futures == futures +@mock.patch.object(thread, "_SERVER_PUBLISH_MAX_BYTES", 1000) +def test_publish_single_message_size_exceeds_server_size_limit(): + batch = create_batch( + topic="topic_foo", + max_messages=1000, + max_bytes=1000 * 1000, # way larger than (mocked) server side limit + ) + + big_message = types.PubsubMessage(data=b"x" * 984) + + request_size = types.PublishRequest( + topic="topic_foo", messages=[big_message] + ).ByteSize() + assert request_size == 1001 # sanity check, just above the (mocked) server limit + + with pytest.raises(exceptions.MessageTooLargeError): + batch.publish(big_message) + + +@mock.patch.object(thread, "_SERVER_PUBLISH_MAX_BYTES", 1000) +def test_publish_total_messages_size_exceeds_server_size_limit(): + batch = create_batch(topic="topic_foo", max_messages=10, max_bytes=1500) + + messages = ( + types.PubsubMessage(data=b"x" * 500), + types.PubsubMessage(data=b"x" * 600), + ) + + # Sanity check - request size is still below BatchSettings.max_bytes, + # but it exceeds the server-side size limit. + request_size = types.PublishRequest(topic="topic_foo", messages=messages).ByteSize() + assert 1000 < request_size < 1500 + + with mock.patch.object(batch, "commit") as fake_commit: + batch.publish(messages[0]) + batch.publish(messages[1]) + + # The server side limit should kick in and cause a commit. + fake_commit.assert_called_once() + + def test_publish_dict(): batch = create_batch() future = batch.publish({"data": b"foobarbaz", "attributes": {"spam": "eggs"}})