diff --git a/samples/snippets/subscriber.py b/samples/snippets/subscriber.py index 7bc124ca8..ada70a02d 100644 --- a/samples/snippets/subscriber.py +++ b/samples/snippets/subscriber.py @@ -580,6 +580,61 @@ def callback(message: pubsub_v1.subscriber.message.Message) -> None: # [END pubsub_subscriber_blocking_shutdown] +def receive_messages_with_exactly_once_delivery_enabled( + project_id: str, subscription_id: str, timeout: Optional[float] = None +) -> None: + """Receives messages from a pull subscription with exactly-once delivery enabled.""" + # [START pubsub_subscriber_exactly_once] + from concurrent.futures import TimeoutError + from google.cloud import pubsub_v1 + from google.cloud.pubsub_v1.subscriber import exceptions as sub_exceptions + + # TODO(developer) + # project_id = "your-project-id" + # subscription_id = "your-subscription-id" + # Number of seconds the subscriber should listen for messages + # timeout = 5.0 + + subscriber = pubsub_v1.SubscriberClient() + # The `subscription_path` method creates a fully qualified identifier + # in the form `projects/{project_id}/subscriptions/{subscription_id}` + subscription_path = subscriber.subscription_path(project_id, subscription_id) + + def callback(message: pubsub_v1.subscriber.message.Message) -> None: + print(f"Received {message}.") + + # Use `ack_with_response()` instead of `ack()` to get a future that tracks + # the result of the acknowledge call. When exactly-once delivery is enabled + # on the subscription, the message is guaranteed to not be delivered again + # if the ack future succeeds. + ack_future = message.ack_with_response() + + try: + # Block on result of acknowledge call. + # When `timeout` is not set, result() will block indefinitely, + # unless an exception is encountered first. + ack_future.result(timeout=timeout) + print(f"Ack for message {message.message_id} successful.") + except sub_exceptions.AcknowledgeError as e: + print( + f"Ack for message {message.message_id} failed with error: {e.error_code}" + ) + + streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback) + print(f"Listening for messages on {subscription_path}..\n") + + # Wrap subscriber in a 'with' block to automatically call close() when done. + with subscriber: + try: + # When `timeout` is not set, result() will block indefinitely, + # unless an exception is encountered first. + streaming_pull_future.result(timeout=timeout) + except TimeoutError: + streaming_pull_future.cancel() # Trigger the shutdown. + streaming_pull_future.result() # Block until the shutdown is complete. + # [END pubsub_subscriber_exactly_once] + + def synchronous_pull(project_id: str, subscription_id: str) -> None: """Pulling messages synchronously.""" # [START pubsub_subscriber_sync_pull] @@ -881,6 +936,17 @@ def callback(message: pubsub_v1.subscriber.message.Message) -> None: "timeout", default=None, type=float, nargs="?" ) + receive_messages_with_exactly_once_delivery_enabled_parser = subparsers.add_parser( + "receive-messages-with-exactly-once-delivery-enabled", + help=receive_messages_with_exactly_once_delivery_enabled.__doc__, + ) + receive_messages_with_exactly_once_delivery_enabled_parser.add_argument( + "subscription_id" + ) + receive_messages_with_exactly_once_delivery_enabled_parser.add_argument( + "timeout", default=None, type=float, nargs="?" + ) + synchronous_pull_parser = subparsers.add_parser( "receive-synchronously", help=synchronous_pull.__doc__ ) @@ -967,6 +1033,10 @@ def callback(message: pubsub_v1.subscriber.message.Message) -> None: receive_messages_with_blocking_shutdown( args.project_id, args.subscription_id, args.timeout ) + elif args.command == "receive-messages-with-exactly-once-delivery-enabled": + receive_messages_with_exactly_once_delivery_enabled( + args.project_id, args.subscription_id, args.timeout + ) elif args.command == "receive-synchronously": synchronous_pull(args.project_id, args.subscription_id) elif args.command == "receive-synchronously-with-lease": diff --git a/samples/snippets/subscriber_test.py b/samples/snippets/subscriber_test.py index 9fcb1c119..614633664 100644 --- a/samples/snippets/subscriber_test.py +++ b/samples/snippets/subscriber_test.py @@ -624,6 +624,34 @@ def eventually_consistent_test() -> None: eventually_consistent_test() +def test_receive_messages_with_exactly_once_delivery_enabled( + publisher_client: pubsub_v1.PublisherClient, + topic: str, + subscription_async: str, + capsys: CaptureFixture[str], +) -> None: + + typed_backoff = cast( + Callable[[C], C], backoff.on_exception(backoff.expo, Unknown, max_time=60), + ) + + @typed_backoff + def eventually_consistent_test() -> None: + _publish_messages(publisher_client, topic) + + subscriber.receive_messages_with_exactly_once_delivery_enabled( + PROJECT_ID, SUBSCRIPTION_ASYNC, 10 + ) + + out, _ = capsys.readouterr() + assert "Listening" in out + assert subscription_async in out + assert "Received" in out + assert "Ack" in out + + eventually_consistent_test() + + def test_listen_for_errors( publisher_client: pubsub_v1.PublisherClient, topic: str,