diff --git a/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py b/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py index 5a9d08026..e098491fe 100644 --- a/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py +++ b/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py @@ -75,6 +75,14 @@ a subscription. We do this to reduce premature ack expiration. """ +_EXACTLY_ONCE_DELIVERY_TEMPORARY_RETRY_ERRORS = { + code_pb2.DEADLINE_EXCEEDED, + code_pb2.RESOURCE_EXHAUSTED, + code_pb2.ABORTED, + code_pb2.INTERNAL, + code_pb2.UNAVAILABLE, +} + def _wrap_as_exception(maybe_exception: Any) -> BaseException: """Wrap an object as a Python exception, if needed. @@ -163,6 +171,8 @@ def _process_requests( requests_completed = [] requests_to_retry = [] for ack_id in ack_reqs_dict: + # Handle special errors returned for ack/modack RPCs via the ErrorInfo + # sidecar metadata when exactly-once delivery is enabled. if errors_dict and ack_id in errors_dict: exactly_once_error = errors_dict[ack_id] if exactly_once_error.startswith("TRANSIENT_"): @@ -176,9 +186,14 @@ def _process_requests( future = ack_reqs_dict[ack_id].future future.set_exception(exc) requests_completed.append(ack_reqs_dict[ack_id]) + # Temporary GRPC errors are retried + elif ( + error_status + and error_status.code in _EXACTLY_ONCE_DELIVERY_TEMPORARY_RETRY_ERRORS + ): + requests_to_retry.append(ack_reqs_dict[ack_id]) + # Other GRPC errors are NOT retried elif error_status: - # Only permanent errors are expected here b/c retriable errors are - # retried at the lower, GRPC level. if error_status.code == code_pb2.PERMISSION_DENIED: exc = AcknowledgeError(AcknowledgeStatus.PERMISSION_DENIED, info=None) elif error_status.code == code_pb2.FAILED_PRECONDITION: @@ -188,11 +203,13 @@ def _process_requests( future = ack_reqs_dict[ack_id].future future.set_exception(exc) requests_completed.append(ack_reqs_dict[ack_id]) + # Since no error occurred, requests with futures are completed successfully. elif ack_reqs_dict[ack_id].future: future = ack_reqs_dict[ack_id].future # success future.set_result(AcknowledgeStatus.SUCCESS) requests_completed.append(ack_reqs_dict[ack_id]) + # All other requests are considered completed. else: requests_completed.append(ack_reqs_dict[ack_id]) @@ -580,7 +597,9 @@ def send_unary_ack( ack_errors_dict = _get_ack_errors(exc) except exceptions.RetryError as exc: status = status_pb2.Status() - status.code = code_pb2.DEADLINE_EXCEEDED + # Choose a non-retriable error code so the futures fail with + # exceptions. + status.code = code_pb2.UNKNOWN # Makes sure to complete futures so they don't block forever. _process_requests(status, ack_reqs_dict, None) _LOGGER.debug( @@ -634,7 +653,9 @@ def send_unary_modack( modack_errors_dict = _get_ack_errors(exc) except exceptions.RetryError as exc: status = status_pb2.Status() - status.code = code_pb2.DEADLINE_EXCEEDED + # Choose a non-retriable error code so the futures fail with + # exceptions. + status.code = code_pb2.UNKNOWN # Makes sure to complete futures so they don't block forever. _process_requests(status, ack_reqs_dict, None) _LOGGER.debug( diff --git a/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py b/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py index 9e8d6c5ed..36f82b621 100644 --- a/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py +++ b/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py @@ -1735,7 +1735,7 @@ def test_process_requests_permanent_error_raises_exception(): assert not requests_to_retry -def test_process_requests_transient_error_returns_request(): +def test_process_requests_transient_error_returns_request_for_retrying(): # a transient error returns the request in `requests_to_retry` future = futures.Future() ack_reqs_dict = { @@ -1772,6 +1772,38 @@ def test_process_requests_unknown_error_raises_exception(): assert not requests_to_retry +def test_process_requests_retriable_error_status_returns_request_for_retrying(): + # a retriable error status returns the request in `requests_to_retry` + retriable_errors = [ + code_pb2.DEADLINE_EXCEEDED, + code_pb2.RESOURCE_EXHAUSTED, + code_pb2.ABORTED, + code_pb2.INTERNAL, + code_pb2.UNAVAILABLE, + ] + + for retriable_error in retriable_errors: + future = futures.Future() + ack_reqs_dict = { + "ackid1": requests.AckRequest( + ack_id="ackid1", + byte_size=0, + time_to_ack=20, + ordering_key="", + future=future, + ) + } + st = status_pb2.Status() + st.code = retriable_error + ( + requests_completed, + requests_to_retry, + ) = streaming_pull_manager._process_requests(st, ack_reqs_dict, None) + assert not requests_completed + assert requests_to_retry[0].ack_id == "ackid1" + assert not future.done() + + def test_process_requests_permission_denied_error_status_raises_exception(): # a permission-denied error status raises an exception future = futures.Future()