Skip to content

Commit

Permalink
Added support for FetchRequest protocol version 3.
Browse files Browse the repository at this point in the history
This commit will add the 'MaxBytes' field to FetchRequests when the Kafka version is
0.10.1 or better. On send, it will be set to MaxResponseSize. No tests, yet.
  • Loading branch information
Michael Herstine committed Jun 15, 2017
1 parent ac03dfa commit 4d1eceb
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 0 deletions.
4 changes: 4 additions & 0 deletions consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -726,6 +726,10 @@ func (bc *brokerConsumer) fetchNewMessages() (*FetchResponse, error) {
if bc.consumer.conf.Version.IsAtLeast(V0_10_0_0) {
request.Version = 2
}
if bc.consumer.conf.Version.IsAtLeast(V0_10_1_0) {
request.Version = 3
request.MaxBytes = MaxResponseSize - 47 // TODO(mherstin): WTF?!
}

for child := range bc.subscriptions {
request.AddBlock(child.topic, child.partition, child.offset, child.fetchSize)
Expand Down
12 changes: 12 additions & 0 deletions fetch_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,13 @@ func (b *fetchRequestBlock) decode(pd packetDecoder) (err error) {
return nil
}

// FetchRequest (API key 1) will fetch Kafka messages. Version 3 introduced the MaxBytes field. See
// https://issues.apache.org/jira/browse/KAFKA-2063 for a discussion of the issues leading up to that. The KIP is at
// https://cwiki.apache.org/confluence/display/KAFKA/KIP-74%3A+Add+Fetch+Response+Size+Limit+in+Bytes
type FetchRequest struct {
MaxWaitTime int32
MinBytes int32
MaxBytes int32
Version int16
blocks map[string]map[int32]*fetchRequestBlock
}
Expand All @@ -32,6 +36,9 @@ func (r *FetchRequest) encode(pe packetEncoder) (err error) {
pe.putInt32(-1) // replica ID is always -1 for clients
pe.putInt32(r.MaxWaitTime)
pe.putInt32(r.MinBytes)
if 3 == r.Version {
pe.putInt32(r.MaxBytes)
}
err = pe.putArrayLength(len(r.blocks))
if err != nil {
return err
Expand Down Expand Up @@ -67,6 +74,11 @@ func (r *FetchRequest) decode(pd packetDecoder, version int16) (err error) {
if r.MinBytes, err = pd.getInt32(); err != nil {
return err
}
if r.Version == 3 {
if r.MaxBytes, err = pd.getInt32(); err != nil {
return err
}
}
topicCount, err := pd.getArrayLength()
if err != nil {
return err
Expand Down

0 comments on commit 4d1eceb

Please sign in to comment.