diff --git a/offsetfetch.go b/offsetfetch.go
index 61fcba2e3..b85bc5c83 100644
--- a/offsetfetch.go
+++ b/offsetfetch.go
@@ -66,19 +66,28 @@ type OffsetFetchPartition struct {
 
 // OffsetFetch sends an offset fetch request to a kafka broker and returns the
 // response.
 func (c *Client) OffsetFetch(ctx context.Context, req *OffsetFetchRequest) (*OffsetFetchResponse, error) {
-	topics := make([]offsetfetch.RequestTopic, 0, len(req.Topics))
-	for topicName, partitions := range req.Topics {
-		indexes := make([]int32, len(partitions))
+	// Kafka versions 0.10.2.x and above allow a null Topics map in the OffsetFetch
+	// request, which returns the committed offsets of all topics in the consumer group:
+	// https://kafka.apache.org/0102/protocol.html#The_Messages_OffsetFetch
+	// For Kafka versions below 0.10.2.x this call results in an error.
+	var topics []offsetfetch.RequestTopic
 
-	for i, p := range partitions {
-		indexes[i] = int32(p)
-	}
+	if len(req.Topics) > 0 {
+		topics = make([]offsetfetch.RequestTopic, 0, len(req.Topics))
+
+		for topicName, partitions := range req.Topics {
+			indexes := make([]int32, len(partitions))
 
-	topics = append(topics, offsetfetch.RequestTopic{
-		Name:             topicName,
-		PartitionIndexes: indexes,
-	})
+			for i, p := range partitions {
+				indexes[i] = int32(p)
+			}
+
+			topics = append(topics, offsetfetch.RequestTopic{
+				Name:             topicName,
+				PartitionIndexes: indexes,
+			})
+		}
 	}
 
 	m, err := c.roundTrip(ctx, req.Addr, &offsetfetch.Request{
diff --git a/offsetfetch_test.go b/offsetfetch_test.go
index 6edb7dfaf..7f244700d 100644
--- a/offsetfetch_test.go
+++ b/offsetfetch_test.go
@@ -3,8 +3,12 @@ package kafka
 import (
 	"bufio"
 	"bytes"
+	"context"
 	"reflect"
 	"testing"
+	"time"
+
+	ktesting "github.com/segmentio/kafka-go/testing"
 )
 
 func TestOffsetFetchResponseV1(t *testing.T) {
@@ -43,3 +47,121 @@ func TestOffsetFetchResponseV1(t *testing.T) {
 		t.FailNow()
 	}
 }
+
+func TestOffsetFetchRequestWithNoTopic(t *testing.T) {
+	if !ktesting.KafkaIsAtLeast("0.10.2.0") {
+		t.Logf("Test %s is not applicable for kafka versions below 0.10.2.0", t.Name())
+		t.SkipNow()
+	}
+	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
+	topic1 := makeTopic()
+	defer deleteTopic(t, topic1)
+	topic2 := makeTopic()
+	defer deleteTopic(t, topic2)
+	consumeGroup := makeGroupID()
+	numMsgs := 50
+	defer cancel()
+	r1 := NewReader(ReaderConfig{
+		Brokers:  []string{"localhost:9092"},
+		Topic:    topic1,
+		GroupID:  consumeGroup,
+		MinBytes: 1,
+		MaxBytes: 100,
+		MaxWait:  100 * time.Millisecond,
+	})
+	defer r1.Close()
+	prepareReader(t, ctx, r1, makeTestSequence(numMsgs)...)
+	r2 := NewReader(ReaderConfig{
+		Brokers:  []string{"localhost:9092"},
+		Topic:    topic2,
+		GroupID:  consumeGroup,
+		MinBytes: 1,
+		MaxBytes: 100,
+		MaxWait:  100 * time.Millisecond,
+	})
+	defer r2.Close()
+	prepareReader(t, ctx, r2, makeTestSequence(numMsgs)...)
+
+	for i := 0; i < numMsgs; i++ {
+		if _, err := r1.ReadMessage(ctx); err != nil {
+			t.Fatal(err)
+		}
+	}
+	for i := 0; i < numMsgs; i++ {
+		if _, err := r2.ReadMessage(ctx); err != nil {
+			t.Fatal(err)
+		}
+	}
+
+	client := Client{Addr: TCP("localhost:9092")}
+
+	topicOffsets, err := client.OffsetFetch(ctx, &OffsetFetchRequest{GroupID: consumeGroup})
+
+	if err != nil {
+		t.Error(err)
+		t.FailNow()
+	}
+
+	if len(topicOffsets.Topics) != 2 {
+		t.Errorf("expected offsets for 2 topics, got %d", len(topicOffsets.Topics))
+		t.FailNow()
+	}
+
+}
+
+func TestOffsetFetchRequestWithOneTopic(t *testing.T) {
+	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
+	topic1 := makeTopic()
+	defer deleteTopic(t, topic1)
+	topic2 := makeTopic()
+	defer deleteTopic(t, topic2)
+	consumeGroup := makeGroupID()
+	numMsgs := 50
+	defer cancel()
+	r1 := NewReader(ReaderConfig{
+		Brokers:  []string{"localhost:9092"},
+		Topic:    topic1,
+		GroupID:  consumeGroup,
+		MinBytes: 1,
+		MaxBytes: 100,
+		MaxWait:  100 * time.Millisecond,
+	})
+	defer r1.Close()
+	prepareReader(t, ctx, r1, makeTestSequence(numMsgs)...)
+	r2 := NewReader(ReaderConfig{
+		Brokers:  []string{"localhost:9092"},
+		Topic:    topic2,
+		GroupID:  consumeGroup,
+		MinBytes: 1,
+		MaxBytes: 100,
+		MaxWait:  100 * time.Millisecond,
+	})
+	defer r2.Close()
+	prepareReader(t, ctx, r2, makeTestSequence(numMsgs)...)
+
+	for i := 0; i < numMsgs; i++ {
+		if _, err := r1.ReadMessage(ctx); err != nil {
+			t.Fatal(err)
+		}
+	}
+	for i := 0; i < numMsgs; i++ {
+		if _, err := r2.ReadMessage(ctx); err != nil {
+			t.Fatal(err)
+		}
+	}
+
+	client := Client{Addr: TCP("localhost:9092")}
+	topicOffsets, err := client.OffsetFetch(ctx, &OffsetFetchRequest{GroupID: consumeGroup, Topics: map[string][]int{
+		topic1: {0},
+	}})
+
+	if err != nil {
+		t.Error(err)
+		t.FailNow()
+	}
+
+	if len(topicOffsets.Topics) != 1 {
+		t.Errorf("expected offsets for 1 topic, got %d", len(topicOffsets.Topics))
+		t.FailNow()
+	}
+}
diff --git a/protocol/offsetfetch/offsetfetch.go b/protocol/offsetfetch/offsetfetch.go
index 011003340..8f1096f5d 100644
--- a/protocol/offsetfetch/offsetfetch.go
+++ b/protocol/offsetfetch/offsetfetch.go
@@ -8,7 +8,7 @@ func init() {
 
 type Request struct {
 	GroupID string         `kafka:"min=v0,max=v5"`
-	Topics  []RequestTopic `kafka:"min=v0,max=v5"`
+	Topics  []RequestTopic `kafka:"min=v0,max=v5,nullable"`
 }
 
 func (r *Request) ApiKey() protocol.ApiKey { return protocol.OffsetFetch }
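
Usage sketch (not part of the patch): with the change above, a caller can leave the Topics map nil and receive committed offsets for every topic in a consumer group, provided the broker runs Kafka 0.10.2.0 or newer. The broker address and group ID below are placeholder values, and error handling is kept minimal.

package main

import (
	"context"
	"fmt"
	"log"
	"time"

	"github.com/segmentio/kafka-go"
)

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	client := &kafka.Client{Addr: kafka.TCP("localhost:9092")} // placeholder broker address

	// Leaving Topics nil sends a null topic list, so the broker returns the
	// committed offsets of every topic in the group (Kafka >= 0.10.2.0).
	res, err := client.OffsetFetch(ctx, &kafka.OffsetFetchRequest{
		GroupID: "example-group", // placeholder consumer group ID
	})
	if err != nil {
		log.Fatal(err)
	}

	for topic, partitions := range res.Topics {
		for _, p := range partitions {
			fmt.Printf("topic=%s partition=%d committed=%d\n", topic, p.Partition, p.CommittedOffset)
		}
	}
}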