feat: exactly-once delivery support #550

Merged
merged 44 commits into from
Mar 4, 2022
ed5d6e5
Exactly-once changes
pradn Jan 19, 2022
e35de42
rename retry duration constants
pradn Feb 16, 2022
23504c8
add grpc-status dependency
pradn Feb 16, 2022
0a33f37
change dep from grpc-status to grpcio-status per error message during…
pradn Feb 16, 2022
0b6db2e
run lint formatter
pradn Feb 16, 2022
ccbd066
loosen version requirement for grpcio-status dependency to fix build
pradn Feb 16, 2022
bd21fad
rerun formatter with new version of black to fix lint error
pradn Feb 16, 2022
a6ab8ef
Add receive_messages_with_exactly_once_subscribe sample.
pradn Feb 16, 2022
fd73015
Return new AcknowledgeError exception type for ack/modack failures. T…
pradn Feb 18, 2022
349de40
Modify exactly-once subscribe sample to use new AcknowledgeError exce…
pradn Feb 18, 2022
71cb780
Fix formatting for google/cloud/pubsub_v1/subscriber/exceptions.py
pradn Feb 18, 2022
77ce3ef
Address Mahesh's comments
pradn Feb 18, 2022
f6f5d8e
Rename AcknowledgeErrorCode to AcknowledgeStatus bc it includes SUCCESS.
pradn Feb 18, 2022
f35573a
Retry leasing modack failures if exactly-once is enabled. Tests haven…
pradn Feb 18, 2022
c8017f7
Address Mahesh's comments
pradn Feb 18, 2022
b2e9c4b
Shorten use of AcknowledgeError
pradn Feb 18, 2022
21c1431
Fix tests and code
pradn Feb 22, 2022
08351c2
Fix lint errors
pradn Feb 23, 2022
7ba40e1
Improved test coverage.
pradn Feb 23, 2022
94aa54c
Improve coverage.
pradn Feb 23, 2022
97fc72c
improve coverage for streaming_pull_manager
pradn Feb 23, 2022
1220d0f
Improve code coverage
pradn Feb 23, 2022
57235b9
Reformat files
pradn Feb 23, 2022
10f5a93
More fixes
pradn Feb 23, 2022
cdb6340
Improve coverage
pradn Feb 23, 2022
03c8b54
Improve coverage
pradn Feb 23, 2022
8099fde
lint
pradn Feb 23, 2022
7edee1d
Retry on a new thread to avoid blocking the one dispatcher thread.
pradn Feb 24, 2022
6fe7bf8
Remove sample - will be pulled into separate PR.
pradn Feb 24, 2022
1d209c9
Fix type checking errors.
pradn Feb 24, 2022
3facfa5
Address some of Mahesh's comments
pradn Feb 25, 2022
c186713
Return AcknowledgeStatus.SUCCESS for _with_response methods if exactl…
pradn Feb 25, 2022
8fb9d56
Get coverage to 100%
pradn Feb 25, 2022
898715b
Add default value to new Message ctor parameter so PubSubLite code do…
pradn Feb 25, 2022
245cdfd
Complete futures when a permanent RetryError is thrown and the Stream…
pradn Feb 25, 2022
b1e5d70
Fix lint
pradn Feb 25, 2022
377222b
Fix coverage false positive in test by ignoring it.
pradn Feb 25, 2022
a20d48c
Reword some comments with "exactly-once delivery"
pradn Feb 25, 2022
3b390b7
Remove debug print
pradn Feb 25, 2022
9fa28c5
Fix coverage false positive in test by ignoring it.
pradn Feb 25, 2022
c4ba9c7
Complete all requests, not just ones with futures
pradn Feb 28, 2022
36fcab9
Better names to reflect them applying to all reqs not just ones with …
pradn Feb 28, 2022
4158315
Improve tests and comments
pradn Feb 28, 2022
eddd267
Merge branch 'main' into exactly-once2
pradn Mar 4, 2022
1 change: 1 addition & 0 deletions google/cloud/pubsub_v1/proto/pubsub.proto
Expand Up @@ -1164,6 +1164,7 @@ message StreamingPullRequest {
message StreamingPullResponse {
// Subscription properties sent as part of the response.
message SubscriptionProperties {
bool exactly_once_delivery_enabled = 1;
// True iff message ordering is enabled for this subscription.
bool message_ordering_enabled = 2;
}
145 changes: 130 additions & 15 deletions google/cloud/pubsub_v1/subscriber/_protocol/dispatcher.py
Expand Up @@ -15,17 +15,19 @@
from __future__ import absolute_import
from __future__ import division

import functools
import itertools
import logging
import math
import time
import threading
import typing
from typing import List, Optional, Sequence, Union
import warnings
from google.api_core.retry import exponential_sleep_generator

from google.cloud.pubsub_v1.subscriber._protocol import helper_threads
from google.cloud.pubsub_v1.subscriber._protocol import requests
from google.pubsub_v1 import types as gapic_types

if typing.TYPE_CHECKING: # pragma: NO COVER
import queue
Expand Down Expand Up @@ -66,6 +68,14 @@
IDs at a time.
"""

_MIN_EXACTLY_ONCE_DELIVERY_ACK_MODACK_RETRY_DURATION_SECS = 1
"""The time to wait for the first retry of failed acks and modacks when exactly-once
delivery is enabled."""

_MAX_EXACTLY_ONCE_DELIVERY_ACK_MODACK_RETRY_DURATION_SECS = 10 * 60
"""The maximum amount of time in seconds to retry failed acks and modacks when
exactly-once delivery is enabled."""
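
The two constants above bound the backoff schedule used when retrying failed acks and modacks. The following is a minimal, stdlib-only sketch of such a capped exponential schedule. `exponential_delays`, `MIN_RETRY_SECS`, and `MAX_RETRY_SECS` are hypothetical names used for illustration, not the `google.api_core.retry.exponential_sleep_generator` that the diff imports, and the library generator may also randomize its delays, which this sketch omits. Note that later in this diff the larger constant is passed as `maximum`, which caps each individual sleep.

```python
import itertools


def exponential_delays(initial, maximum, multiplier=2.0):
    """Yield an endless series of delays that doubles until it hits `maximum`.

    Hypothetical stand-in for illustration only; no jitter is applied.
    """
    delay = initial
    while True:
        yield min(delay, maximum)
        delay *= multiplier


# Assumed to mirror the values of the two constants in the diff.
MIN_RETRY_SECS = 1
MAX_RETRY_SECS = 10 * 60

# Take the first twelve delays of the otherwise unbounded schedule.
delays = list(
    itertools.islice(exponential_delays(MIN_RETRY_SECS, MAX_RETRY_SECS), 12)
)
print(delays)
```

With these values the schedule reaches the ten-minute cap on the eleventh delay and stays there.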


class Dispatcher(object):
def __init__(self, manager: "StreamingPullManager", queue: "queue.Queue"):
Expand Down Expand Up @@ -168,17 +178,66 @@ def ack(self, items: Sequence[requests.AckRequest]) -> None:

# We must potentially split the request into multiple smaller requests
# to avoid the server-side max request size limit.
ack_ids = (item.ack_id for item in items)
items_gen = iter(items)
ack_ids_gen = (item.ack_id for item in items)
total_chunks = int(math.ceil(len(items) / _ACK_IDS_BATCH_SIZE))

for _ in range(total_chunks):
request = gapic_types.StreamingPullRequest(
ack_ids=itertools.islice(ack_ids, _ACK_IDS_BATCH_SIZE)
ack_reqs_dict = {
req.ack_id: req
for req in itertools.islice(items_gen, _ACK_IDS_BATCH_SIZE)
}
requests_completed, requests_to_retry = self._manager.send_unary_ack(
ack_ids=list(itertools.islice(ack_ids_gen, _ACK_IDS_BATCH_SIZE)),
ack_reqs_dict=ack_reqs_dict,
)

# Remove the completed messages from lease management.
self.drop(requests_completed)

# Retry on a separate thread so the dispatcher thread isn't blocked
# by sleeps.
if requests_to_retry:
self._start_retry_thread(
"Thread-RetryAcks",
functools.partial(self._retry_acks, requests_to_retry),
)
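
The batching in `ack` above can be sketched in isolation. This is a simplified illustration under stated assumptions, not the library's code: `FakeAckRequest`, `send_in_batches`, `fake_send`, and the small `BATCH_SIZE` are all hypothetical, and the transport simply treats ack IDs ending in `!` as transient failures. Unlike the diff, which slices two parallel generators, the sketch slices one iterator and derives the IDs from the batch dictionary.

```python
import itertools
import math
from typing import NamedTuple

BATCH_SIZE = 3  # hypothetical; kept small so the demo needs several batches


class FakeAckRequest(NamedTuple):
    """Hypothetical stand-in for an ack request; only the ack ID matters here."""

    ack_id: str


def send_in_batches(items, send_batch, batch_size=BATCH_SIZE):
    """Split `items` into batches and collect completed and retryable requests."""
    items_iter = iter(items)
    total_chunks = math.ceil(len(items) / batch_size)
    completed, to_retry = [], []
    for _ in range(total_chunks):
        # Each islice call consumes the next `batch_size` items of the iterator.
        batch = {
            req.ack_id: req for req in itertools.islice(items_iter, batch_size)
        }
        done, retry = send_batch(batch)
        completed.extend(done)
        to_retry.extend(retry)
    return completed, to_retry


def fake_send(batch):
    """Hypothetical transport: ack IDs ending in '!' fail transiently."""
    done = [req for req in batch.values() if not req.ack_id.endswith("!")]
    retry = [req for req in batch.values() if req.ack_id.endswith("!")]
    return done, retry


reqs = [FakeAckRequest("id-%d" % i) for i in range(7)] + [FakeAckRequest("id-7!")]
completed, to_retry = send_in_batches(reqs, fake_send)
print(len(completed), [req.ack_id for req in to_retry])
```

In the diff, the completed requests are dropped from lease management right away and only the retryable ones are handed to a background thread.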

def _start_retry_thread(self, thread_name, thread_target):
# Note: if the thread is *not* a daemon, a memory leak occurs due to a CPython issue.
# https://github.com/googleapis/python-pubsub/issues/395#issuecomment-829910303
# https://github.com/googleapis/python-pubsub/issues/395#issuecomment-830092418
retry_thread = threading.Thread(
name=thread_name, target=thread_target, daemon=True,
)
# The thread finishes when the requests succeed or eventually fail with
# a back-end timeout error or other permanent failure.
retry_thread.start()

def _retry_acks(self, requests_to_retry):
retry_delay_gen = exponential_sleep_generator(
initial=_MIN_EXACTLY_ONCE_DELIVERY_ACK_MODACK_RETRY_DURATION_SECS,
maximum=_MAX_EXACTLY_ONCE_DELIVERY_ACK_MODACK_RETRY_DURATION_SECS,
)
while requests_to_retry:
time_to_wait = next(retry_delay_gen)
_LOGGER.debug(
"Retrying %d ack(s) after delay of %s seconds",
len(requests_to_retry),
time_to_wait,
)
self._manager.send(request)
time.sleep(time_to_wait)

# Remove the message from lease management.
self.drop(items)
ack_reqs_dict = {req.ack_id: req for req in requests_to_retry}
requests_completed, requests_to_retry = self._manager.send_unary_ack(
ack_ids=[req.ack_id for req in requests_to_retry],
ack_reqs_dict=ack_reqs_dict,
)
assert (
len(requests_to_retry) <= _ACK_IDS_BATCH_SIZE
), "Too many requests to be retried."
# Remove the completed messages from lease management.
self.drop(requests_completed)
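
The retry loop above runs on a daemon thread so that sleeping between attempts does not block the dispatcher. A minimal sketch of that pattern follows, assuming a transport that returns a `(completed, to_retry)` pair as `send_unary_ack` does in the diff. `retry_until_done` and `flaky_send` are hypothetical, and the delays are shortened so the example finishes quickly.

```python
import threading
import time


def retry_until_done(pending, send, delays):
    """Resend `pending` until nothing is left to retry or the delays run out."""
    pending = list(pending)
    for delay in delays:
        if not pending:
            break
        time.sleep(delay)
        # `send` returns (completed, to_retry); only the latter is sent again.
        _completed, pending = send(pending)
    return pending


attempts = []


def flaky_send(ack_ids):
    """Hypothetical transport: every ID fails on the first attempt only."""
    attempts.append(list(ack_ids))
    if len(attempts) == 1:
        return [], list(ack_ids)
    return list(ack_ids), []


leftover = []


def _worker():
    leftover.extend(retry_until_done(["a", "b"], flaky_send, [0.01, 0.02, 0.04]))


# A daemon thread does not keep the interpreter alive while it sleeps.
worker = threading.Thread(name="Thread-RetryAcks", target=_worker, daemon=True)
worker.start()
worker.join(timeout=5)
print(len(attempts), leftover)
```

The `join` call is only there to make the demo deterministic; the diff starts the thread and lets it finish on its own.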

def drop(
self,
Expand Down Expand Up @@ -215,16 +274,58 @@ def modify_ack_deadline(self, items: Sequence[requests.ModAckRequest]) -> None:
"""
# We must potentially split the request into multiple smaller requests
# to avoid the server-side max request size limit.
ack_ids = (item.ack_id for item in items)
seconds = (item.seconds for item in items)
items_gen = iter(items)
ack_ids_gen = (item.ack_id for item in items)
deadline_seconds_gen = (item.seconds for item in items)
total_chunks = int(math.ceil(len(items) / _ACK_IDS_BATCH_SIZE))

for _ in range(total_chunks):
request = gapic_types.StreamingPullRequest(
modify_deadline_ack_ids=itertools.islice(ack_ids, _ACK_IDS_BATCH_SIZE),
modify_deadline_seconds=itertools.islice(seconds, _ACK_IDS_BATCH_SIZE),
ack_reqs_dict = {
req.ack_id: req
for req in itertools.islice(items_gen, _ACK_IDS_BATCH_SIZE)
}
# No further work is needed for `requests_completed`; only
# `requests_to_retry` is handled below.
requests_completed, requests_to_retry = self._manager.send_unary_modack(
modify_deadline_ack_ids=list(
itertools.islice(ack_ids_gen, _ACK_IDS_BATCH_SIZE)
),
modify_deadline_seconds=list(
itertools.islice(deadline_seconds_gen, _ACK_IDS_BATCH_SIZE)
),
ack_reqs_dict=ack_reqs_dict,
)
assert (
len(requests_to_retry) <= _ACK_IDS_BATCH_SIZE
), "Too many requests to be retried."

# Retry on a separate thread so the dispatcher thread isn't blocked
# by sleeps.
if requests_to_retry:
self._start_retry_thread(
"Thread-RetryModAcks",
functools.partial(self._retry_modacks, requests_to_retry),
)

def _retry_modacks(self, requests_to_retry):
retry_delay_gen = exponential_sleep_generator(
initial=_MIN_EXACTLY_ONCE_DELIVERY_ACK_MODACK_RETRY_DURATION_SECS,
maximum=_MAX_EXACTLY_ONCE_DELIVERY_ACK_MODACK_RETRY_DURATION_SECS,
)
while requests_to_retry:
time_to_wait = next(retry_delay_gen)
_LOGGER.debug(
"Retrying %d modack(s) after delay of %s seconds",
len(requests_to_retry),
time_to_wait,
)
time.sleep(time_to_wait)

ack_reqs_dict = {req.ack_id: req for req in requests_to_retry}
requests_completed, requests_to_retry = self._manager.send_unary_modack(
modify_deadline_ack_ids=[req.ack_id for req in requests_to_retry],
modify_deadline_seconds=[req.seconds for req in requests_to_retry],
ack_reqs_dict=ack_reqs_dict,
)
self._manager.send(request)

def nack(self, items: Sequence[requests.NackRequest]) -> None:
"""Explicitly deny receipt of messages.
Expand All @@ -233,6 +334,20 @@ def nack(self, items: Sequence[requests.NackRequest]) -> None:
items: The items to deny.
"""
self.modify_ack_deadline(
[requests.ModAckRequest(ack_id=item.ack_id, seconds=0) for item in items]
[
requests.ModAckRequest(
ack_id=item.ack_id, seconds=0, future=item.future
)
for item in items
]
)
self.drop(
[
requests.DropRequest(
ack_id=item.ack_id,
byte_size=item.byte_size,
ordering_key=item.ordering_key,
)
for item in items
]
)
self.drop([requests.DropRequest(*item) for item in items])
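
One plausible reason `nack` now builds each `DropRequest` field by field: once `NackRequest` gains a fourth `future` field, unpacking it positionally into the three-field `DropRequest` no longer works. The sketch below uses local stand-ins for the request tuples, with field names taken from the diff and `object` as a placeholder type for the future. `nack_to_requests` is a hypothetical helper, not part of the library.

```python
from typing import NamedTuple, Optional


class NackRequest(NamedTuple):
    ack_id: str
    byte_size: int
    ordering_key: Optional[str]
    future: Optional[object]  # placeholder type for the subscriber future


class ModAckRequest(NamedTuple):
    ack_id: str
    seconds: float
    future: Optional[object]


class DropRequest(NamedTuple):
    ack_id: str
    byte_size: int
    ordering_key: Optional[str]


def nack_to_requests(items):
    """Translate nacks into zero-deadline modacks plus matching drop requests."""
    modacks = [
        ModAckRequest(ack_id=item.ack_id, seconds=0, future=item.future)
        for item in items
    ]
    drops = [
        DropRequest(
            ack_id=item.ack_id,
            byte_size=item.byte_size,
            ordering_key=item.ordering_key,
        )
        for item in items
    ]
    return modacks, drops


nack = NackRequest("id-1", 128, None, None)
modacks, drops = nack_to_requests([nack])

# Positional unpacking now passes four values to a three-field tuple.
try:
    DropRequest(*nack)
    unpacking_failed = False
except TypeError:
    unpacking_failed = True
print(modacks[0].seconds, unpacking_failed)
```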
7 changes: 3 additions & 4 deletions google/cloud/pubsub_v1/subscriber/_protocol/leaser.py
Expand Up @@ -181,7 +181,7 @@ def maintain_leases(self) -> None:
for item in to_drop:
leased_messages.pop(item.ack_id)

# Create a streaming pull request.
# Create a modack request.
# We do not actually call `modify_ack_deadline` over and over
# because it is more efficient to make a single request.
ack_ids = leased_messages.keys()
Expand All @@ -194,9 +194,8 @@ def maintain_leases(self) -> None:
# way for ``send_request`` to fail when the consumer
# is inactive.
assert self._manager.dispatcher is not None
self._manager.dispatcher.modify_ack_deadline(
[requests.ModAckRequest(ack_id, deadline) for ack_id in ack_ids]
)
ack_id_gen = (ack_id for ack_id in ack_ids)
self._manager._send_lease_modacks(ack_id_gen, deadline)

# Now wait an appropriate period of time and do this again.
#
7 changes: 7 additions & 0 deletions google/cloud/pubsub_v1/subscriber/_protocol/requests.py
Expand Up @@ -12,8 +12,12 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import typing
from typing import NamedTuple, Optional

if typing.TYPE_CHECKING: # pragma: NO COVER
from google.cloud.pubsub_v1.subscriber import futures


# Namedtuples for management requests. Used by the Message class to communicate
# items of work back to the policy.
Expand All @@ -22,6 +26,7 @@ class AckRequest(NamedTuple):
byte_size: int
time_to_ack: float
ordering_key: Optional[str]
future: Optional["futures.Future"]


class DropRequest(NamedTuple):
Expand All @@ -39,9 +44,11 @@ class LeaseRequest(NamedTuple):
class ModAckRequest(NamedTuple):
ack_id: str
seconds: float
future: Optional["futures.Future"]


class NackRequest(NamedTuple):
ack_id: str
byte_size: int
ordering_key: Optional[str]
future: Optional["futures.Future"]
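
The optional `future` field lets a request carry a handle that is resolved once the ack or modack outcome is known. The sketch below shows the idea with `concurrent.futures.Future` from the standard library standing in for the subscriber `futures.Future` type. The `complete` helper and the `"SUCCESS"` string are hypothetical placeholders, not the library's API.

```python
from concurrent.futures import Future
from typing import NamedTuple, Optional


class AckRequest(NamedTuple):
    """Local stand-in mirroring the fields shown in the diff."""

    ack_id: str
    byte_size: int
    time_to_ack: float
    ordering_key: Optional[str]
    future: Optional[Future]


def complete(req, result):
    """Resolve the request's future, if the caller asked for one."""
    if req.future is not None:
        req.future.set_result(result)


with_future = AckRequest("id-1", 10, 0.5, None, Future())
without_future = AckRequest("id-2", 10, 0.5, None, None)

complete(with_future, "SUCCESS")
complete(without_future, "SUCCESS")  # nothing to resolve; must not raise

print(with_future.future.result(timeout=1))
```

Requests created without a future pass through the same code path, which matches the commit "Complete all requests, not just ones with futures".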