Skip to content

Commit

Permalink
Support setting the max bytes to fetch per request
Browse files Browse the repository at this point in the history
  • Loading branch information
dasch committed Nov 2, 2017
1 parent 95e3a21 commit b1a7799
Show file tree
Hide file tree
Showing 4 changed files with 10 additions and 4 deletions.
1 change: 1 addition & 0 deletions lib/kafka/client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -369,6 +369,7 @@ def fetch_messages(topic:, partition:, offset: :latest, max_wait_time: 5, min_by
cluster: @cluster,
logger: @logger,
min_bytes: min_bytes,
max_bytes: max_bytes,
max_wait_time: max_wait_time,
)

Expand Down
6 changes: 4 additions & 2 deletions lib/kafka/fetch_operation.rb
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,16 @@ module Kafka
# operation.execute
#
class FetchOperation
def initialize(cluster:, logger:, min_bytes: 1, max_wait_time: 5)
def initialize(cluster:, logger:, min_bytes: 1, max_bytes: 10485760, max_wait_time: 5)
@cluster = cluster
@logger = logger
@min_bytes = min_bytes
@max_bytes = max_bytes
@max_wait_time = max_wait_time
@topics = {}
end

def fetch_from_partition(topic, partition, offset: :latest, max_bytes: 1048576)
def fetch_from_partition(topic, partition, offset: :latest, max_bytes: 10485760)
if offset == :earliest
offset = -2
elsif offset == :latest
Expand Down Expand Up @@ -66,6 +67,7 @@ def execute
options = {
max_wait_time: @max_wait_time * 1000, # Kafka expects ms, not secs
min_bytes: @min_bytes,
max_bytes: @max_bytes,
topics: topics,
}

Expand Down
6 changes: 4 additions & 2 deletions lib/kafka/protocol/fetch_request.rb
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,11 @@ class FetchRequest
# @param max_wait_time [Integer]
# @param min_bytes [Integer]
# @param topics [Hash]
def initialize(max_wait_time:, min_bytes:, topics:)
def initialize(max_wait_time:, min_bytes:, max_bytes:, topics:)
@replica_id = REPLICA_ID
@max_wait_time = max_wait_time
@min_bytes = min_bytes
@max_bytes = max_bytes
@topics = topics
end

Expand All @@ -31,7 +32,7 @@ def api_key
end

def api_version
2
3
end

def response_class
Expand All @@ -42,6 +43,7 @@ def encode(encoder)
encoder.write_int32(@replica_id)
encoder.write_int32(@max_wait_time)
encoder.write_int32(@min_bytes)
encoder.write_int32(@max_bytes)

encoder.write_array(@topics) do |topic, partitions|
encoder.write_string(topic)
Expand Down
1 change: 1 addition & 0 deletions spec/broker_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ def send_request(request)
actual_response = broker.fetch_messages(
max_wait_time: 0,
min_bytes: 0,
max_bytes: 10 * 1024,
topics: {}
)

Expand Down

0 comments on commit b1a7799

Please sign in to comment.