Skip to content

Commit

Permalink
sample: add publish flow control sample and other nits (#429)
Browse files Browse the repository at this point in the history
* sample: add publish flow control sample and other nits

* restore changes in untouched files

* address peter's comments
  • Loading branch information
anguillanneuf authored Jun 17, 2021
1 parent 7597604 commit 0df7c96
Show file tree
Hide file tree
Showing 2 changed files with 101 additions and 32 deletions.
126 changes: 94 additions & 32 deletions samples/snippets/publisher.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ def create_topic(project_id, topic_id):

topic = publisher.create_topic(request={"name": topic_path})

print("Created topic: {}".format(topic.name))
print(f"Created topic: {topic.name}")
# [END pubsub_quickstart_create_topic]
# [END pubsub_create_topic]

Expand All @@ -74,7 +74,7 @@ def delete_topic(project_id, topic_id):

publisher.delete_topic(request={"topic": topic_path})

print("Topic deleted: {}".format(topic_path))
print(f"Topic deleted: {topic_path}")
# [END pubsub_delete_topic]


Expand All @@ -94,7 +94,7 @@ def publish_messages(project_id, topic_id):
topic_path = publisher.topic_path(project_id, topic_id)

for n in range(1, 10):
data = "Message number {}".format(n)
data = f"Message number {n}"
# Data must be a bytestring
data = data.encode("utf-8")
# When you publish a message, the client returns a future.
Expand All @@ -120,7 +120,7 @@ def publish_messages_with_custom_attributes(project_id, topic_id):
topic_path = publisher.topic_path(project_id, topic_id)

for n in range(1, 10):
data = "Message number {}".format(n)
data = f"Message number {n}"
# Data must be a bytestring
data = data.encode("utf-8")
# Add two attributes, origin and username, to the message
Expand All @@ -136,8 +136,7 @@ def publish_messages_with_custom_attributes(project_id, topic_id):
def publish_messages_with_error_handler(project_id, topic_id):
# [START pubsub_publish_with_error_handler]
"""Publishes multiple messages to a Pub/Sub topic with an error handler."""
import time

from concurrent import futures
from google.cloud import pubsub_v1

# TODO(developer)
Expand All @@ -146,31 +145,28 @@ def publish_messages_with_error_handler(project_id, topic_id):

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_id)
publish_futures = []

futures = dict()

def get_callback(f, data):
def callback(f):
def get_callback(publish_future, data):
def callback(publish_future):
try:
print(f.result())
futures.pop(data)
except: # noqa
print("Please handle {} for {}.".format(f.exception(), data))
# Wait 100 ms for the publish call to succeed.
print(publish_future.result(timeout=0.1))
except futures.TimeoutError:
print(f"Publishing {data} timed out.")

return callback

for i in range(10):
data = str(i)
futures.update({data: None})
# When you publish a message, the client returns a future.
future = publisher.publish(topic_path, data.encode("utf-8"))
futures[data] = future
# Publish failures shall be handled in the callback function.
future.add_done_callback(get_callback(future, data))
publish_future = publisher.publish(topic_path, data.encode("utf-8"))
# Non-blocking. Publish failures are handled in the callback function.
publish_future.add_done_callback(get_callback(publish_future, data))
publish_futures.append(publish_future)

# Wait for all the publish futures to resolve before exiting.
while futures:
time.sleep(5)
futures.wait(publish_futures, return_when=futures.ALL_COMPLETED)

print(f"Published messages with error handler to {topic_path}.")
# [END pubsub_publish_with_error_handler]
Expand All @@ -179,39 +175,93 @@ def callback(f):
def publish_messages_with_batch_settings(project_id, topic_id):
"""Publishes multiple messages to a Pub/Sub topic with batch settings."""
# [START pubsub_publisher_batch_settings]
from concurrent import futures
from google.cloud import pubsub_v1

# TODO(developer)
# project_id = "your-project-id"
# topic_id = "your-topic-id"

# Configure the batch to publish as soon as there is ten messages,
# one kilobyte of data, or one second has passed.
# Configure the batch to publish as soon as there are 10 messages
# or 1 KiB of data, or 1 second has passed.
batch_settings = pubsub_v1.types.BatchSettings(
max_messages=10, # default 100
max_bytes=1024, # default 1 MB
max_bytes=1024, # default 1 MiB
max_latency=1, # default 10 ms
)
publisher = pubsub_v1.PublisherClient(batch_settings)
topic_path = publisher.topic_path(project_id, topic_id)
publish_futures = []

# Resolve the publish future in a separate thread.
def callback(future):
message_id = future.result()
print(message_id)

for n in range(1, 10):
data = "Message number {}".format(n)
data = f"Message number {n}"
# Data must be a bytestring
data = data.encode("utf-8")
future = publisher.publish(topic_path, data)
publish_future = publisher.publish(topic_path, data)
# Non-blocking. Allow the publisher client to batch multiple messages.
future.add_done_callback(callback)
publish_future.add_done_callback(callback)
publish_futures.append(publish_future)

futures.wait(publish_futures, return_when=futures.ALL_COMPLETED)

print(f"Published messages with batch settings to {topic_path}.")
# [END pubsub_publisher_batch_settings]


def publish_messages_with_flow_control_settings(project_id, topic_id):
"""Publishes messages to a Pub/Sub topic with flow control settings."""
# [START pubsub_publisher_flow_control]
from concurrent import futures
from google.cloud import pubsub_v1
from google.cloud.pubsub_v1.types import (
LimitExceededBehavior,
PublisherOptions,
PublishFlowControl,
)

# TODO(developer)
# project_id = "your-project-id"
# topic_id = "your-topic-id"

# Configure how many messages the publisher client can hold in memory
# and what to do when messages exceed the limit.
flow_control_settings = PublishFlowControl(
message_limit=100, # 100 messages
byte_limit=10 * 1024 * 1024, # 10 MiB
limit_exceeded_behavior=LimitExceededBehavior.BLOCK,
)
publisher = pubsub_v1.PublisherClient(
publisher_options=PublisherOptions(flow_control=flow_control_settings)
)
topic_path = publisher.topic_path(project_id, topic_id)
publish_futures = []

# Resolve the publish future in a separate thread.
def callback(publish_future):
message_id = publish_future.result()
print(message_id)

# Publish 1000 messages in quick succession to trigger flow control.
for n in range(1, 1000):
data = f"Message number {n}"
# Data must be a bytestring
data = data.encode("utf-8")
publish_future = publisher.publish(topic_path, data)
# Non-blocking. Allow the publisher client to batch messages.
publish_future.add_done_callback(callback)
publish_futures.append(publish_future)

futures.wait(publish_futures, return_when=futures.ALL_COMPLETED)

print(f"Published messages with flow control settings to {topic_path}.")
# [END pubsub_publisher_flow_control]


def publish_messages_with_retry_settings(project_id, topic_id):
"""Publishes messages with custom retry settings."""
# [START pubsub_publisher_retry_settings]
Expand Down Expand Up @@ -244,7 +294,7 @@ def publish_messages_with_retry_settings(project_id, topic_id):
topic_path = publisher.topic_path(project_id, topic_id)

for n in range(1, 10):
data = "Message number {}".format(n)
data = f"Message number {n}"
# Data must be a bytestring
data = data.encode("utf-8")
future = publisher.publish(topic=topic_path, data=data, retry=custom_retry)
Expand Down Expand Up @@ -365,7 +415,8 @@ def detach_subscription(project_id, subscription_id):

if __name__ == "__main__":
parser = argparse.ArgumentParser(
description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter,
description=__doc__,
formatter_class=argparse.RawDescriptionHelpFormatter,
)
parser.add_argument("project_id", help="Your Google Cloud project ID")

Expand All @@ -388,7 +439,8 @@ def detach_subscription(project_id, subscription_id):
publish_with_custom_attributes_parser.add_argument("topic_id")

publish_with_error_handler_parser = subparsers.add_parser(
"publish-with-error-handler", help=publish_messages_with_error_handler.__doc__,
"publish-with-error-handler",
help=publish_messages_with_error_handler.__doc__,
)
publish_with_error_handler_parser.add_argument("topic_id")

Expand All @@ -398,14 +450,21 @@ def detach_subscription(project_id, subscription_id):
)
publish_with_batch_settings_parser.add_argument("topic_id")

publish_with_flow_control_settings_parser = subparsers.add_parser(
"publish-with-flow-control",
help=publish_messages_with_flow_control_settings.__doc__,
)
publish_with_flow_control_settings_parser.add_argument("topic_id")

publish_with_retry_settings_parser = subparsers.add_parser(
"publish-with-retry-settings",
help=publish_messages_with_retry_settings.__doc__,
)
publish_with_retry_settings_parser.add_argument("topic_id")

publish_with_ordering_keys_parser = subparsers.add_parser(
"publish-with-ordering-keys", help=publish_with_ordering_keys.__doc__,
"publish-with-ordering-keys",
help=publish_with_ordering_keys.__doc__,
)
publish_with_ordering_keys_parser.add_argument("topic_id")

Expand All @@ -416,7 +475,8 @@ def detach_subscription(project_id, subscription_id):
resume_publish_with_ordering_keys_parser.add_argument("topic_id")

detach_subscription_parser = subparsers.add_parser(
"detach-subscription", help=detach_subscription.__doc__,
"detach-subscription",
help=detach_subscription.__doc__,
)
detach_subscription_parser.add_argument("subscription_id")

Expand All @@ -436,6 +496,8 @@ def detach_subscription(project_id, subscription_id):
publish_messages_with_error_handler(args.project_id, args.topic_id)
elif args.command == "publish-with-batch-settings":
publish_messages_with_batch_settings(args.project_id, args.topic_id)
elif args.command == "publish-with-flow-control":
publish_messages_with_flow_control_settings(args.project_id, args.topic_id)
elif args.command == "publish-with-retry-settings":
publish_messages_with_retry_settings(args.project_id, args.topic_id)
elif args.command == "publish-with-ordering-keys":
Expand Down
7 changes: 7 additions & 0 deletions samples/snippets/publisher_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,13 @@ def test_publish_with_batch_settings(topic_path, capsys):
assert f"Published messages with batch settings to {topic_path}." in out


def test_publish_with_flow_control_settings(topic_path, capsys):
publisher.publish_messages_with_flow_control_settings(PROJECT_ID, TOPIC_ID)

out, _ = capsys.readouterr()
assert f"Published messages with flow control settings to {topic_path}." in out


def test_publish_with_retry_settings(topic_path, capsys):
publisher.publish_messages_with_retry_settings(PROJECT_ID, TOPIC_ID)

Expand Down

0 comments on commit 0df7c96

Please sign in to comment.