Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

offsetfetch request topics are now nullable #1162

Merged
merged 13 commits into from
Jul 18, 2023
23 changes: 19 additions & 4 deletions offsetfetch.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,10 +81,25 @@ func (c *Client) OffsetFetch(ctx context.Context, req *OffsetFetchRequest) (*Off
})
}

m, err := c.roundTrip(ctx, req.Addr, &offsetfetch.Request{
GroupID: req.GroupID,
Topics: topics,
})
var offsetFetchReq *offsetfetch.Request
petedannemann marked this conversation as resolved.
Show resolved Hide resolved

if len(req.Topics) < 1 {
amortezaei marked this conversation as resolved.
Show resolved Hide resolved
// Kafka version 0.10.2.x and above allow null Topics map for OffsetFetch API
// which will return the result for all topics with the desired consumer group:
// https://kafka.apache.org/0102/protocol.html#The_Messages_OffsetFetch
// For Kafka version below 0.10.2.x this call will result in an error

offsetFetchReq = &offsetfetch.Request{
GroupID: req.GroupID,
}
} else {
offsetFetchReq = &offsetfetch.Request{
GroupID: req.GroupID,
Topics: topics,
}
}

m, err := c.roundTrip(ctx, req.Addr, offsetFetchReq)

if err != nil {
return nil, fmt.Errorf("kafka.(*Client).OffsetFetch: %w", err)
Expand Down
122 changes: 122 additions & 0 deletions offsetfetch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,12 @@ package kafka
import (
"bufio"
"bytes"
"context"
"reflect"
"testing"
"time"

ktesting "github.com/segmentio/kafka-go/testing"
)

func TestOffsetFetchResponseV1(t *testing.T) {
Expand Down Expand Up @@ -43,3 +47,121 @@ func TestOffsetFetchResponseV1(t *testing.T) {
t.FailNow()
}
}

func TestOffsetFetchRequestWithNoTopic(t *testing.T) {
if !ktesting.KafkaIsAtLeast("0.10.2.0") {
t.Logf("Test %s is not applicable for kafka versions below 0.10.2.0", t.Name())
t.SkipNow()
}
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
topic1 := makeTopic()
defer deleteTopic(t, topic1)
topic2 := makeTopic()
defer deleteTopic(t, topic2)
consumeGroup := makeGroupID()
numMsgs := 50
defer cancel()
r1 := NewReader(ReaderConfig{
Brokers: []string{"localhost:9092"},
Topic: topic1,
GroupID: consumeGroup,
MinBytes: 1,
MaxBytes: 100,
MaxWait: 100 * time.Millisecond,
})
defer r1.Close()
prepareReader(t, ctx, r1, makeTestSequence(numMsgs)...)
r2 := NewReader(ReaderConfig{
Brokers: []string{"localhost:9092"},
Topic: topic2,
GroupID: consumeGroup,
MinBytes: 1,
MaxBytes: 100,
MaxWait: 100 * time.Millisecond,
})
defer r2.Close()
prepareReader(t, ctx, r2, makeTestSequence(numMsgs)...)

for i := 0; i < numMsgs; i++ {
if _, err := r1.ReadMessage(ctx); err != nil {
t.Fatal(err)
}
}
for i := 0; i < numMsgs; i++ {
if _, err := r2.ReadMessage(ctx); err != nil {
t.Fatal(err)
}
}

client := Client{Addr: TCP("localhost:9092")}

topicOffsets, err := client.OffsetFetch(ctx, &OffsetFetchRequest{GroupID: consumeGroup})

if err != nil {
t.Error(err)
t.FailNow()
}

if len(topicOffsets.Topics) != 2 {
t.Error(err)
t.FailNow()
}

}

func TestOffsetFetchRequestWithOneTopic(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
topic1 := makeTopic()
defer deleteTopic(t, topic1)
topic2 := makeTopic()
defer deleteTopic(t, topic2)
consumeGroup := makeGroupID()
numMsgs := 50
defer cancel()
r1 := NewReader(ReaderConfig{
Brokers: []string{"localhost:9092"},
Topic: topic1,
GroupID: consumeGroup,
MinBytes: 1,
MaxBytes: 100,
MaxWait: 100 * time.Millisecond,
})
defer r1.Close()
prepareReader(t, ctx, r1, makeTestSequence(numMsgs)...)
r2 := NewReader(ReaderConfig{
Brokers: []string{"localhost:9092"},
Topic: topic2,
GroupID: consumeGroup,
MinBytes: 1,
MaxBytes: 100,
MaxWait: 100 * time.Millisecond,
})
defer r2.Close()
prepareReader(t, ctx, r2, makeTestSequence(numMsgs)...)

for i := 0; i < numMsgs; i++ {
if _, err := r1.ReadMessage(ctx); err != nil {
t.Fatal(err)
}
}
for i := 0; i < numMsgs; i++ {
if _, err := r2.ReadMessage(ctx); err != nil {
t.Fatal(err)
}
}

client := Client{Addr: TCP("localhost:9092")}
topicOffsets, err := client.OffsetFetch(ctx, &OffsetFetchRequest{GroupID: consumeGroup, Topics: map[string][]int{
topic1: {0},
}})

if err != nil {
t.Error(err)
t.FailNow()
}

if len(topicOffsets.Topics) != 1 {
t.Error(err)
t.FailNow()
}
}
2 changes: 1 addition & 1 deletion protocol/offsetfetch/offsetfetch.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ func init() {

type Request struct {
GroupID string `kafka:"min=v0,max=v5"`
Topics []RequestTopic `kafka:"min=v0,max=v5"`
Topics []RequestTopic `kafka:"min=v0,max=v5,nullable"`
}

func (r *Request) ApiKey() protocol.ApiKey { return protocol.OffsetFetch }
Expand Down