Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

samples(pubsub): Add publisher flow control sample #12005

Merged
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions google-cloud-pubsub/samples/acceptance/topics_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -317,6 +317,17 @@
end
end

it "supports pubsub_publisher_flow_control" do
#setup
@topic = pubsub.create_topic topic_id
@subscription = @topic.subscribe random_subscription_id

# pubsub_publisher_flow_control
assert_output "Published messages with flow control settings to #{topic_id}.\n" do
publish_messages_async_with_flow_control topic_id: topic_id
end
end

it "supports publish_with_error_handler" do
#setup
@topic = pubsub.create_topic topic_id
Expand Down
31 changes: 31 additions & 0 deletions google-cloud-pubsub/samples/topics.rb
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,37 @@ def publish_messages_async_with_concurrency_control topic_id:
# [END pubsub_publisher_concurrency_control]
end

def publish_messages_async_with_flow_control topic_id:
# [START pubsub_publisher_flow_control]
# topic_id = "your-topic-id"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would put this line a bit further down, just above the topic = ... line where it is used, because that's where we'd expect to see it in actual code.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the style throughout these samples, so it will be best not to change it in this PR.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

grrr... well, I guess I'll open an issue.

require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new

topic = pubsub.topic topic_id, async: {
# Configure how many messages the publisher client can hold in memory
# before publishing succeeds, and what to do when messages exceed the limit.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What do you mean by "before publishing succeeds?"

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed, thanks!

flow_control: {
message_limit: 100,
quartzmo marked this conversation as resolved.
Show resolved Hide resolved
byte_limit: 10 * 1024 * 1024, # 10 MiB
# Block more messages from being published when the limit is reached. The
# other options are :ignore and :error.
limit_exceeded_behavior: :block
}
}
# Publish 1000 messages in quick succession to trigger publisher flow control.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wouldn't say "to trigger flow control." It's possible that flow control will be triggered, but it's also entirely possible it won't be if messages get out quickly enough. Either way, the fact that flow control was triggered isn't something that will be easily visible to the user. Maybe say "Rapidly publishing 1000 messages in a loop may be constrained by flow control."

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will do, thanks!

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated.

1000.times do |i|
topic.publish_async "message #{i}" do |result|
raise "Failed to publish the message." unless result.succeeded?
end
end

# Stop the async_publisher to send all queued messages immediately.
topic.async_publisher.stop.wait!
puts "Published messages with flow control settings to #{topic_id}."
# [END pubsub_publisher_flow_control]
end

def publish_with_error_handler topic_id:
# [START pubsub_publish_with_error_handler]
# topic_id = "your-topic-id"
Expand Down