diff --git a/samples/snippets/subscriber.py b/samples/snippets/subscriber.py index 7bc124ca8..1b81a1cb1 100644 --- a/samples/snippets/subscriber.py +++ b/samples/snippets/subscriber.py @@ -580,6 +580,55 @@ def callback(message: pubsub_v1.subscriber.message.Message) -> None: # [END pubsub_subscriber_blocking_shutdown] +def receive_messages_with_exactly_once_subscribe( + project_id: str, subscription_id: str, timeout: Optional[float] = None +) -> None: + """Receives messages from a pull subscription with exactly-once delivery enabled.""" + # [START pubsub_subscriber_pull_with_exactly_once_delivery_enabled] + from concurrent.futures import TimeoutError + from google.cloud import pubsub_v1 + from google.cloud.pubsub_v1.subscriber import sub_exceptions + + # TODO(developer) + # project_id = "your-project-id" + # subscription_id = "your-subscription-id" + # Number of seconds the subscriber should listen for messages + # timeout = 5.0 + + subscriber = pubsub_v1.SubscriberClient() + # The `subscription_path` method creates a fully qualified identifier + # in the form `projects/{project_id}/subscriptions/{subscription_id}` + subscription_path = subscriber.subscription_path(project_id, subscription_id) + + def callback(message: pubsub_v1.subscriber.message.Message) -> None: + print(f"Received {message}.") + + ack_future = message.ack_with_response() + + try: + # Block on result of acknowledge call. + ack_future.result() + print(f"Ack for message {message.message_id} successful.") + except sub_exceptions.AcknowledgeError as e: + print( + f"Ack for message {message.message_id} failed with error: {e.error_code}" + ) + + streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback) + print(f"Listening for messages on {subscription_path}..\n") + + # Wrap subscriber in a 'with' block to automatically call close() when done. + with subscriber: + try: + # When `timeout` is not set, result() will block indefinitely, + # unless an exception is encountered first. + streaming_pull_future.result(timeout=timeout) + except TimeoutError: + streaming_pull_future.cancel() # Trigger the shutdown. + streaming_pull_future.result() # Block until the shutdown is complete. + # [END pubsub_subscriber_pull_with_exactly_once_delivery_enabled] + + def synchronous_pull(project_id: str, subscription_id: str) -> None: """Pulling messages synchronously.""" # [START pubsub_subscriber_sync_pull]