diff --git a/google/cloud/pubsub_v1/subscriber/_protocol/dispatcher.py b/google/cloud/pubsub_v1/subscriber/_protocol/dispatcher.py index 916161616..c6dbf067f 100644 --- a/google/cloud/pubsub_v1/subscriber/_protocol/dispatcher.py +++ b/google/cloud/pubsub_v1/subscriber/_protocol/dispatcher.py @@ -28,6 +28,9 @@ from google.cloud.pubsub_v1.subscriber._protocol import helper_threads from google.cloud.pubsub_v1.subscriber._protocol import requests +from google.cloud.pubsub_v1.subscriber.exceptions import ( + AcknowledgeStatus, +) if typing.TYPE_CHECKING: # pragma: NO COVER import queue @@ -128,17 +131,50 @@ def dispatch_callback(self, items: Sequence[RequestItem]) -> None: nack_requests: List[requests.NackRequest] = [] drop_requests: List[requests.DropRequest] = [] + lease_ids = set() + modack_ids = set() + ack_ids = set() + nack_ids = set() + drop_ids = set() + exactly_once_delivery_enabled = self._manager._exactly_once_delivery_enabled() + for item in items: if isinstance(item, requests.LeaseRequest): - lease_requests.append(item) + if ( + item.ack_id not in lease_ids + ): # LeaseRequests have no futures to handle. + lease_ids.add(item.ack_id) + lease_requests.append(item) elif isinstance(item, requests.ModAckRequest): - modack_requests.append(item) + if item.ack_id in modack_ids: + self._handle_duplicate_request_future( + exactly_once_delivery_enabled, item + ) + else: + modack_ids.add(item.ack_id) + modack_requests.append(item) elif isinstance(item, requests.AckRequest): - ack_requests.append(item) + if item.ack_id in ack_ids: + self._handle_duplicate_request_future( + exactly_once_delivery_enabled, item + ) + else: + ack_ids.add(item.ack_id) + ack_requests.append(item) elif isinstance(item, requests.NackRequest): - nack_requests.append(item) + if item.ack_id in nack_ids: + self._handle_duplicate_request_future( + exactly_once_delivery_enabled, item + ) + else: + nack_ids.add(item.ack_id) + nack_requests.append(item) elif isinstance(item, requests.DropRequest): - drop_requests.append(item) + if ( + item.ack_id not in drop_ids + ): # DropRequests have no futures to handle. + drop_ids.add(item.ack_id) + drop_requests.append(item) else: warnings.warn( f'Skipping unknown request item of type "{type(item)}"', @@ -164,6 +200,29 @@ def dispatch_callback(self, items: Sequence[RequestItem]) -> None: if drop_requests: self.drop(drop_requests) + def _handle_duplicate_request_future( + self, + exactly_once_delivery_enabled: bool, + item: Union[requests.AckRequest, requests.ModAckRequest, requests.NackRequest], + ) -> None: + _LOGGER.debug( + "This is a duplicate %s with the same ack_id: %s.", + type(item), + item.ack_id, + ) + if item.future: + if exactly_once_delivery_enabled: + item.future.set_exception( + ValueError(f"Duplicate ack_id for {type(item)}") + ) + # Futures may be present even with exactly-once delivery + # disabled, in transition periods after the setting is changed on + # the subscription. + else: + # When exactly-once delivery is NOT enabled, acks/modacks are considered + # best-effort, so the future should succeed even though this is a duplicate. + item.future.set_result(AcknowledgeStatus.SUCCESS) + def ack(self, items: Sequence[requests.AckRequest]) -> None: """Acknowledge the given messages. diff --git a/tests/unit/pubsub_v1/subscriber/test_dispatcher.py b/tests/unit/pubsub_v1/subscriber/test_dispatcher.py index c1de19e65..c6902da69 100644 --- a/tests/unit/pubsub_v1/subscriber/test_dispatcher.py +++ b/tests/unit/pubsub_v1/subscriber/test_dispatcher.py @@ -24,6 +24,9 @@ import mock import pytest +from google.cloud.pubsub_v1.subscriber.exceptions import ( + AcknowledgeStatus, +) @pytest.mark.parametrize( @@ -48,6 +51,7 @@ def test_dispatch_callback_active_manager(item, method_name): dispatcher_.dispatch_callback(items) method.assert_called_once_with([item]) + manager._exactly_once_delivery_enabled.assert_called() @pytest.mark.parametrize( @@ -73,6 +77,274 @@ def test_dispatch_callback_inactive_manager(item, method_name): dispatcher_.dispatch_callback(items) method.assert_called_once_with([item]) + manager._exactly_once_delivery_enabled.assert_called() + + +@pytest.mark.parametrize( + "items,method_name", + [ + ( + [ + requests.AckRequest("0", 0, 0, "", None), + requests.AckRequest("0", 0, 1, "", None), + ], + "ack", + ), + ( + [ + requests.DropRequest("0", 0, ""), + requests.DropRequest("0", 1, ""), + ], + "drop", + ), + ( + [ + requests.LeaseRequest("0", 0, ""), + requests.LeaseRequest("0", 1, ""), + ], + "lease", + ), + ( + [ + requests.ModAckRequest("0", 0, None), + requests.ModAckRequest("0", 1, None), + ], + "modify_ack_deadline", + ), + ( + [ + requests.NackRequest("0", 0, "", None), + requests.NackRequest("0", 1, "", None), + ], + "nack", + ), + ], +) +def test_dispatch_duplicate_items_callback_active_manager_no_futures( + items, method_name +): + manager = mock.create_autospec( + streaming_pull_manager.StreamingPullManager, instance=True + ) + dispatcher_ = dispatcher.Dispatcher(manager, mock.sentinel.queue) + + manager._exactly_once_delivery_enabled.return_value = False + with mock.patch.object(dispatcher_, method_name) as method: + dispatcher_.dispatch_callback(items) + + method.assert_called_once_with([items[0]]) + manager._exactly_once_delivery_enabled.assert_called() + + +@pytest.mark.parametrize( + "items,method_name", + [ + ( + [ + requests.AckRequest("0", 0, 0, "", None), + requests.AckRequest("0", 0, 1, "", futures.Future()), + ], + "ack", + ), + ( + [ + requests.DropRequest("0", 0, ""), + requests.DropRequest("0", 1, ""), + ], + "drop", + ), + ( + [ + requests.LeaseRequest("0", 0, ""), + requests.LeaseRequest("0", 1, ""), + ], + "lease", + ), + ( + [ + requests.ModAckRequest("0", 0, None), + requests.ModAckRequest("0", 1, futures.Future()), + ], + "modify_ack_deadline", + ), + ( + [ + requests.NackRequest("0", 0, "", None), + requests.NackRequest("0", 1, "", futures.Future()), + ], + "nack", + ), + ], +) +def test_dispatch_duplicate_items_callback_active_manager_with_futures_no_eod( + items, method_name +): + manager = mock.create_autospec( + streaming_pull_manager.StreamingPullManager, instance=True + ) + dispatcher_ = dispatcher.Dispatcher(manager, mock.sentinel.queue) + + manager._exactly_once_delivery_enabled.return_value = False + with mock.patch.object(dispatcher_, method_name) as method: + dispatcher_.dispatch_callback(items) + + method.assert_called_once_with([items[0]]) + manager._exactly_once_delivery_enabled.assert_called() + + if method_name != "drop" and method_name != "lease": + assert items[1].future.result() == AcknowledgeStatus.SUCCESS + + +@pytest.mark.parametrize( + "items,method_name", + [ + ( + [ + requests.AckRequest("0", 0, 0, "", None), + requests.AckRequest("0", 0, 1, "", futures.Future()), + ], + "ack", + ), + ( + [ + requests.DropRequest("0", 0, ""), + requests.DropRequest("0", 1, ""), + ], + "drop", + ), + ( + [ + requests.LeaseRequest("0", 0, ""), + requests.LeaseRequest("0", 1, ""), + ], + "lease", + ), + ( + [ + requests.ModAckRequest("0", 0, None), + requests.ModAckRequest("0", 1, futures.Future()), + ], + "modify_ack_deadline", + ), + ( + [ + requests.NackRequest("0", 0, "", None), + requests.NackRequest("0", 1, "", futures.Future()), + ], + "nack", + ), + ], +) +def test_dispatch_duplicate_items_callback_active_manager_with_futures_eod( + items, method_name +): + manager = mock.create_autospec( + streaming_pull_manager.StreamingPullManager, instance=True + ) + dispatcher_ = dispatcher.Dispatcher(manager, mock.sentinel.queue) + + manager._exactly_once_delivery_enabled.return_value = True + with mock.patch.object(dispatcher_, method_name) as method: + dispatcher_.dispatch_callback(items) + + method.assert_called_once_with([items[0]]) + manager._exactly_once_delivery_enabled.assert_called() + + if method_name != "drop" and method_name != "lease": + with pytest.raises(ValueError) as err: + items[1].future.result() + assert err.errisinstance(ValueError) + + +def test_dispatch_duplicate_items_diff_types_callback_active_manager_with_futures_eod(): + ack_future = futures.Future() + ack_request = requests.AckRequest("0", 0, 1, "", ack_future) + drop_request = requests.DropRequest("0", 1, "") + lease_request = requests.LeaseRequest("0", 1, "") + nack_future = futures.Future() + nack_request = requests.NackRequest("0", 1, "", nack_future) + modack_future = futures.Future() + modack_request = requests.ModAckRequest("0", 1, modack_future) + + items = [ack_request, drop_request, lease_request, nack_request, modack_request] + + manager = mock.create_autospec( + streaming_pull_manager.StreamingPullManager, instance=True + ) + dispatcher_ = dispatcher.Dispatcher(manager, mock.sentinel.queue) + + manager._exactly_once_delivery_enabled.return_value = True + with mock.patch.multiple( + dispatcher_, + ack=mock.DEFAULT, + nack=mock.DEFAULT, + drop=mock.DEFAULT, + lease=mock.DEFAULT, + modify_ack_deadline=mock.DEFAULT, + ): + dispatcher_.dispatch_callback(items) + manager._exactly_once_delivery_enabled.assert_called() + dispatcher_.ack.assert_called_once_with([ack_request]) + dispatcher_.drop.assert_called_once_with([drop_request]) + dispatcher_.lease.assert_called_once_with([lease_request]) + dispatcher_.nack.assert_called_once_with([nack_request]) + dispatcher_.modify_ack_deadline.assert_called_once_with([modack_request]) + + +@pytest.mark.parametrize( + "items,method_name", + [ + ( + [ + requests.AckRequest("0", 0, 0, "", None), + requests.AckRequest("0", 0, 1, "", None), + ], + "ack", + ), + ( + [ + requests.DropRequest("0", 0, ""), + requests.DropRequest("0", 1, ""), + ], + "drop", + ), + ( + [ + requests.LeaseRequest("0", 0, ""), + requests.LeaseRequest("0", 1, ""), + ], + "lease", + ), + ( + [ + requests.ModAckRequest("0", 0, None), + requests.ModAckRequest("0", 1, None), + ], + "modify_ack_deadline", + ), + ( + [ + requests.NackRequest("0", 0, "", None), + requests.NackRequest("0", 1, "", None), + ], + "nack", + ), + ], +) +def test_dispatch_duplicate_items_callback_active_manager_no_futures_eod( + items, method_name +): + manager = mock.create_autospec( + streaming_pull_manager.StreamingPullManager, instance=True + ) + dispatcher_ = dispatcher.Dispatcher(manager, mock.sentinel.queue) + + manager._exactly_once_delivery_enabled.return_value = True + with mock.patch.object(dispatcher_, method_name) as method: + dispatcher_.dispatch_callback(items) + + method.assert_called_once_with([items[0]]) + manager._exactly_once_delivery_enabled.assert_called() def test_unknown_request_type():