From 3541f2694340584ecdc42f524532bfe732da76b0 Mon Sep 17 00:00:00 2001 From: Thea Flowers Date: Tue, 14 Aug 2018 15:20:07 -0500 Subject: [PATCH] Make get_initial_request more resilient to race conditions. (#5803) There's a rare case where the stream can be restarted while the streaming pull manager is shutting down. This causes get_initial_request to be called while the manager is in a bad state, which will trigger an AttributeError when attempting to read the list of outstanding Ack IDs from the leaser. Closes #5792 --- .../subscriber/_protocol/streaming_pull_manager.py | 5 ++++- .../subscriber/test_streaming_pull_manager.py | 13 +++++++++++++ 2 files changed, 17 insertions(+), 1 deletion(-) diff --git a/pubsub/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py b/pubsub/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py index 6c1d90192477..21a8f98851a0 100644 --- a/pubsub/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py +++ b/pubsub/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py @@ -378,7 +378,10 @@ def _get_initial_request(self): """ # Any ack IDs that are under lease management need to have their # deadline extended immediately. - lease_ids = self._leaser.ack_ids + if self._leaser is not None: + lease_ids = self._leaser.ack_ids + else: + lease_ids = [] # Put the request together. request = types.StreamingPullRequest( diff --git a/pubsub/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py b/pubsub/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py index 47638070478a..15f3bc95db80 100644 --- a/pubsub/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py +++ b/pubsub/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py @@ -421,6 +421,19 @@ def test__get_initial_request(): assert initial_request.modify_deadline_seconds == [10, 10] +def test__get_initial_request_wo_leaser(): + manager = make_manager() + manager._leaser = None + + initial_request = manager._get_initial_request() + + assert isinstance(initial_request, types.StreamingPullRequest) + assert initial_request.subscription == 'subscription-name' + assert initial_request.stream_ack_deadline_seconds == 10 + assert initial_request.modify_deadline_ack_ids == [] + assert initial_request.modify_deadline_seconds == [] + + def test_on_response(): manager, _, dispatcher, _, _, scheduler = make_running_manager() manager._callback = mock.sentinel.callback