Skip to content

Commit

Permalink
Make get_initial_request more resilient to race conditions. (#5803)
Browse files Browse the repository at this point in the history
There's a rare case where the stream can be restarted while the streaming pull
manager is shutting down. This causes get_initial_request to be called while
the manager is in a bad state, which will trigger an AttributeError when
attempting to read the list of outstanding Ack IDs from the leaser.

Closes #5792
  • Loading branch information
theacodes authored Aug 14, 2018
1 parent d3fdf2a commit 3541f26
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -378,7 +378,10 @@ def _get_initial_request(self):
"""
# Any ack IDs that are under lease management need to have their
# deadline extended immediately.
lease_ids = self._leaser.ack_ids
if self._leaser is not None:
lease_ids = self._leaser.ack_ids
else:
lease_ids = []

# Put the request together.
request = types.StreamingPullRequest(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -421,6 +421,19 @@ def test__get_initial_request():
assert initial_request.modify_deadline_seconds == [10, 10]


def test__get_initial_request_wo_leaser():
manager = make_manager()
manager._leaser = None

initial_request = manager._get_initial_request()

assert isinstance(initial_request, types.StreamingPullRequest)
assert initial_request.subscription == 'subscription-name'
assert initial_request.stream_ack_deadline_seconds == 10
assert initial_request.modify_deadline_ack_ids == []
assert initial_request.modify_deadline_seconds == []


def test_on_response():
manager, _, dispatcher, _, _, scheduler = make_running_manager()
manager._callback = mock.sentinel.callback
Expand Down

0 comments on commit 3541f26

Please sign in to comment.