From 4dd87781ad42a908aca617a97e0edd4fa43cf647 Mon Sep 17 00:00:00 2001 From: Tres Seaver Date: Mon, 6 Aug 2018 14:33:12 -0400 Subject: [PATCH] Ensure SPM methods check that 'self._consumer' is not None before use. Closes #5751. --- .../_protocol/streaming_pull_manager.py | 11 ++++++----- .../subscriber/test_streaming_pull_manager.py | 17 +++++++++++++++++ 2 files changed, 23 insertions(+), 5 deletions(-) diff --git a/pubsub/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py b/pubsub/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py index b7b9002e2844..6c1d90192477 100644 --- a/pubsub/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py +++ b/pubsub/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py @@ -208,10 +208,11 @@ def add_close_callback(self, callback): def maybe_pause_consumer(self): """Check the current load and pause the consumer if needed.""" - if self.load >= 1.0 and not self._consumer.is_paused: - _LOGGER.debug( - 'Message backlog over load at %.2f, pausing.', self.load) - self._consumer.pause() + if self.load >= 1.0: + if self._consumer is not None and not self._consumer.is_paused: + _LOGGER.debug( + 'Message backlog over load at %.2f, pausing.', self.load) + self._consumer.pause() def maybe_resume_consumer(self): """Check the current load and resume the consumer if needed.""" @@ -221,7 +222,7 @@ def maybe_resume_consumer(self): # In order to not thrash too much, require us to have passed below # the resume threshold (80% by default) of each flow control setting # before restarting. - if not self._consumer.is_paused: + if self._consumer is None or not self._consumer.is_paused: return if self.load < self.flow_control.resume_threshold: diff --git a/pubsub/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py b/pubsub/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py index 5f2a8f53fb9f..47638070478a 100644 --- a/pubsub/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py +++ b/pubsub/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py @@ -111,6 +111,17 @@ def test_ack_deadline(): assert manager.ack_deadline == 20 +def test_maybe_pause_consumer_wo_consumer_set(): + manager = make_manager( + flow_control=types.FlowControl(max_messages=10, max_bytes=1000)) + manager.maybe_pause_consumer() # no raise + # Ensure load > 1 + _leaser = manager._leaser = mock.create_autospec(leaser.Leaser) + _leaser.message_count = 100 + _leaser.bytes = 10000 + manager.maybe_pause_consumer() # no raise + + def test_lease_load_and_pause(): manager = make_manager( flow_control=types.FlowControl(max_messages=10, max_bytes=1000)) @@ -177,6 +188,12 @@ def test_resume_not_paused(): manager._consumer.resume.assert_not_called() +def test_maybe_resume_consumer_wo_consumer_set(): + manager = make_manager( + flow_control=types.FlowControl(max_messages=10, max_bytes=1000)) + manager.maybe_resume_consumer() # no raise + + def test_send_unary(): manager = make_manager() manager._UNARY_REQUESTS = True