Skip to content

Commit

Permalink
fix(pubsub): fix messages delivered multiple times despite a long ACK…
Browse files Browse the repository at this point in the history
… deadline (#9525)

* fix(pubsub): lease-manage all received messages

This is to prevent the messages that are put on hold from unnecessarily
timing out too soon, causing the backend to re-send them.

* Exclude on hold messages from load calculation

Even the messages received that exceed the maximum load (as defined by
flow control) must be lease-mananged to avoid unnecessary ACK deadline
expirations, but since they are not dispatched (yet) to user callbacks,
they should not contribute to the overall load.

Without this change, the total load could be overestimated, resulting
in an indefinitely paused message stream, and messages not being
dispatched to callbacks when they should be.

* Use histogram to set default stream ACK deadline

With all the messages lease-managed (even those on hold), there is no
need to have a fixed default value.

* Add warning if internal bytes count is negative

This should not happen, but if it does, it is a bug in the
StreamingPullManager logic, and we should know about it.
  • Loading branch information
plamut authored Nov 22, 2019
1 parent 0a9665e commit d4c440b
Show file tree
Hide file tree
Showing 3 changed files with 112 additions and 80 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -51,13 +51,6 @@
_RESUME_THRESHOLD = 0.8
"""The load threshold below which to resume the incoming message stream."""

_DEFAULT_STREAM_ACK_DEADLINE = 60
"""The default message acknowledge deadline in seconds for incoming message stream.
This default deadline is dynamically modified for the messages that are added
to the lease management.
"""


def _maybe_wrap_exception(exception):
"""Wraps a gRPC exception class, if needed."""
Expand Down Expand Up @@ -135,9 +128,15 @@ def __init__(
# because the FlowControl limits have been hit.
self._messages_on_hold = queue.Queue()

# the total number of bytes consumed by the messages currently on hold
self._on_hold_bytes = 0

# A lock ensuring that pausing / resuming the consumer are both atomic
# operations that cannot be executed concurrently. Needed for properly
# syncing these operations with the current leaser load.
# syncing these operations with the current leaser load. Additionally,
# the lock is used to protect modifications of internal data that
# affects the load computation, i.e. the count and size of the messages
# currently on hold.
self._pause_resume_lock = threading.Lock()

# The threads created in ``.open()``.
Expand Down Expand Up @@ -218,10 +217,18 @@ def load(self):
if self._leaser is None:
return 0.0

# Messages that are temporarily put on hold are not being delivered to
# user's callbacks, thus they should not contribute to the flow control
# load calculation.
# However, since these messages must still be lease-managed to avoid
# unnecessary ACK deadline expirations, their count and total size must
# be subtracted from the leaser's values.
return max(
[
self._leaser.message_count / self._flow_control.max_messages,
self._leaser.bytes / self._flow_control.max_bytes,
(self._leaser.message_count - self._messages_on_hold.qsize())
/ self._flow_control.max_messages,
(self._leaser.bytes - self._on_hold_bytes)
/ self._flow_control.max_bytes,
]
)

Expand Down Expand Up @@ -292,13 +299,19 @@ def _maybe_release_messages(self):
except queue.Empty:
break

self.leaser.add(
[requests.LeaseRequest(ack_id=msg.ack_id, byte_size=msg.size)]
)
self._on_hold_bytes -= msg.size

if self._on_hold_bytes < 0:
_LOGGER.warning(
"On hold bytes was unexpectedly negative: %s", self._on_hold_bytes
)
self._on_hold_bytes = 0

_LOGGER.debug(
"Released held message to leaser, scheduling callback for it, "
"still on hold %s.",
"Released held message, scheduling callback for it, "
"still on hold %s (bytes %s).",
self._messages_on_hold.qsize(),
self._on_hold_bytes,
)
self._scheduler.schedule(self._callback, msg)

Expand Down Expand Up @@ -392,17 +405,7 @@ def open(self, callback, on_callback_error):
)

# Create the RPC

# We must use a fixed value for the ACK deadline, as we cannot read it
# from the subscription. The latter would require `pubsub.subscriptions.get`
# permission, which is not granted to the default subscriber role
# `roles/pubsub.subscriber`.
# See also https://github.com/googleapis/google-cloud-python/issues/9339
#
# When dynamic lease management is enabled for the "on hold" messages,
# the default stream ACK deadline should again be set based on the
# historic ACK timing data, i.e. `self.ack_histogram.percentile(99)`.
stream_ack_deadline_seconds = _DEFAULT_STREAM_ACK_DEADLINE
stream_ack_deadline_seconds = self.ack_histogram.percentile(99)

get_initial_request = functools.partial(
self._get_initial_request, stream_ack_deadline_seconds
Expand Down Expand Up @@ -540,40 +543,46 @@ def _on_response(self, response):
the callback for each message using the executor.
"""
_LOGGER.debug(
"Processing %s received message(s), currenty on hold %s.",
"Processing %s received message(s), currenty on hold %s (bytes %s).",
len(response.received_messages),
self._messages_on_hold.qsize(),
self._on_hold_bytes,
)

# Immediately (i.e. without waiting for the auto lease management)
# modack the messages we received, as this tells the server that we've
# received them.
items = [
requests.ModAckRequest(message.ack_id, self._ack_histogram.percentile(99))
for message in response.received_messages
]
self._dispatcher.modify_ack_deadline(items)

invoke_callbacks_for = []

for received_message in response.received_messages:
message = google.cloud.pubsub_v1.subscriber.message.Message(
received_message.message, received_message.ack_id, self._scheduler.queue
)
if self.load < _MAX_LOAD:
req = requests.LeaseRequest(
ack_id=message.ack_id, byte_size=message.size
)
self.leaser.add([req])
invoke_callbacks_for.append(message)
self.maybe_pause_consumer()
else:
self._messages_on_hold.put(message)

# Immediately (i.e. without waiting for the auto lease management)
# modack the messages we received and not put on hold, as this tells
# the server that we've received them.
items = [
requests.ModAckRequest(message.ack_id, self._ack_histogram.percentile(99))
for message in invoke_callbacks_for
]
self._dispatcher.modify_ack_deadline(items)
# Making a decision based on the load, and modifying the data that
# affects the load -> needs a lock, as that state can be modified
# by different threads.
with self._pause_resume_lock:
if self.load < _MAX_LOAD:
invoke_callbacks_for.append(message)
else:
self._messages_on_hold.put(message)
self._on_hold_bytes += message.size

req = requests.LeaseRequest(ack_id=message.ack_id, byte_size=message.size)
self.leaser.add([req])
self.maybe_pause_consumer()

_LOGGER.debug(
"Scheduling callbacks for %s new messages, new total on hold %s.",
"Scheduling callbacks for %s new messages, new total on hold %s (bytes %s).",
len(invoke_callbacks_for),
self._messages_on_hold.qsize(),
self._on_hold_bytes,
)
for msg in invoke_callbacks_for:
self._scheduler.schedule(self._callback, msg)
Expand Down
16 changes: 6 additions & 10 deletions pubsub/tests/system.py
Original file line number Diff line number Diff line change
Expand Up @@ -382,10 +382,6 @@ class CallbackError(Exception):
with pytest.raises(CallbackError):
future.result(timeout=30)

@pytest.mark.xfail(
reason="The default stream ACK deadline is static and received messages "
"exceeding FlowControl.max_messages are currently not lease managed."
)
def test_streaming_pull_ack_deadline(
self, publisher, subscriber, project, topic_path, subscription_path, cleanup
):
Expand All @@ -400,29 +396,29 @@ def test_streaming_pull_ack_deadline(
# Subscribe to the topic. This must happen before the messages
# are published.
subscriber.create_subscription(
subscription_path, topic_path, ack_deadline_seconds=240
subscription_path, topic_path, ack_deadline_seconds=45
)

# publish some messages and wait for completion
self._publish_messages(publisher, topic_path, batch_sizes=[2])

# subscribe to the topic
callback = StreamingPullCallback(
processing_time=70, # more than the default stream ACK deadline (60s)
processing_time=13, # more than the default stream ACK deadline (10s)
resolve_at_msg_count=3, # one more than the published messages count
)
flow_control = types.FlowControl(max_messages=1)
subscription_future = subscriber.subscribe(
subscription_path, callback, flow_control=flow_control
)

# We expect to process the first two messages in 2 * 70 seconds, and
# We expect to process the first two messages in 2 * 13 seconds, and
# any duplicate message that is re-sent by the backend in additional
# 70 seconds, totalling 210 seconds (+ overhead) --> if there have been
# no duplicates in 240 seconds, we can reasonably assume that there
# 13 seconds, totalling 39 seconds (+ overhead) --> if there have been
# no duplicates in 60 seconds, we can reasonably assume that there
# won't be any.
try:
callback.done_future.result(timeout=240)
callback.done_future.result(timeout=60)
except exceptions.TimeoutError:
# future timed out, because we received no excessive messages
assert sorted(callback.seen_message_ids) == [1, 2]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -233,13 +233,15 @@ def test__maybe_release_messages_on_overload():
manager = make_manager(
flow_control=types.FlowControl(max_messages=10, max_bytes=1000)
)
# Ensure load is exactly 1.0 (to verify that >= condition is used)
_leaser = manager._leaser = mock.create_autospec(leaser.Leaser)
_leaser.message_count = 10
_leaser.bytes = 1000

msg = mock.create_autospec(message.Message, instance=True, ack_id="ack", size=11)
manager._messages_on_hold.put(msg)
manager._on_hold_bytes = msg.size

# Ensure load is exactly 1.0 (to verify that >= condition is used)
_leaser = manager._leaser = mock.create_autospec(leaser.Leaser)
_leaser.message_count = 10
_leaser.bytes = 1000 + msg.size

manager._maybe_release_messages()

Expand All @@ -254,18 +256,20 @@ def test__maybe_release_messages_below_overload():
)
manager._callback = mock.sentinel.callback

# init leaser message count to 8 to leave room for 2 more messages
# Init leaser message count to 11, so that when subtracting the 3 messages
# that are on hold, there is still room for another 2 messages before the
# max load is hit.
_leaser = manager._leaser = mock.create_autospec(leaser.Leaser)
fake_leaser_add(_leaser, init_msg_count=8, assumed_msg_size=25)
_leaser.add = mock.Mock(wraps=_leaser.add) # to spy on calls
fake_leaser_add(_leaser, init_msg_count=11, assumed_msg_size=10)

messages = [
mock.create_autospec(message.Message, instance=True, ack_id="ack_foo", size=11),
mock.create_autospec(message.Message, instance=True, ack_id="ack_bar", size=22),
mock.create_autospec(message.Message, instance=True, ack_id="ack_baz", size=33),
mock.create_autospec(message.Message, instance=True, ack_id="ack_foo", size=10),
mock.create_autospec(message.Message, instance=True, ack_id="ack_bar", size=10),
mock.create_autospec(message.Message, instance=True, ack_id="ack_baz", size=10),
]
for msg in messages:
manager._messages_on_hold.put(msg)
manager._on_hold_bytes = 3 * 10

# the actual call of MUT
manager._maybe_release_messages()
Expand All @@ -274,13 +278,6 @@ def test__maybe_release_messages_below_overload():
msg = manager._messages_on_hold.get_nowait()
assert msg.ack_id == "ack_baz"

assert len(_leaser.add.mock_calls) == 2
expected_calls = [
mock.call([requests.LeaseRequest(ack_id="ack_foo", byte_size=11)]),
mock.call([requests.LeaseRequest(ack_id="ack_bar", byte_size=22)]),
]
_leaser.add.assert_has_calls(expected_calls)

schedule_calls = manager._scheduler.schedule.mock_calls
assert len(schedule_calls) == 2
for _, call_args, _ in schedule_calls:
Expand All @@ -289,6 +286,34 @@ def test__maybe_release_messages_below_overload():
assert call_args[1].ack_id in ("ack_foo", "ack_bar")


def test__maybe_release_messages_negative_on_hold_bytes_warning(caplog):
manager = make_manager(
flow_control=types.FlowControl(max_messages=10, max_bytes=1000)
)

msg = mock.create_autospec(message.Message, instance=True, ack_id="ack", size=17)
manager._messages_on_hold.put(msg)
manager._on_hold_bytes = 5 # too low for some reason

_leaser = manager._leaser = mock.create_autospec(leaser.Leaser)
_leaser.message_count = 3
_leaser.bytes = 150

with caplog.at_level(logging.WARNING):
manager._maybe_release_messages()

expected_warnings = [
record.message.lower()
for record in caplog.records
if "unexpectedly negative" in record.message
]
assert len(expected_warnings) == 1
assert "on hold bytes" in expected_warnings[0]
assert "-12" in expected_warnings[0]

assert manager._on_hold_bytes == 0 # should be auto-corrected


def test_send_unary():
manager = make_manager()
manager._UNARY_REQUESTS = True
Expand Down Expand Up @@ -404,8 +429,6 @@ def test_heartbeat_inactive():
"google.cloud.pubsub_v1.subscriber._protocol.heartbeater.Heartbeater", autospec=True
)
def test_open(heartbeater, dispatcher, leaser, background_consumer, resumable_bidi_rpc):
stream_ack_deadline = streaming_pull_manager._DEFAULT_STREAM_ACK_DEADLINE

manager = make_manager()

manager.open(mock.sentinel.callback, mock.sentinel.on_callback_error)
Expand Down Expand Up @@ -435,7 +458,7 @@ def test_open(heartbeater, dispatcher, leaser, background_consumer, resumable_bi
)
initial_request_arg = resumable_bidi_rpc.call_args.kwargs["initial_request"]
assert initial_request_arg.func == manager._get_initial_request
assert initial_request_arg.args[0] == stream_ack_deadline
assert initial_request_arg.args[0] == 10 # the default stream ACK timeout
assert not manager._client.api.get_subscription.called

resumable_bidi_rpc.return_value.add_done_callback.assert_called_once_with(
Expand Down Expand Up @@ -668,13 +691,17 @@ def test__on_response_with_leaser_overload():
# are called in the expected way.
manager._on_response(response)

# only the messages that are added to the lease management and dispatched to
# callbacks should have their ACK deadline extended
# all messages should be added to the lease management and have their ACK
# deadline extended, even those not dispatched to callbacks
dispatcher.modify_ack_deadline.assert_called_once_with(
[requests.ModAckRequest("fack", 10)]
[
requests.ModAckRequest("fack", 10),
requests.ModAckRequest("back", 10),
requests.ModAckRequest("zack", 10),
]
)

# one message should be scheduled, the leaser capacity allows for it
# one message should be scheduled, the flow control limits allow for it
schedule_calls = scheduler.schedule.mock_calls
assert len(schedule_calls) == 1
call_args = schedule_calls[0][1]
Expand Down

0 comments on commit d4c440b

Please sign in to comment.