Skip to content

Commit

Permalink
samples(pubsub): Add publisher flow control sample
Browse files Browse the repository at this point in the history
closes: #11685
pr: #12005
  • Loading branch information
quartzmo authored Jun 18, 2021
1 parent 11a3798 commit 774d430
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 0 deletions.
11 changes: 11 additions & 0 deletions google-cloud-pubsub/samples/acceptance/topics_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -317,6 +317,17 @@
end
end

it "supports pubsub_publisher_flow_control" do
#setup
@topic = pubsub.create_topic topic_id
@subscription = @topic.subscribe random_subscription_id

# pubsub_publisher_flow_control
assert_output "Published messages with flow control settings to #{topic_id}.\n" do
publish_messages_async_with_flow_control topic_id: topic_id
end
end

it "supports publish_with_error_handler" do
#setup
@topic = pubsub.create_topic topic_id
Expand Down
31 changes: 31 additions & 0 deletions google-cloud-pubsub/samples/topics.rb
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,37 @@ def publish_messages_async_with_concurrency_control topic_id:
# [END pubsub_publisher_concurrency_control]
end

def publish_messages_async_with_flow_control topic_id:
# [START pubsub_publisher_flow_control]
# topic_id = "your-topic-id"
require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new

topic = pubsub.topic topic_id, async: {
# Configure how many messages the publisher client can hold in memory
# and what to do when messages exceed the limit.
flow_control: {
message_limit: 100,
byte_limit: 10 * 1024 * 1024, # 10 MiB
# Block more messages from being published when the limit is reached. The
# other options are :ignore and :error.
limit_exceeded_behavior: :block
}
}
# Rapidly publishing 1000 messages in a loop may be constrained by flow control.
1000.times do |i|
topic.publish_async "message #{i}" do |result|
raise "Failed to publish the message." unless result.succeeded?
end
end

# Stop the async_publisher to send all queued messages immediately.
topic.async_publisher.stop.wait!
puts "Published messages with flow control settings to #{topic_id}."
# [END pubsub_publisher_flow_control]
end

def publish_with_error_handler topic_id:
# [START pubsub_publish_with_error_handler]
# topic_id = "your-topic-id"
Expand Down

0 comments on commit 774d430

Please sign in to comment.