Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support for offsetdelete #1010

Merged
merged 5 commits into from
Oct 21, 2022
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
106 changes: 106 additions & 0 deletions offsetdelete.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
package kafka

import (
"context"
"fmt"
"net"
"time"

"github.com/segmentio/kafka-go/protocol/offsetdelete"
)

// OffsetDelete deletes the offset for a consumer group on a particular topic
// at a particular partition.
rhansen2 marked this conversation as resolved.
Show resolved Hide resolved
type OffsetDelete struct {
Topic string
Partition int
}

// OffsetDeleteRequest represents a request sent to a kafka broker to delete
// the offsets for a partition on a given topic associated with a consumer group.
type OffsetDeleteRequest struct {
// Address of the kafka broker to send the request to.
Addr net.Addr

// ID of the consumer group to delete the offsets for.
GroupID string

// Set of topic partitions to delete offsets for.
Topics map[string][]int
}

// OffsetDeleteResponse represents a response from a kafka broker to a delete
// offset request.
type OffsetDeleteResponse struct {
// An error that may have occurred while attempting to delete an offset
Error error

// The amount of time that the broker throttled the request.
Throttle time.Duration

// Set of topic partitions that the kafka broker has additional info (error?)
// for.
Topics map[string][]OffsetDeletePartition
}

// OffsetDeletePartition represents the state of a status of a partition in response
// to deleting offsets.
type OffsetDeletePartition struct {
// ID of the partition.
Partition int

// An error that may have occurred while attempting to delete an offset for
// this partition.
Error error
}

// OffsetDelete sends a delete offset request to a kafka broker and returns the
// response.
func (c *Client) OffsetDelete(ctx context.Context, req *OffsetDeleteRequest) (*OffsetDeleteResponse, error) {
topics := make([]offsetdelete.RequestTopic, 0, len(req.Topics))

for topicName, partitionIndexes := range req.Topics {
partitions := make([]offsetdelete.RequestPartition, len(partitionIndexes))

for i, c := range partitionIndexes {
partitions[i] = offsetdelete.RequestPartition{
PartitionIndex: int32(c),
}
}

topics = append(topics, offsetdelete.RequestTopic{
Name: topicName,
Partitions: partitions,
})
}

m, err := c.roundTrip(ctx, req.Addr, &offsetdelete.Request{
GroupID: req.GroupID,
Topics: topics,
})
if err != nil {
return nil, fmt.Errorf("kafka.(*Client).OffsetDelete: %w", err)
}
r := m.(*offsetdelete.Response)

res := &OffsetDeleteResponse{
Error: makeError(r.ErrorCode, ""),
Throttle: makeDuration(r.ThrottleTimeMs),
Topics: make(map[string][]OffsetDeletePartition, len(r.Topics)),
}

for _, topic := range r.Topics {
partitions := make([]OffsetDeletePartition, len(topic.Partitions))

for i, p := range topic.Partitions {
partitions[i] = OffsetDeletePartition{
Partition: int(p.PartitionIndex),
Error: makeError(p.ErrorCode, ""),
}
}

res.Topics[topic.Name] = partitions
}

return res, nil
}
160 changes: 160 additions & 0 deletions offsetdelete_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
package kafka

import (
"context"
"log"
"os"
"strconv"
"testing"
"time"

ktesting "github.com/segmentio/kafka-go/testing"
)

func TestClientDeleteOffset(t *testing.T) {
if !ktesting.KafkaIsAtLeast("2.4.0") {
return
}

topic := makeTopic()
client, shutdown := newLocalClientWithTopic(topic, 3)
defer shutdown()
now := time.Now()

const N = 10 * 3
records := make([]Record, 0, N)
for i := 0; i < N; i++ {
records = append(records, Record{
Time: now,
Value: NewBytes([]byte("test-message-" + strconv.Itoa(i))),
})
}
ctx, cancel := context.WithTimeout(context.Background(), time.Second*30)
defer cancel()
res, err := client.Produce(ctx, &ProduceRequest{
Topic: topic,
RequiredAcks: RequireAll,
Records: NewRecordReader(records...),
})
if err != nil {
t.Fatal(err)
}

if res.Error != nil {
t.Error(res.Error)
}

for index, err := range res.RecordErrors {
t.Fatalf("record at index %d produced an error: %v", index, err)
}
ctx, cancel = context.WithTimeout(context.Background(), time.Second*30)
defer cancel()
groupID := makeGroupID()

group, err := NewConsumerGroup(ConsumerGroupConfig{
ID: groupID,
Topics: []string{topic},
Brokers: []string{"localhost:9092"},
HeartbeatInterval: 2 * time.Second,
RebalanceTimeout: 2 * time.Second,
RetentionTime: time.Hour,
Logger: log.New(os.Stdout, "cg-test: ", 0),
})
if err != nil {
t.Fatal(err)
}

gen, err := group.Next(ctx)
if err != nil {
t.Fatal(err)
}

ocr, err := client.OffsetCommit(ctx, &OffsetCommitRequest{
Addr: nil,
GroupID: groupID,
GenerationID: int(gen.ID),
MemberID: gen.MemberID,
Topics: map[string][]OffsetCommit{
topic: {
{Partition: 0, Offset: 10},
{Partition: 1, Offset: 10},
{Partition: 2, Offset: 10},
},
},
})
if err != nil {
t.Fatal(err)
}

group.Close()

resps := ocr.Topics[topic]
if len(resps) != 3 {
t.Fatalf("expected 3 offsetcommitpartition responses; got %d", len(resps))
}

for _, resp := range resps {
if resp.Error != nil {
t.Fatal(resp.Error)
}
}

ofr, err := client.OffsetFetch(ctx, &OffsetFetchRequest{
GroupID: groupID,
Topics: map[string][]int{topic: {0, 1, 2}},
})
if err != nil {
t.Fatal(err)
}

if ofr.Error != nil {
t.Error(res.Error)
}

fetresps := ofr.Topics[topic]
if len(fetresps) != 3 {
t.Fatalf("expected 3 offsetfetchpartition responses; got %d", len(resps))
}

for _, r := range fetresps {
if r.Error != nil {
t.Fatal(r.Error)
}

if r.CommittedOffset != 10 {
t.Fatalf("expected committed offset to be 10; got: %v for partition: %v", r.CommittedOffset, r.Partition)
}
}

// Remove offsets
odr, err := client.OffsetDelete(ctx, &OffsetDeleteRequest{
GroupID: groupID,
Topics: map[string][]int{topic: {0, 1, 2}},
})
if err != nil {
t.Fatal(err)
}

if odr.Error != nil {
t.Error(odr.Error)
}

// Fetch the offsets again
ofr, err = client.OffsetFetch(ctx, &OffsetFetchRequest{
GroupID: groupID,
Topics: map[string][]int{topic: {0, 1, 2}},
})
if err != nil {
t.Fatal(err)
}

if ofr.Error != nil {
t.Error(res.Error)
}

for _, r := range ofr.Topics[topic] {
if r.CommittedOffset != -1 {
t.Fatalf("expected committed offset to be -1; got: %v for partition: %v", r.CommittedOffset, r.Partition)
}
}
}
47 changes: 47 additions & 0 deletions protocol/offsetdelete/offsetdelete.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package offsetdelete

import "github.com/segmentio/kafka-go/protocol"

func init() {
protocol.Register(&Request{}, &Response{})
}

type Request struct {
GroupID string `kafka:"min=v0,max=v0"`
Topics []RequestTopic `kafka:"min=v0,max=v0"`
}

func (r *Request) ApiKey() protocol.ApiKey { return protocol.OffsetDelete }

func (r *Request) Group() string { return r.GroupID }

type RequestTopic struct {
Name string `kafka:"min=v0,max=v0"`
Partitions []RequestPartition `kafka:"min=v0,max=v0"`
}

type RequestPartition struct {
PartitionIndex int32 `kafka:"min=v0,max=v0"`
}

var (
_ protocol.GroupMessage = (*Request)(nil)
)

type Response struct {
ErrorCode int16 `kafka:"min=v0,max=v0"`
ThrottleTimeMs int32 `kafka:"min=v0,max=v0"`
Topics []ResponseTopic `kafka:"min=v0,max=v0"`
}

func (r *Response) ApiKey() protocol.ApiKey { return protocol.OffsetDelete }

type ResponseTopic struct {
Name string `kafka:"min=v0,max=v0"`
Partitions []ResponsePartition `kafka:"min=v0,max=v0"`
}

type ResponsePartition struct {
PartitionIndex int32 `kafka:"min=v0,max=v0"`
ErrorCode int16 `kafka:"min=v0,max=v0"`
}
29 changes: 29 additions & 0 deletions protocol/offsetdelete/offsetdelete_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package offsetdelete_test

import (
"testing"

"github.com/segmentio/kafka-go/protocol/offsetdelete"
"github.com/segmentio/kafka-go/protocol/prototest"
)

func TestOffsetDeleteequest(t *testing.T) {
rhansen2 marked this conversation as resolved.
Show resolved Hide resolved
rhansen2 marked this conversation as resolved.
Show resolved Hide resolved
for _, version := range []int16{0} {
prototest.TestRequest(t, version, &offsetdelete.Request{
GroupID: "group-0",
Topics: []offsetdelete.RequestTopic{
{
Name: "topic-0",
Partitions: []offsetdelete.RequestPartition{
{
PartitionIndex: 0,
},
{
PartitionIndex: 1,
},
},
},
},
})
}
}