From c29d7f891c776e1a3fcb1cbfc7f549ca0772f38e Mon Sep 17 00:00:00 2001 From: Peter Lamut Date: Thu, 20 Aug 2020 20:08:04 +0200 Subject: [PATCH] Optimize creating and accesing pubsub messages Profiling shows that the speed of creating a new pubsub message and the speed of accessing the message's attributes significantly affects the throughput of publisher and subscriber. This commit makes everything faster by circumventing the wrapper class around the raw protobuf pubsub messages where possible. --- .../pubsub_v1/publisher/_batch/thread.py | 8 +++++- google/cloud/pubsub_v1/publisher/client.py | 9 ++++-- .../_protocol/streaming_pull_manager.py | 10 +++++-- google/cloud/pubsub_v1/subscriber/message.py | 28 +++++++++++++++---- .../unit/pubsub_v1/subscriber/test_message.py | 21 ++++++++------ .../subscriber/test_messages_on_hold.py | 2 +- 6 files changed, 56 insertions(+), 22 deletions(-) diff --git a/google/cloud/pubsub_v1/publisher/_batch/thread.py b/google/cloud/pubsub_v1/publisher/_batch/thread.py index e7f1365c6..fc4e6ba6d 100644 --- a/google/cloud/pubsub_v1/publisher/_batch/thread.py +++ b/google/cloud/pubsub_v1/publisher/_batch/thread.py @@ -32,6 +32,8 @@ _CAN_COMMIT = (base.BatchStatus.ACCEPTING_MESSAGES, base.BatchStatus.STARTING) _SERVER_PUBLISH_MAX_BYTES = 10 * 1000 * 1000 # max accepted size of PublishRequest +_raw_proto_pubbsub_message = gapic_types.PubsubMessage.pb() + class Batch(base.Batch): """A batch of messages. @@ -337,7 +339,11 @@ def publish(self, message): # Coerce the type, just in case. if not isinstance(message, gapic_types.PubsubMessage): - message = gapic_types.PubsubMessage(**message) + # For performance reasons, the message should be constructed by directly + # using the raw protobuf class, and only then wrapping it into the + # higher-level PubsubMessage class. + vanilla_pb = _raw_proto_pubbsub_message(**message) + message = gapic_types.PubsubMessage.wrap(vanilla_pb) future = None diff --git a/google/cloud/pubsub_v1/publisher/client.py b/google/cloud/pubsub_v1/publisher/client.py index 16a40c166..ea371190c 100644 --- a/google/cloud/pubsub_v1/publisher/client.py +++ b/google/cloud/pubsub_v1/publisher/client.py @@ -52,6 +52,8 @@ "from_service_account_json", ) +_raw_proto_pubbsub_message = gapic_types.PubsubMessage.pb() + def _set_nested_value(container, value, keys): current = container @@ -346,10 +348,13 @@ def publish( "be sent as text strings." ) - # Create the Pub/Sub message object. - message = gapic_types.PubsubMessage( + # Create the Pub/Sub message object. For performance reasons, the message + # should be constructed by directly using the raw protobuf class, and only + # then wrapping it into the higher-level PubsubMessage class. + vanilla_pb = _raw_proto_pubbsub_message( data=data, ordering_key=ordering_key, attributes=attrs ) + message = gapic_types.PubsubMessage.wrap(vanilla_pb) # Messages should go through flow control to prevent excessive # queuing on the client side (depending on the settings). diff --git a/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py b/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py index 9725d22dd..7476e887b 100644 --- a/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py +++ b/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py @@ -602,9 +602,13 @@ def _on_response(self, response): ) return + # IMPORTANT: Circumvent the wrapper class and operate on the raw underlying + # protobuf message to significantly gain on attribute access performance. + received_messages = response._pb.received_messages + _LOGGER.debug( "Processing %s received message(s), currently on hold %s (bytes %s).", - len(response.received_messages), + len(received_messages), self._messages_on_hold.size, self._on_hold_bytes, ) @@ -614,12 +618,12 @@ def _on_response(self, response): # received them. items = [ requests.ModAckRequest(message.ack_id, self._ack_histogram.percentile(99)) - for message in response.received_messages + for message in received_messages ] self._dispatcher.modify_ack_deadline(items) with self._pause_resume_lock: - for received_message in response.received_messages: + for received_message in received_messages: message = google.cloud.pubsub_v1.subscriber.message.Message( received_message.message, received_message.ack_id, diff --git a/google/cloud/pubsub_v1/subscriber/message.py b/google/cloud/pubsub_v1/subscriber/message.py index 17749d078..c08e0a605 100644 --- a/google/cloud/pubsub_v1/subscriber/message.py +++ b/google/cloud/pubsub_v1/subscriber/message.py @@ -14,8 +14,10 @@ from __future__ import absolute_import +import datetime as dt import json import math +import pytz import time from google.cloud.pubsub_v1.subscriber._protocol import requests @@ -79,7 +81,9 @@ def __init__(self, message, ack_id, delivery_attempt, request_queue): Args: message (~.pubsub_v1.types.PubsubMessage): The message received - from Pub/Sub. + from Pub/Sub. For performance reasons it should be the the raw + protobuf message wrapped by the ``PubsubMessage`` class obtained + through the message's ``.pb()`` method. ack_id (str): The ack_id received from Pub/Sub. delivery_attempt (int): The delivery attempt counter received from Pub/Sub if a DeadLetterPolicy is set on the subscription, @@ -99,6 +103,18 @@ def __init__(self, message, ack_id, delivery_attempt, request_queue): # the default lease deadline. self._received_timestamp = time.time() + # Store the message attributes directly to speed up attribute access, i.e. + # to avoid two lookups if self._message. pattern was used in + # properties. + self._attributes = message.attributes + self._data = message.data + self._publish_time = dt.datetime.fromtimestamp( + message.publish_time.seconds + message.publish_time.nanos / 1e9, + tz=pytz.UTC, + ) + self._ordering_key = message.ordering_key + self._size = message.ByteSize() + def __repr__(self): # Get an abbreviated version of the data. abbv_data = self._message.data @@ -130,7 +146,7 @@ def attributes(self): .ScalarMapContainer: The message's attributes. This is a ``dict``-like object provided by ``google.protobuf``. """ - return self._message.attributes + return self._attributes @property def data(self): @@ -140,7 +156,7 @@ def data(self): bytes: The message data. This is always a bytestring; if you want a text string, call :meth:`bytes.decode`. """ - return self._message.data + return self._data @property def publish_time(self): @@ -149,17 +165,17 @@ def publish_time(self): Returns: datetime: The date and time that the message was published. """ - return self._message.publish_time + return self._publish_time @property def ordering_key(self): """str: the ordering key used to publish the message.""" - return self._message.ordering_key + return self._ordering_key @property def size(self): """Return the size of the underlying message, in bytes.""" - return self._message._pb.ByteSize() + return self._size @property def ack_id(self): diff --git a/tests/unit/pubsub_v1/subscriber/test_message.py b/tests/unit/pubsub_v1/subscriber/test_message.py index 8bbd38910..09f796480 100644 --- a/tests/unit/pubsub_v1/subscriber/test_message.py +++ b/tests/unit/pubsub_v1/subscriber/test_message.py @@ -36,16 +36,19 @@ def create_message(data, ack_id="ACKID", delivery_attempt=0, ordering_key="", **attrs): with mock.patch.object(time, "time") as time_: time_.return_value = RECEIVED_SECONDS - msg = message.Message( - message=gapic_types.PubsubMessage( - attributes=attrs, - data=data, - message_id="message_id", - publish_time=timestamp_pb2.Timestamp( - seconds=PUBLISHED_SECONDS, nanos=PUBLISHED_MICROS * 1000 - ), - ordering_key=ordering_key, + gapic_pubsub_message = gapic_types.PubsubMessage( + attributes=attrs, + data=data, + message_id="message_id", + publish_time=timestamp_pb2.Timestamp( + seconds=PUBLISHED_SECONDS, nanos=PUBLISHED_MICROS * 1000 ), + ordering_key=ordering_key, + ) + msg = message.Message( + # The code under test uses a raw protobuf PubsubMessage, i.e. w/o additional + # Python class wrappers, hence the "_pb" + message=gapic_pubsub_message._pb, ack_id=ack_id, delivery_attempt=delivery_attempt, request_queue=queue.Queue(), diff --git a/tests/unit/pubsub_v1/subscriber/test_messages_on_hold.py b/tests/unit/pubsub_v1/subscriber/test_messages_on_hold.py index c0ca8e68b..6fd83d13a 100644 --- a/tests/unit/pubsub_v1/subscriber/test_messages_on_hold.py +++ b/tests/unit/pubsub_v1/subscriber/test_messages_on_hold.py @@ -21,7 +21,7 @@ def make_message(ack_id, ordering_key): proto_msg = gapic_types.PubsubMessage(data=b"Q", ordering_key=ordering_key) - return message.Message(proto_msg, ack_id, 0, queue.Queue()) + return message.Message(proto_msg._pb, ack_id, 0, queue.Queue()) def test_init():