From b1a7799e22d52686e538eccbb2774a5c14fc92a5 Mon Sep 17 00:00:00 2001 From: Daniel Schierbeck Date: Thu, 2 Nov 2017 16:00:10 +0100 Subject: [PATCH] Support setting the max bytes to fetch per request --- lib/kafka/client.rb | 1 + lib/kafka/fetch_operation.rb | 6 ++++-- lib/kafka/protocol/fetch_request.rb | 6 ++++-- spec/broker_spec.rb | 1 + 4 files changed, 10 insertions(+), 4 deletions(-) diff --git a/lib/kafka/client.rb b/lib/kafka/client.rb index d558bb548..614080b2b 100644 --- a/lib/kafka/client.rb +++ b/lib/kafka/client.rb @@ -369,6 +369,7 @@ def fetch_messages(topic:, partition:, offset: :latest, max_wait_time: 5, min_by cluster: @cluster, logger: @logger, min_bytes: min_bytes, + max_bytes: max_bytes, max_wait_time: max_wait_time, ) diff --git a/lib/kafka/fetch_operation.rb b/lib/kafka/fetch_operation.rb index c181ab191..ed685b9b8 100644 --- a/lib/kafka/fetch_operation.rb +++ b/lib/kafka/fetch_operation.rb @@ -18,15 +18,16 @@ module Kafka # operation.execute # class FetchOperation - def initialize(cluster:, logger:, min_bytes: 1, max_wait_time: 5) + def initialize(cluster:, logger:, min_bytes: 1, max_bytes: 10485760, max_wait_time: 5) @cluster = cluster @logger = logger @min_bytes = min_bytes + @max_bytes = max_bytes @max_wait_time = max_wait_time @topics = {} end - def fetch_from_partition(topic, partition, offset: :latest, max_bytes: 1048576) + def fetch_from_partition(topic, partition, offset: :latest, max_bytes: 10485760) if offset == :earliest offset = -2 elsif offset == :latest @@ -66,6 +67,7 @@ def execute options = { max_wait_time: @max_wait_time * 1000, # Kafka expects ms, not secs min_bytes: @min_bytes, + max_bytes: @max_bytes, topics: topics, } diff --git a/lib/kafka/protocol/fetch_request.rb b/lib/kafka/protocol/fetch_request.rb index 226f7fed4..6f4aa9cf6 100644 --- a/lib/kafka/protocol/fetch_request.rb +++ b/lib/kafka/protocol/fetch_request.rb @@ -19,10 +19,11 @@ class FetchRequest # @param max_wait_time [Integer] # @param min_bytes [Integer] # @param topics [Hash] - def initialize(max_wait_time:, min_bytes:, topics:) + def initialize(max_wait_time:, min_bytes:, max_bytes:, topics:) @replica_id = REPLICA_ID @max_wait_time = max_wait_time @min_bytes = min_bytes + @max_bytes = max_bytes @topics = topics end @@ -31,7 +32,7 @@ def api_key end def api_version - 2 + 3 end def response_class @@ -42,6 +43,7 @@ def encode(encoder) encoder.write_int32(@replica_id) encoder.write_int32(@max_wait_time) encoder.write_int32(@min_bytes) + encoder.write_int32(@max_bytes) encoder.write_array(@topics) do |topic, partitions| encoder.write_string(topic) diff --git a/spec/broker_spec.rb b/spec/broker_spec.rb index 54e3fe3b3..57a5f640d 100644 --- a/spec/broker_spec.rb +++ b/spec/broker_spec.rb @@ -87,6 +87,7 @@ def send_request(request) actual_response = broker.fetch_messages( max_wait_time: 0, min_bytes: 0, + max_bytes: 10 * 1024, topics: {} )