Skip to content

Commit

Permalink
Add receive_messages_with_exactly_once_subscribe sample with its own …
Browse files Browse the repository at this point in the history
…region tag
  • Loading branch information
pradn committed Mar 2, 2022
1 parent 78b2f0e commit 2ab6383
Showing 1 changed file with 49 additions and 0 deletions.
49 changes: 49 additions & 0 deletions samples/snippets/subscriber.py
Original file line number Diff line number Diff line change
Expand Up @@ -580,6 +580,55 @@ def callback(message: pubsub_v1.subscriber.message.Message) -> None:
# [END pubsub_subscriber_blocking_shutdown]


def receive_messages_with_exactly_once_subscribe(
project_id: str, subscription_id: str, timeout: Optional[float] = None
) -> None:
"""Receives messages from a pull subscription with exactly-once delivery enabled."""
# [START pubsub_subscriber_pull_with_exactly_once_delivery_enabled]
from concurrent.futures import TimeoutError
from google.cloud import pubsub_v1
from google.cloud.pubsub_v1.subscriber import sub_exceptions

# TODO(developer)
# project_id = "your-project-id"
# subscription_id = "your-subscription-id"
# Number of seconds the subscriber should listen for messages
# timeout = 5.0

subscriber = pubsub_v1.SubscriberClient()
# The `subscription_path` method creates a fully qualified identifier
# in the form `projects/{project_id}/subscriptions/{subscription_id}`
subscription_path = subscriber.subscription_path(project_id, subscription_id)

def callback(message: pubsub_v1.subscriber.message.Message) -> None:
print(f"Received {message}.")

ack_future = message.ack_with_response()

try:
# Block on result of acknowledge call.
ack_future.result()
print(f"Ack for message {message.message_id} successful.")
except sub_exceptions.AcknowledgeError as e:
print(
f"Ack for message {message.message_id} failed with error: {e.error_code}"
)

streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback)
print(f"Listening for messages on {subscription_path}..\n")

# Wrap subscriber in a 'with' block to automatically call close() when done.
with subscriber:
try:
# When `timeout` is not set, result() will block indefinitely,
# unless an exception is encountered first.
streaming_pull_future.result(timeout=timeout)
except TimeoutError:
streaming_pull_future.cancel() # Trigger the shutdown.
streaming_pull_future.result() # Block until the shutdown is complete.
# [END pubsub_subscriber_pull_with_exactly_once_delivery_enabled]


def synchronous_pull(project_id: str, subscription_id: str) -> None:
"""Pulling messages synchronously."""
# [START pubsub_subscriber_sync_pull]
Expand Down

0 comments on commit 2ab6383

Please sign in to comment.