diff --git a/google-cloud-pubsub/samples/acceptance/topics_test.rb b/google-cloud-pubsub/samples/acceptance/topics_test.rb index 1e76fb827e8e..ded0f6672025 100644 --- a/google-cloud-pubsub/samples/acceptance/topics_test.rb +++ b/google-cloud-pubsub/samples/acceptance/topics_test.rb @@ -317,6 +317,17 @@ end end + it "supports pubsub_publisher_flow_control" do + #setup + @topic = pubsub.create_topic topic_id + @subscription = @topic.subscribe random_subscription_id + + # pubsub_publisher_flow_control + assert_output "Published messages with flow control settings to #{topic_id}.\n" do + publish_messages_async_with_flow_control topic_id: topic_id + end + end + it "supports publish_with_error_handler" do #setup @topic = pubsub.create_topic topic_id diff --git a/google-cloud-pubsub/samples/topics.rb b/google-cloud-pubsub/samples/topics.rb index b81b04a10b2e..ca230e5ed89c 100644 --- a/google-cloud-pubsub/samples/topics.rb +++ b/google-cloud-pubsub/samples/topics.rb @@ -290,6 +290,37 @@ def publish_messages_async_with_concurrency_control topic_id: # [END pubsub_publisher_concurrency_control] end +def publish_messages_async_with_flow_control topic_id: + # [START pubsub_publisher_flow_control] + # topic_id = "your-topic-id" + require "google/cloud/pubsub" + + pubsub = Google::Cloud::Pubsub.new + + topic = pubsub.topic topic_id, async: { + # Configure how many messages the publisher client can hold in memory + # and what to do when messages exceed the limit. + flow_control: { + message_limit: 100, + byte_limit: 10 * 1024 * 1024, # 10 MiB + # Block more messages from being published when the limit is reached. The + # other options are :ignore and :error. + limit_exceeded_behavior: :block + } + } + # Rapidly publishing 1000 messages in a loop may be constrained by flow control. + 1000.times do |i| + topic.publish_async "message #{i}" do |result| + raise "Failed to publish the message." unless result.succeeded? + end + end + + # Stop the async_publisher to send all queued messages immediately. + topic.async_publisher.stop.wait! + puts "Published messages with flow control settings to #{topic_id}." + # [END pubsub_publisher_flow_control] +end + def publish_with_error_handler topic_id: # [START pubsub_publish_with_error_handler] # topic_id = "your-topic-id"