diff --git a/pubsub/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py b/pubsub/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py
index d3b1d6f51eb6..f3798c05610e 100644
--- a/pubsub/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py
+++ b/pubsub/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py
@@ -51,13 +51,6 @@
 _RESUME_THRESHOLD = 0.8
 """The load threshold below which to resume the incoming message stream."""
 
-_DEFAULT_STREAM_ACK_DEADLINE = 60
-"""The default message acknowledge deadline in seconds for incoming message stream.
-
-This default deadline is dynamically modified for the messages that are added
-to the lease management.
-"""
-
 
 def _maybe_wrap_exception(exception):
     """Wraps a gRPC exception class, if needed."""
@@ -135,9 +128,15 @@ def __init__(
         # because the FlowControl limits have been hit.
         self._messages_on_hold = queue.Queue()
 
+        # the total number of bytes consumed by the messages currently on hold
+        self._on_hold_bytes = 0
+
         # A lock ensuring that pausing / resuming the consumer are both atomic
         # operations that cannot be executed concurrently. Needed for properly
-        # syncing these operations with the current leaser load.
+        # syncing these operations with the current leaser load. Additionally,
+        # the lock is used to protect modifications of internal data that
+        # affects the load computation, i.e. the count and size of the messages
+        # currently on hold.
         self._pause_resume_lock = threading.Lock()
 
         # The threads created in ``.open()``.
@@ -218,10 +217,18 @@ def load(self):
         if self._leaser is None:
             return 0.0
 
+        # Messages that are temporarily put on hold are not being delivered to
+        # user's callbacks, thus they should not contribute to the flow control
+        # load calculation.
+        # However, since these messages must still be lease-managed to avoid
+        # unnecessary ACK deadline expirations, their count and total size must
+        # be subtracted from the leaser's values.
         return max(
             [
-                self._leaser.message_count / self._flow_control.max_messages,
-                self._leaser.bytes / self._flow_control.max_bytes,
+                (self._leaser.message_count - self._messages_on_hold.qsize())
+                / self._flow_control.max_messages,
+                (self._leaser.bytes - self._on_hold_bytes)
+                / self._flow_control.max_bytes,
             ]
         )
 
@@ -292,13 +299,19 @@ def _maybe_release_messages(self):
             except queue.Empty:
                 break
 
-            self.leaser.add(
-                [requests.LeaseRequest(ack_id=msg.ack_id, byte_size=msg.size)]
-            )
+            self._on_hold_bytes -= msg.size
+
+            if self._on_hold_bytes < 0:
+                _LOGGER.warning(
+                    "On hold bytes was unexpectedly negative: %s", self._on_hold_bytes
+                )
+                self._on_hold_bytes = 0
+
             _LOGGER.debug(
-                "Released held message to leaser, scheduling callback for it, "
-                "still on hold %s.",
+                "Released held message, scheduling callback for it, "
+                "still on hold %s (bytes %s).",
                 self._messages_on_hold.qsize(),
+                self._on_hold_bytes,
             )
             self._scheduler.schedule(self._callback, msg)
 
@@ -392,17 +405,7 @@ def open(self, callback, on_callback_error):
         )
 
         # Create the RPC
-
-        # We must use a fixed value for the ACK deadline, as we cannot read it
-        # from the subscription. The latter would require `pubsub.subscriptions.get`
-        # permission, which is not granted to the default subscriber role
-        # `roles/pubsub.subscriber`.
-        # See also https://github.com/googleapis/google-cloud-python/issues/9339
-        #
-        # When dynamic lease management is enabled for the "on hold" messages,
-        # the default stream ACK deadline should again be set based on the
-        # historic ACK timing data, i.e. `self.ack_histogram.percentile(99)`.
-        stream_ack_deadline_seconds = _DEFAULT_STREAM_ACK_DEADLINE
+        stream_ack_deadline_seconds = self.ack_histogram.percentile(99)
 
         get_initial_request = functools.partial(
             self._get_initial_request, stream_ack_deadline_seconds
@@ -540,40 +543,46 @@ def _on_response(self, response):
         the callback for each message using the executor.
         """
         _LOGGER.debug(
-            "Processing %s received message(s), currenty on hold %s.",
+            "Processing %s received message(s), currently on hold %s (bytes %s).",
             len(response.received_messages),
             self._messages_on_hold.qsize(),
+            self._on_hold_bytes,
         )
 
+        # Immediately (i.e. without waiting for the auto lease management)
+        # modack the messages we received, as this tells the server that we've
+        # received them.
+        items = [
+            requests.ModAckRequest(message.ack_id, self._ack_histogram.percentile(99))
+            for message in response.received_messages
+        ]
+        self._dispatcher.modify_ack_deadline(items)
+
         invoke_callbacks_for = []
 
         for received_message in response.received_messages:
             message = google.cloud.pubsub_v1.subscriber.message.Message(
                 received_message.message, received_message.ack_id, self._scheduler.queue
             )
-            if self.load < _MAX_LOAD:
-                req = requests.LeaseRequest(
-                    ack_id=message.ack_id, byte_size=message.size
-                )
-                self.leaser.add([req])
-                invoke_callbacks_for.append(message)
-                self.maybe_pause_consumer()
-            else:
-                self._messages_on_hold.put(message)
-
-        # Immediately (i.e. without waiting for the auto lease management)
-        # modack the messages we received and not put on hold, as this tells
-        # the server that we've received them.
-        items = [
-            requests.ModAckRequest(message.ack_id, self._ack_histogram.percentile(99))
-            for message in invoke_callbacks_for
-        ]
-        self._dispatcher.modify_ack_deadline(items)
+            # Making a decision based on the load, and modifying the data that
+            # affects the load -> needs a lock, as that state can be modified
+            # by different threads.
+            with self._pause_resume_lock:
+                if self.load < _MAX_LOAD:
+                    invoke_callbacks_for.append(message)
+                else:
+                    self._messages_on_hold.put(message)
+                    self._on_hold_bytes += message.size
+
+                req = requests.LeaseRequest(ack_id=message.ack_id, byte_size=message.size)
+                self.leaser.add([req])
+                self.maybe_pause_consumer()
 
         _LOGGER.debug(
-            "Scheduling callbacks for %s new messages, new total on hold %s.",
+            "Scheduling callbacks for %s new messages, new total on hold %s (bytes %s).",
             len(invoke_callbacks_for),
             self._messages_on_hold.qsize(),
+            self._on_hold_bytes,
         )
         for msg in invoke_callbacks_for:
             self._scheduler.schedule(self._callback, msg)
diff --git a/pubsub/tests/system.py b/pubsub/tests/system.py
index fd7473e1e53b..59e5e3fe83a4 100644
--- a/pubsub/tests/system.py
+++ b/pubsub/tests/system.py
@@ -382,10 +382,6 @@ class CallbackError(Exception):
         with pytest.raises(CallbackError):
             future.result(timeout=30)
 
-    @pytest.mark.xfail(
-        reason="The default stream ACK deadline is static and received messages "
-        "exceeding FlowControl.max_messages are currently not lease managed."
-    )
     def test_streaming_pull_ack_deadline(
         self, publisher, subscriber, project, topic_path, subscription_path, cleanup
     ):
@@ -400,7 +396,7 @@ def test_streaming_pull_ack_deadline(
         # Subscribe to the topic. This must happen before the messages
         # are published.
         subscriber.create_subscription(
-            subscription_path, topic_path, ack_deadline_seconds=240
+            subscription_path, topic_path, ack_deadline_seconds=45
         )
 
         # publish some messages and wait for completion
@@ -408,7 +404,7 @@ def test_streaming_pull_ack_deadline(
 
         # subscribe to the topic
         callback = StreamingPullCallback(
-            processing_time=70,  # more than the default stream ACK deadline (60s)
+            processing_time=13,  # more than the default stream ACK deadline (10s)
             resolve_at_msg_count=3,  # one more than the published messages count
         )
         flow_control = types.FlowControl(max_messages=1)
@@ -416,13 +412,13 @@ def test_streaming_pull_ack_deadline(
             subscription_path, callback, flow_control=flow_control
         )
 
-        # We expect to process the first two messages in 2 * 70 seconds, and
+        # We expect to process the first two messages in 2 * 13 seconds, and
         # any duplicate message that is re-sent by the backend in additional
-        # 70 seconds, totalling 210 seconds (+ overhead) --> if there have been
-        # no duplicates in 240 seconds, we can reasonably assume that there
+        # 13 seconds, totalling 39 seconds (+ overhead) --> if there have been
+        # no duplicates in 60 seconds, we can reasonably assume that there
         # won't be any.
         try:
-            callback.done_future.result(timeout=240)
+            callback.done_future.result(timeout=60)
         except exceptions.TimeoutError:
             # future timed out, because we received no excessive messages
             assert sorted(callback.seen_message_ids) == [1, 2]
diff --git a/pubsub/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py b/pubsub/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py
index 114663e7b8e2..1732ec6cd4b3 100644
--- a/pubsub/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py
+++ b/pubsub/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py
@@ -233,13 +233,15 @@ def test__maybe_release_messages_on_overload():
     manager = make_manager(
         flow_control=types.FlowControl(max_messages=10, max_bytes=1000)
     )
-    # Ensure load is exactly 1.0 (to verify that >= condition is used)
-    _leaser = manager._leaser = mock.create_autospec(leaser.Leaser)
-    _leaser.message_count = 10
-    _leaser.bytes = 1000
 
     msg = mock.create_autospec(message.Message, instance=True, ack_id="ack", size=11)
     manager._messages_on_hold.put(msg)
+    manager._on_hold_bytes = msg.size
+
+    # Ensure load is exactly 1.0 (to verify that >= condition is used)
+    _leaser = manager._leaser = mock.create_autospec(leaser.Leaser)
+    _leaser.message_count = 10
+    _leaser.bytes = 1000 + msg.size
 
     manager._maybe_release_messages()
 
@@ -254,18 +256,20 @@ def test__maybe_release_messages_below_overload():
     )
     manager._callback = mock.sentinel.callback
 
-    # init leaser message count to 8 to leave room for 2 more messages
+    # Init leaser message count to 11, so that when subtracting the 3 messages
+    # that are on hold, there is still room for another 2 messages before the
+    # max load is hit.
     _leaser = manager._leaser = mock.create_autospec(leaser.Leaser)
-    fake_leaser_add(_leaser, init_msg_count=8, assumed_msg_size=25)
-    _leaser.add = mock.Mock(wraps=_leaser.add)  # to spy on calls
+    fake_leaser_add(_leaser, init_msg_count=11, assumed_msg_size=10)
 
     messages = [
-        mock.create_autospec(message.Message, instance=True, ack_id="ack_foo", size=11),
-        mock.create_autospec(message.Message, instance=True, ack_id="ack_bar", size=22),
-        mock.create_autospec(message.Message, instance=True, ack_id="ack_baz", size=33),
+        mock.create_autospec(message.Message, instance=True, ack_id="ack_foo", size=10),
+        mock.create_autospec(message.Message, instance=True, ack_id="ack_bar", size=10),
+        mock.create_autospec(message.Message, instance=True, ack_id="ack_baz", size=10),
     ]
     for msg in messages:
         manager._messages_on_hold.put(msg)
+    manager._on_hold_bytes = 3 * 10
 
     # the actual call of MUT
     manager._maybe_release_messages()
 
@@ -274,13 +278,6 @@ def test__maybe_release_messages_below_overload():
     msg = manager._messages_on_hold.get_nowait()
     assert msg.ack_id == "ack_baz"
 
-    assert len(_leaser.add.mock_calls) == 2
-    expected_calls = [
-        mock.call([requests.LeaseRequest(ack_id="ack_foo", byte_size=11)]),
-        mock.call([requests.LeaseRequest(ack_id="ack_bar", byte_size=22)]),
-    ]
-    _leaser.add.assert_has_calls(expected_calls)
-
     schedule_calls = manager._scheduler.schedule.mock_calls
     assert len(schedule_calls) == 2
     for _, call_args, _ in schedule_calls:
@@ -289,6 +286,34 @@ def test__maybe_release_messages_below_overload():
         assert call_args[1].ack_id in ("ack_foo", "ack_bar")
 
 
+def test__maybe_release_messages_negative_on_hold_bytes_warning(caplog):
+    manager = make_manager(
+        flow_control=types.FlowControl(max_messages=10, max_bytes=1000)
+    )
+
+    msg = mock.create_autospec(message.Message, instance=True, ack_id="ack", size=17)
+    manager._messages_on_hold.put(msg)
+    manager._on_hold_bytes = 5  # too low for some reason
+
+    _leaser = manager._leaser = mock.create_autospec(leaser.Leaser)
+    _leaser.message_count = 3
+    _leaser.bytes = 150
+
+    with caplog.at_level(logging.WARNING):
+        manager._maybe_release_messages()
+
+    expected_warnings = [
+        record.message.lower()
+        for record in caplog.records
+        if "unexpectedly negative" in record.message
+    ]
+    assert len(expected_warnings) == 1
+    assert "on hold bytes" in expected_warnings[0]
+    assert "-12" in expected_warnings[0]
+
+    assert manager._on_hold_bytes == 0  # should be auto-corrected
+
+
 def test_send_unary():
     manager = make_manager()
     manager._UNARY_REQUESTS = True
@@ -404,8 +429,6 @@ def test_heartbeat_inactive():
     "google.cloud.pubsub_v1.subscriber._protocol.heartbeater.Heartbeater", autospec=True
 )
 def test_open(heartbeater, dispatcher, leaser, background_consumer, resumable_bidi_rpc):
-    stream_ack_deadline = streaming_pull_manager._DEFAULT_STREAM_ACK_DEADLINE
-
     manager = make_manager()
     manager.open(mock.sentinel.callback, mock.sentinel.on_callback_error)
 
@@ -435,7 +458,7 @@ def test_open(heartbeater, dispatcher, leaser, background_consumer, resumable_bi
     )
     initial_request_arg = resumable_bidi_rpc.call_args.kwargs["initial_request"]
     assert initial_request_arg.func == manager._get_initial_request
-    assert initial_request_arg.args[0] == stream_ack_deadline
+    assert initial_request_arg.args[0] == 10  # the default stream ACK timeout
     assert not manager._client.api.get_subscription.called
 
     resumable_bidi_rpc.return_value.add_done_callback.assert_called_once_with(
@@ -668,13 +691,17 @@ def test__on_response_with_leaser_overload():
     # are called in the expected way.
     manager._on_response(response)
 
-    # only the messages that are added to the lease management and dispatched to
-    # callbacks should have their ACK deadline extended
+    # all messages should be added to the lease management and have their ACK
+    # deadline extended, even those not dispatched to callbacks
     dispatcher.modify_ack_deadline.assert_called_once_with(
-        [requests.ModAckRequest("fack", 10)]
+        [
+            requests.ModAckRequest("fack", 10),
+            requests.ModAckRequest("back", 10),
+            requests.ModAckRequest("zack", 10),
+        ]
     )
 
-    # one message should be scheduled, the leaser capacity allows for it
+    # one message should be scheduled, the flow control limits allow for it
    schedule_calls = scheduler.schedule.mock_calls
     assert len(schedule_calls) == 1
     call_args = schedule_calls[0][1]
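
Reviewer note (illustrative sketch, not part of the patch): the core idea of this change is that messages held back from user callbacks are still lease-managed (so their ACK deadlines keep being extended), but they no longer count toward the flow-control load. A minimal Python sketch of that formula, using hypothetical stand-in names (leased_messages, leased_bytes, on_hold_count, on_hold_bytes) rather than the manager's real attributes:

def compute_load(
    leased_messages, leased_bytes, on_hold_count, on_hold_bytes, max_messages, max_bytes
):
    # Subtract the on-hold messages from the leaser totals before comparing
    # against the FlowControl limits, mirroring StreamingPullManager.load
    # after this change.
    return max(
        (leased_messages - on_hold_count) / max_messages,
        (leased_bytes - on_hold_bytes) / max_bytes,
    )

# With FlowControl(max_messages=10, max_bytes=1000): ten leased messages,
# three of them on hold (30 bytes), yield a load of 0.7, so the consumer is
# not paused even though the leaser still tracks all ten messages.
assert compute_load(10, 300, 3, 30, 10, 1000) == 0.7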