Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

offsetfetch request topics are now nullable #1162

Merged
merged 13 commits into from
Jul 18, 2023

Conversation

amortezaei
Copy link
Contributor

@amortezaei amortezaei commented Jul 12, 2023

Issue

#1115

Fix

made the "Topics" filed of offsetfetch's Request (https://github.com/segmentio/kafka-go/blob/main/protocol/offsetfetch/offsetfetch.go#L11) nullable in order to make it compliant with Kafka API specs

Testing

Setup:

P1 ------> [T1] ------> {CG1} -------> C1
P12------> [T2] ------> {CG1} -------> C2

offset-test-setup

Test 1 driver


import (
	"context"
	"fmt"

	"github.com/segmentio/kafka-go"
)

func main() {
	fmt.Printf("START\n")
	ctx := context.Background()
	client := kafka.Client{Addr: kafka.TCP("localhost:9092")}
	topicOffsets, err := client.OffsetFetch(ctx, &kafka.OffsetFetchRequest{GroupID: "cg1"})
	if err != nil {
		fmt.Printf("ERROR: %s\n", err.Error())
	}
	fmt.Printf("number of topics %d\n", len(topicOffsets.Topics))
	for topic, offsets := range topicOffsets.Topics {
		fmt.Printf("###########################\n")
		fmt.Printf("offset for topic %s\n", topic)
		for _, offset := range offsets {
			fmt.Printf("\t###########################\n")
			fmt.Printf("\toffset.Partition=%d\n", offset.Partition)
			fmt.Printf("\toffset.CommittedOffset=%d\n", offset.CommittedOffset)
			fmt.Printf("\toffset.Metadata=%s\n", offset.Metadata)
		}
	}

}

output:

START
number of topics 2
###########################
offset for topic topic-secondary
	###########################
	offset.Partition=5
	offset.CommittedOffset=1349
	offset.Metadata=
	###########################
	offset.Partition=2
	offset.CommittedOffset=912
	offset.Metadata=
	###########################
	offset.Partition=4
	offset.CommittedOffset=228
	offset.Metadata=
	###########################
	offset.Partition=1
	offset.CommittedOffset=685
	offset.Metadata=
	###########################
	offset.Partition=0
	offset.CommittedOffset=687
	offset.Metadata=
	###########################
	offset.Partition=3
	offset.CommittedOffset=686
	offset.Metadata=
###########################
offset for topic topic-primary
	###########################
	offset.Partition=2
	offset.CommittedOffset=525
	offset.Metadata=
	###########################
	offset.Partition=5
	offset.CommittedOffset=1143
	offset.Metadata=
	###########################
	offset.Partition=3
	offset.CommittedOffset=684
	offset.Metadata=
	###########################
	offset.Partition=4
	offset.CommittedOffset=457
	offset.Metadata=
	###########################
	offset.Partition=1
	offset.CommittedOffset=913
	offset.Metadata=
	###########################
	offset.Partition=0
	offset.CommittedOffset=1502
	offset.Metadata=

Test 2 driver

package main

import (
	"context"
	"fmt"

	"github.com/segmentio/kafka-go"
)

func main() {
	fmt.Printf("START\n")
	ctx := context.Background()
	client := kafka.Client{Addr: kafka.TCP("localhost:9092")}
	topics := map[string][]int{
		"topic-primary": {1, 2, 3},
	}
	topicOffsets, err := client.OffsetFetch(ctx, &kafka.OffsetFetchRequest{GroupID: "cg1", Topics: topics})
	if err != nil {
		fmt.Printf("ERROR: %s\n", err.Error())
	}
	fmt.Printf("number of topics %d\n", len(topicOffsets.Topics))
	for topic, offsets := range topicOffsets.Topics {
		fmt.Printf("###########################\n")
		fmt.Printf("offset for topic %s\n", topic)
		for _, offset := range offsets {
			fmt.Printf("\t###########################\n")
			fmt.Printf("\toffset.Partition=%d\n", offset.Partition)
			fmt.Printf("\toffset.CommittedOffset=%d\n", offset.CommittedOffset)
			fmt.Printf("\toffset.Metadata=%s\n", offset.Metadata)
		}
	}

}

output:

START
number of topics 1
###########################
offset for topic topic-primary
	###########################
	offset.Partition=1
	offset.CommittedOffset=1360
	offset.Metadata=
	###########################
	offset.Partition=2
	offset.CommittedOffset=525
	offset.Metadata=
	###########################
	offset.Partition=3
	offset.CommittedOffset=912
	offset.Metadata=

offsetfetch.go Outdated Show resolved Hide resolved
@amortezaei amortezaei requested a review from hhahn-tw July 17, 2023 17:24
offsetfetch.go Outdated Show resolved Hide resolved
@amortezaei amortezaei merged commit 8ceaf94 into main Jul 18, 2023
@amortezaei amortezaei deleted the DP-1586-fetch-offset-no-topic branch July 18, 2023 00:28
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants