Skip to content

Commit

Permalink
offsetfetch request topics are now nullable (#1162)
Browse files Browse the repository at this point in the history
* offsetfetch request topics are now nullable

* new unit tests for offsetfetch was added
  • Loading branch information
amortezaei authored Jul 18, 2023
1 parent c293a8c commit 8ceaf94
Show file tree
Hide file tree
Showing 3 changed files with 142 additions and 11 deletions.
29 changes: 19 additions & 10 deletions offsetfetch.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,19 +66,28 @@ type OffsetFetchPartition struct {
// OffsetFetch sends an offset fetch request to a kafka broker and returns the
// response.
func (c *Client) OffsetFetch(ctx context.Context, req *OffsetFetchRequest) (*OffsetFetchResponse, error) {
topics := make([]offsetfetch.RequestTopic, 0, len(req.Topics))

for topicName, partitions := range req.Topics {
indexes := make([]int32, len(partitions))
// Kafka version 0.10.2.x and above allow null Topics map for OffsetFetch API
// which will return the result for all topics with the desired consumer group:
// https://kafka.apache.org/0102/protocol.html#The_Messages_OffsetFetch
// For Kafka version below 0.10.2.x this call will result in an error
var topics []offsetfetch.RequestTopic

for i, p := range partitions {
indexes[i] = int32(p)
}
if len(req.Topics) > 0 {
topics = make([]offsetfetch.RequestTopic, 0, len(req.Topics))

for topicName, partitions := range req.Topics {
indexes := make([]int32, len(partitions))

topics = append(topics, offsetfetch.RequestTopic{
Name: topicName,
PartitionIndexes: indexes,
})
for i, p := range partitions {
indexes[i] = int32(p)
}

topics = append(topics, offsetfetch.RequestTopic{
Name: topicName,
PartitionIndexes: indexes,
})
}
}

m, err := c.roundTrip(ctx, req.Addr, &offsetfetch.Request{
Expand Down
122 changes: 122 additions & 0 deletions offsetfetch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,12 @@ package kafka
import (
"bufio"
"bytes"
"context"
"reflect"
"testing"
"time"

ktesting "github.com/segmentio/kafka-go/testing"
)

func TestOffsetFetchResponseV1(t *testing.T) {
Expand Down Expand Up @@ -43,3 +47,121 @@ func TestOffsetFetchResponseV1(t *testing.T) {
t.FailNow()
}
}

func TestOffsetFetchRequestWithNoTopic(t *testing.T) {
if !ktesting.KafkaIsAtLeast("0.10.2.0") {
t.Logf("Test %s is not applicable for kafka versions below 0.10.2.0", t.Name())
t.SkipNow()
}
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
topic1 := makeTopic()
defer deleteTopic(t, topic1)
topic2 := makeTopic()
defer deleteTopic(t, topic2)
consumeGroup := makeGroupID()
numMsgs := 50
defer cancel()
r1 := NewReader(ReaderConfig{
Brokers: []string{"localhost:9092"},
Topic: topic1,
GroupID: consumeGroup,
MinBytes: 1,
MaxBytes: 100,
MaxWait: 100 * time.Millisecond,
})
defer r1.Close()
prepareReader(t, ctx, r1, makeTestSequence(numMsgs)...)
r2 := NewReader(ReaderConfig{
Brokers: []string{"localhost:9092"},
Topic: topic2,
GroupID: consumeGroup,
MinBytes: 1,
MaxBytes: 100,
MaxWait: 100 * time.Millisecond,
})
defer r2.Close()
prepareReader(t, ctx, r2, makeTestSequence(numMsgs)...)

for i := 0; i < numMsgs; i++ {
if _, err := r1.ReadMessage(ctx); err != nil {
t.Fatal(err)
}
}
for i := 0; i < numMsgs; i++ {
if _, err := r2.ReadMessage(ctx); err != nil {
t.Fatal(err)
}
}

client := Client{Addr: TCP("localhost:9092")}

topicOffsets, err := client.OffsetFetch(ctx, &OffsetFetchRequest{GroupID: consumeGroup})

if err != nil {
t.Error(err)
t.FailNow()
}

if len(topicOffsets.Topics) != 2 {
t.Error(err)
t.FailNow()
}

}

func TestOffsetFetchRequestWithOneTopic(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
topic1 := makeTopic()
defer deleteTopic(t, topic1)
topic2 := makeTopic()
defer deleteTopic(t, topic2)
consumeGroup := makeGroupID()
numMsgs := 50
defer cancel()
r1 := NewReader(ReaderConfig{
Brokers: []string{"localhost:9092"},
Topic: topic1,
GroupID: consumeGroup,
MinBytes: 1,
MaxBytes: 100,
MaxWait: 100 * time.Millisecond,
})
defer r1.Close()
prepareReader(t, ctx, r1, makeTestSequence(numMsgs)...)
r2 := NewReader(ReaderConfig{
Brokers: []string{"localhost:9092"},
Topic: topic2,
GroupID: consumeGroup,
MinBytes: 1,
MaxBytes: 100,
MaxWait: 100 * time.Millisecond,
})
defer r2.Close()
prepareReader(t, ctx, r2, makeTestSequence(numMsgs)...)

for i := 0; i < numMsgs; i++ {
if _, err := r1.ReadMessage(ctx); err != nil {
t.Fatal(err)
}
}
for i := 0; i < numMsgs; i++ {
if _, err := r2.ReadMessage(ctx); err != nil {
t.Fatal(err)
}
}

client := Client{Addr: TCP("localhost:9092")}
topicOffsets, err := client.OffsetFetch(ctx, &OffsetFetchRequest{GroupID: consumeGroup, Topics: map[string][]int{
topic1: {0},
}})

if err != nil {
t.Error(err)
t.FailNow()
}

if len(topicOffsets.Topics) != 1 {
t.Error(err)
t.FailNow()
}
}
2 changes: 1 addition & 1 deletion protocol/offsetfetch/offsetfetch.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ func init() {

type Request struct {
GroupID string `kafka:"min=v0,max=v5"`
Topics []RequestTopic `kafka:"min=v0,max=v5"`
Topics []RequestTopic `kafka:"min=v0,max=v5,nullable"`
}

func (r *Request) ApiKey() protocol.ApiKey { return protocol.OffsetFetch }
Expand Down

0 comments on commit 8ceaf94

Please sign in to comment.