diff --git a/offsetdelete.go b/offsetdelete.go
new file mode 100644
index 000000000..ea526eb25
--- /dev/null
+++ b/offsetdelete.go
@@ -0,0 +1,106 @@
+package kafka
+
+import (
+	"context"
+	"fmt"
+	"net"
+	"time"
+
+	"github.com/segmentio/kafka-go/protocol/offsetdelete"
+)
+
+// OffsetDelete deletes the offset for a consumer group on a particular topic
+// for a particular partition.
+type OffsetDelete struct {
+	Topic     string
+	Partition int
+}
+
+// OffsetDeleteRequest represents a request sent to a kafka broker to delete
+// the offsets for a partition on a given topic associated with a consumer group.
+type OffsetDeleteRequest struct {
+	// Address of the kafka broker to send the request to.
+	Addr net.Addr
+
+	// ID of the consumer group to delete the offsets for.
+	GroupID string
+
+	// Set of topic partitions to delete offsets for.
+	Topics map[string][]int
+}
+
+// OffsetDeleteResponse represents a response from a kafka broker to a delete
+// offset request.
+type OffsetDeleteResponse struct {
+	// An error that may have occurred while attempting to delete an offset
+	Error error
+
+	// The amount of time that the broker throttled the request.
+	Throttle time.Duration
+
+	// Set of topic partitions that the kafka broker has additional info (error?)
+	// for.
+	Topics map[string][]OffsetDeletePartition
+}
+
+// OffsetDeletePartition represents the state of a status of a partition in response
+// to deleting offsets.
+type OffsetDeletePartition struct {
+	// ID of the partition.
+	Partition int
+
+	// An error that may have occurred while attempting to delete an offset for
+	// this partition.
+	Error error
+}
+
+// OffsetDelete sends a delete offset request to a kafka broker and returns the
+// response.
+func (c *Client) OffsetDelete(ctx context.Context, req *OffsetDeleteRequest) (*OffsetDeleteResponse, error) {
+	topics := make([]offsetdelete.RequestTopic, 0, len(req.Topics))
+
+	for topicName, partitionIndexes := range req.Topics {
+		partitions := make([]offsetdelete.RequestPartition, len(partitionIndexes))
+
+		// Note: the loop variable is named p (not c) to avoid shadowing the
+		// *Client receiver.
+		for i, p := range partitionIndexes {
+			partitions[i] = offsetdelete.RequestPartition{
+				PartitionIndex: int32(p),
+			}
+		}
+
+		topics = append(topics, offsetdelete.RequestTopic{
+			Name:       topicName,
+			Partitions: partitions,
+		})
+	}
+
+	m, err := c.roundTrip(ctx, req.Addr, &offsetdelete.Request{
+		GroupID: req.GroupID,
+		Topics:  topics,
+	})
+	if err != nil {
+		return nil, fmt.Errorf("kafka.(*Client).OffsetDelete: %w", err)
+	}
+	r := m.(*offsetdelete.Response)
+
+	res := &OffsetDeleteResponse{
+		Error:    makeError(r.ErrorCode, ""),
+		Throttle: makeDuration(r.ThrottleTimeMs),
+		Topics:   make(map[string][]OffsetDeletePartition, len(r.Topics)),
+	}
+
+	for _, topic := range r.Topics {
+		partitions := make([]OffsetDeletePartition, len(topic.Partitions))
+
+		for i, p := range topic.Partitions {
+			partitions[i] = OffsetDeletePartition{
+				Partition: int(p.PartitionIndex),
+				Error:     makeError(p.ErrorCode, ""),
+			}
+		}
+
+		res.Topics[topic.Name] = partitions
+	}
+
+	return res, nil
+}
diff --git a/offsetdelete_test.go b/offsetdelete_test.go
new file mode 100644
index 000000000..e66b38fd3
--- /dev/null
+++ b/offsetdelete_test.go
@@ -0,0 +1,160 @@
+package kafka
+
+import (
+	"context"
+	"log"
+	"os"
+	"strconv"
+	"testing"
+	"time"
+
+	ktesting "github.com/segmentio/kafka-go/testing"
+)
+
+func TestClientDeleteOffset(t *testing.T) {
+	if !ktesting.KafkaIsAtLeast("2.4.0") {
+		return
+	}
+
+	topic := makeTopic()
+	client, shutdown := newLocalClientWithTopic(topic, 3)
+	defer shutdown()
+	now := time.Now()
+
+	const N = 10 * 3
+	records := make([]Record, 0, N)
+	for i := 0; i < N; i++ {
+		records = append(records, Record{
+			Time:  now,
+			Value: NewBytes([]byte("test-message-" + strconv.Itoa(i))),
+		})
+	}
+	ctx, cancel := context.WithTimeout(context.Background(), time.Second*30)
+	defer cancel()
+	res, err := client.Produce(ctx, &ProduceRequest{
+		Topic:        topic,
+		RequiredAcks: RequireAll,
+		Records:      NewRecordReader(records...),
+	})
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	if res.Error != nil {
+		t.Error(res.Error)
+	}
+
+	for index, err := range res.RecordErrors {
+		t.Fatalf("record at index %d produced an error: %v", index, err)
+	}
+	ctx, cancel = context.WithTimeout(context.Background(), time.Second*30)
+	defer cancel()
+	groupID := makeGroupID()
+
+	group, err := NewConsumerGroup(ConsumerGroupConfig{
+		ID:                groupID,
+		Topics:            []string{topic},
+		Brokers:           []string{"localhost:9092"},
+		HeartbeatInterval: 2 * time.Second,
+		RebalanceTimeout:  2 * time.Second,
+		RetentionTime:     time.Hour,
+		Logger:            log.New(os.Stdout, "cg-test: ", 0),
+	})
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	gen, err := group.Next(ctx)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	ocr, err := client.OffsetCommit(ctx, &OffsetCommitRequest{
+		Addr:         nil,
+		GroupID:      groupID,
+		GenerationID: int(gen.ID),
+		MemberID:     gen.MemberID,
+		Topics: map[string][]OffsetCommit{
+			topic: {
+				{Partition: 0, Offset: 10},
+				{Partition: 1, Offset: 10},
+				{Partition: 2, Offset: 10},
+			},
+		},
+	})
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	group.Close()
+
+	resps := ocr.Topics[topic]
+	if len(resps) != 3 {
+		t.Fatalf("expected 3 offsetcommitpartition responses; got %d", len(resps))
+	}
+
+	for _, resp := range resps {
+		if resp.Error != nil {
+			t.Fatal(resp.Error)
+		}
+	}
+
+	ofr, err := client.OffsetFetch(ctx, &OffsetFetchRequest{
+		GroupID: groupID,
+		Topics:  map[string][]int{topic: {0, 1, 2}},
+	})
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	if ofr.Error != nil {
+		t.Error(ofr.Error)
+	}
+
+	fetresps := ofr.Topics[topic]
+	if len(fetresps) != 3 {
+		t.Fatalf("expected 3 offsetfetchpartition responses; got %d", len(fetresps))
+	}
+
+	for _, r := range fetresps {
+		if r.Error != nil {
+			t.Fatal(r.Error)
+		}
+
+		if r.CommittedOffset != 10 {
+			t.Fatalf("expected committed offset to be 10; got: %v for partition: %v", r.CommittedOffset, r.Partition)
+		}
+	}
+
+	// Remove offsets
+	odr, err := client.OffsetDelete(ctx, &OffsetDeleteRequest{
+		GroupID: groupID,
+		Topics:  map[string][]int{topic: {0, 1, 2}},
+	})
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	if odr.Error != nil {
+		t.Error(odr.Error)
+	}
+
+	// Fetch the offsets again
+	ofr, err = client.OffsetFetch(ctx, &OffsetFetchRequest{
+		GroupID: groupID,
+		Topics:  map[string][]int{topic: {0, 1, 2}},
+	})
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	if ofr.Error != nil {
+		t.Error(ofr.Error)
+	}
+
+	for _, r := range ofr.Topics[topic] {
+		if r.CommittedOffset != -1 {
+			t.Fatalf("expected committed offset to be -1; got: %v for partition: %v", r.CommittedOffset, r.Partition)
+		}
+	}
+}
diff --git a/protocol/offsetdelete/offsetdelete.go b/protocol/offsetdelete/offsetdelete.go
new file mode 100644
index 000000000..bda619f3c
--- /dev/null
+++ b/protocol/offsetdelete/offsetdelete.go
@@ -0,0 +1,47 @@
+package offsetdelete
+
+import "github.com/segmentio/kafka-go/protocol"
+
+func init() {
+	protocol.Register(&Request{}, &Response{})
+}
+
+type Request struct {
+	GroupID string         `kafka:"min=v0,max=v0"`
+	Topics  []RequestTopic `kafka:"min=v0,max=v0"`
+}
+
+func (r *Request) ApiKey() protocol.ApiKey { return protocol.OffsetDelete }
+
+func (r *Request) Group() string { return r.GroupID }
+
+type RequestTopic struct {
+	Name       string             `kafka:"min=v0,max=v0"`
+	Partitions []RequestPartition `kafka:"min=v0,max=v0"`
+}
+
+type RequestPartition struct {
+	PartitionIndex int32 `kafka:"min=v0,max=v0"`
+}
+
+var (
+	_ protocol.GroupMessage = (*Request)(nil)
+)
+
+type Response struct {
+	ErrorCode      int16           `kafka:"min=v0,max=v0"`
+	ThrottleTimeMs int32           `kafka:"min=v0,max=v0"`
+	Topics         []ResponseTopic `kafka:"min=v0,max=v0"`
+}
+
+func (r *Response) ApiKey() protocol.ApiKey { return protocol.OffsetDelete }
+
+type ResponseTopic struct {
+	Name       string              `kafka:"min=v0,max=v0"`
+	Partitions []ResponsePartition `kafka:"min=v0,max=v0"`
+}
+
+type ResponsePartition struct {
+	PartitionIndex int32 `kafka:"min=v0,max=v0"`
+	ErrorCode      int16 `kafka:"min=v0,max=v0"`
+}
diff --git a/protocol/offsetdelete/offsetdelete_test.go b/protocol/offsetdelete/offsetdelete_test.go
new file mode 100644
index 000000000..ed8d7ea94
--- /dev/null
+++ b/protocol/offsetdelete/offsetdelete_test.go
@@ -0,0 +1,52 @@
+package offsetdelete_test
+
+import (
+	"testing"
+
+	"github.com/segmentio/kafka-go/protocol/offsetdelete"
+	"github.com/segmentio/kafka-go/protocol/prototest"
+)
+
+func TestOffsetDeleteRequest(t *testing.T) {
+	for _, version := range []int16{0} {
+		prototest.TestRequest(t, version, &offsetdelete.Request{
+			GroupID: "group-0",
+			Topics: []offsetdelete.RequestTopic{
+				{
+					Name: "topic-0",
+					Partitions: []offsetdelete.RequestPartition{
+						{
+							PartitionIndex: 0,
+						},
+						{
+							PartitionIndex: 1,
+						},
+					},
+				},
+			},
+		})
+	}
+}
+
+func TestOffsetDeleteResponse(t *testing.T) {
+	for _, version := range []int16{0} {
+		prototest.TestResponse(t, version, &offsetdelete.Response{
+			ErrorCode: 0,
+			Topics: []offsetdelete.ResponseTopic{
+				{
+					Name: "topic-0",
+					Partitions: []offsetdelete.ResponsePartition{
+						{
+							PartitionIndex: 0,
+							ErrorCode:      1,
+						},
+						{
+							PartitionIndex: 1,
+							ErrorCode:      1,
+						},
+					},
+				},
+			},
+		})
+	}
+}