From b61507f8345e89989d1f521155266db567b14590 Mon Sep 17 00:00:00 2001 From: rhansen2 Date: Sun, 12 Sep 2021 19:18:54 -0700 Subject: [PATCH] Add AddPartitionsToTxn and EndTxn functions to client. --- addpartitionstotxn.go | 108 +++++++++++++++ addpartitionstotxn_test.go | 126 ++++++++++++++++++ client_test.go | 11 ++ conn_test.go | 18 +-- endtxn.go | 61 +++++++++ initproducerid_test.go | 6 +- .../addpartitionstotxn/addpartitionstotxn.go | 58 ++++++++ .../addpartitionstotxn_test.go | 60 +++++++++ protocol/endtxn/endtxn.go | 31 +++++ protocol/endtxn/endtxn_test.go | 28 ++++ 10 files changed, 497 insertions(+), 10 deletions(-) create mode 100644 addpartitionstotxn.go create mode 100644 addpartitionstotxn_test.go create mode 100644 endtxn.go create mode 100644 protocol/addpartitionstotxn/addpartitionstotxn.go create mode 100644 protocol/addpartitionstotxn/addpartitionstotxn_test.go create mode 100644 protocol/endtxn/endtxn.go create mode 100644 protocol/endtxn/endtxn_test.go diff --git a/addpartitionstotxn.go b/addpartitionstotxn.go new file mode 100644 index 000000000..5931aa0c0 --- /dev/null +++ b/addpartitionstotxn.go @@ -0,0 +1,108 @@ +package kafka + +import ( + "context" + "fmt" + "net" + "time" + + "github.com/segmentio/kafka-go/protocol/addpartitionstotxn" +) + +// AddPartitionToTxn represents a partition to be added +// to a transaction. +type AddPartitionToTxn struct { + // Partition is the ID of a partition to add to the transaction. + Partition int +} + +// AddPartitionsToTxnRequest is the request structure fo the AddPartitionsToTxn function. +type AddPartitionsToTxnRequest struct { + // Address of the kafka broker to send the request to. + Addr net.Addr + + // The transactional id key + TransactionalID string + + // The Producer ID (PID) for the current producer session; + // received from an InitProducerID request. + ProducerID int + + // The epoch associated with the current producer session for the given PID + ProducerEpoch int + + // Mappings of topic names to lists of partitions. + Topics map[string][]AddPartitionToTxn +} + +// AddPartitioinsToTxnResponse is the response structure for the AddPartitioinsToTxn function. +type AddPartitioinsToTxnResponse struct { + // The amount of time that the broker throttled the request. + Throttle time.Duration + + // Mappings of topic names to partitions being added to a transactions. + Topics map[string][]AddPartitionToTxnPartition +} + +// AddPartitionToTxnPartition represents the state of a single partition +// in response to adding to a transaction. +type AddPartitionToTxnPartition struct { + // The ID of the partition. + Partition int + + // An error that may have occured when attempting to add the partition + // to a transaction. + // + // The errors contain the kafka error code. Programs may use the standard + // errors.Is function to test the error against kafka error codes. + Error error +} + +// AddPartitionsToTnx sends an add partitions to txn request to a kafka broker and returns the response. +func (c *Client) AddPartitionsToTxn( + ctx context.Context, + req *AddPartitionsToTxnRequest, +) (*AddPartitioinsToTxnResponse, error) { + protoReq := &addpartitionstotxn.Request{ + TransactionalID: req.TransactionalID, + ProducerID: int64(req.ProducerID), + ProducerEpoch: int16(req.ProducerEpoch), + } + protoReq.Topics = make([]addpartitionstotxn.RequestTopic, 0, len(req.Topics)) + + for topic, partitions := range req.Topics { + reqTopic := addpartitionstotxn.RequestTopic{ + Name: topic, + Partitions: make([]int32, len(partitions)), + } + for i, partition := range partitions { + reqTopic.Partitions[i] = int32(partition.Partition) + } + protoReq.Topics = append(protoReq.Topics, reqTopic) + } + + m, err := c.roundTrip(ctx, req.Addr, protoReq) + if err != nil { + return nil, fmt.Errorf("kafka.(*Client).AddPartitionsToTxn: %w", err) + } + + r := m.(*addpartitionstotxn.Response) + + res := &AddPartitioinsToTxnResponse{ + Throttle: makeDuration(r.ThrottleTimeMs), + Topics: make(map[string][]AddPartitionToTxnPartition, len(r.Results)), + } + + for _, result := range r.Results { + partitions := make([]AddPartitionToTxnPartition, 0, len(result.Results)) + for _, rp := range result.Results { + partitions = append(partitions, AddPartitionToTxnPartition{ + Partition: int(rp.PartitionIndex), + Error: makeError(rp.ErrorCode, ""), + }) + } + res.Topics[result.Name] = partitions + } + + return res, nil +} diff --git a/addpartitionstotxn_test.go b/addpartitionstotxn_test.go new file mode 100644 index 000000000..ba1d7fbc0 --- /dev/null +++ b/addpartitionstotxn_test.go @@ -0,0 +1,126 @@ +package kafka + +import ( + "context" + "net" + "strconv" + "testing" + "time" + + ktesting "github.com/segmentio/kafka-go/testing" +) + +func TestClientAddPartitionsToTxn(t *testing.T) { + if !ktesting.KafkaIsAtLeast("0.11.0") { + t.Skip("Skipping test because kafka version is not high enough.") + } + topic1 := makeTopic() + topic2 := makeTopic() + + client, shutdown := newLocalClient() + defer shutdown() + + err := clientCreateTopic(client, topic1, 3) + if err != nil { + t.Fatal(err) + } + + err = clientCreateTopic(client, topic2, 3) + if err != nil { + t.Fatal(err) + } + + transactionalID := makeTransactionalID() + + ctx, cancel := context.WithTimeout(context.Background(), time.Second*30) + defer cancel() + respc, err := waitForCoordinatorIndefinitely(ctx, client, &FindCoordinatorRequest{ + Addr: client.Addr, + Key: transactionalID, + KeyType: CoordinatorKeyTypeTransaction, + }) + if err != nil { + t.Fatal(err) + } + + transactionCoordinator := TCP(net.JoinHostPort(respc.Coordinator.Host, strconv.Itoa(int(respc.Coordinator.Port)))) + client, shutdown = newClient(transactionCoordinator) + defer shutdown() + + ipResp, err := client.InitProducerID(ctx, &InitProducerIDRequest{ + TransactionalID: transactionalID, + TransactionTimeoutMs: 10000, + }) + if err != nil { + t.Fatal(err) + } + + if ipResp.Error != nil { + t.Fatal(ipResp.Error) + } + + defer func() { + err := clientEndTxn(client, &EndTxnRequest{ + TransactionalID: transactionalID, + ProducerID: ipResp.Producer.ProducerID, + ProducerEpoch: ipResp.Producer.ProducerID, + Committed: false, + }) + if err != nil { + t.Fatal(err) + } + }() + + ctx, cancel = context.WithTimeout(context.Background(), time.Second*30) + defer cancel() + resp, err := client.AddPartitionsToTxn(ctx, &AddPartitionsToTxnRequest{ + TransactionalID: transactionalID, + ProducerID: ipResp.Producer.ProducerID, + ProducerEpoch: ipResp.Producer.ProducerEpoch, + Topics: map[string][]AddPartitionToTxn{ + topic1: { + { + Partition: 0, + }, + { + Partition: 1, + }, + { + Partition: 2, + }, + }, + topic2: { + { + Partition: 0, + }, + { + Partition: 2, + }, + }, + }, + }) + if err != nil { + t.Fatal(err) + } + + if len(resp.Topics) != 2 { + t.Errorf("expected responses for 2 topics; got: %d", len(resp.Topics)) + } + for topic, partitions := range resp.Topics { + if topic == topic1 { + if len(partitions) != 3 { + t.Errorf("expected 3 partitions in response for topic %s; got: %d", topic, len(partitions)) + } + } + if topic == topic2 { + if len(partitions) != 2 { + t.Errorf("expected 2 partitions in response for topic %s; got: %d", topic, len(partitions)) + } + } + for _, partition := range partitions { + if partition.Error != nil { + t.Error(partition.Error) + } + } + } +} diff --git a/client_test.go b/client_test.go index e971d4efb..5fd8422c2 100644 --- a/client_test.go +++ b/client_test.go @@ -67,6 +67,17 @@ func clientCreateTopic(client *Client, topic string, partitions int) error { return nil } +func clientEndTxn(client *Client, req *EndTxnRequest) error { + ctx, cancel := context.WithTimeout(context.Background(), time.Second*30) + defer cancel() + resp, err := client.EndTxn(ctx, req) + if err != nil { + return err + } + + return resp.Error +} + func newLocalClient() (*Client, func()) { return newClient(TCP("localhost")) } diff --git a/conn_test.go b/conn_test.go index 824d804ef..d1477e379 100644 --- a/conn_test.go +++ b/conn_test.go @@ -105,6 +105,10 @@ func makeGroupID() string { return fmt.Sprintf("kafka-go-group-%016x", rand.Int63()) } +func makeTransactionalID() string { + return fmt.Sprintf("kafka-go-transactional-id-%016x", rand.Int63()) +} + func TestConn(t *testing.T) { tests := []struct { scenario string @@ -324,13 +328,13 @@ func TestConn(t *testing.T) { t.Parallel() nettest.TestConn(t, func() (c1 net.Conn, c2 net.Conn, stop func(), err error) { - var topic1 = makeTopic() - var topic2 = makeTopic() + topic1 := makeTopic() + topic2 := makeTopic() var t1Reader *Conn var t2Reader *Conn var t1Writer *Conn var t2Writer *Conn - var dialer = &Dialer{} + dialer := &Dialer{} ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() @@ -378,7 +382,6 @@ func testConnFirstOffset(t *testing.T, conn *Conn) { func testConnWrite(t *testing.T, conn *Conn) { b := []byte("Hello World!") n, err := conn.Write(b) - if err != nil { t.Error(err) } @@ -952,11 +955,10 @@ func testConnFetchAndCommitOffsets(t *testing.T, conn *Conn) { } func testConnWriteReadConcurrently(t *testing.T, conn *Conn) { - const N = 1000 - var msgs = make([]string, N) - var done = make(chan struct{}) - var written = make(chan struct{}, N/10) + msgs := make([]string, N) + done := make(chan struct{}) + written := make(chan struct{}, N/10) for i := 0; i != N; i++ { msgs[i] = strconv.Itoa(i) diff --git a/endtxn.go b/endtxn.go new file mode 100644 index 000000000..300dc1797 --- /dev/null +++ b/endtxn.go @@ -0,0 +1,61 @@ +package kafka + +import ( + "context" + "fmt" + "net" + "time" + + "github.com/segmentio/kafka-go/protocol/endtxn" +) + +// EndTxnRequest represets a request sent to a kafka broker to end a transaction. +type EndTxnRequest struct { + // Address of the kafka broker to send the request to. + Addr net.Addr + + // The transactional id key. + TransactionalID string + + // The Producer ID (PID) for the current producer session + ProducerID int + + // The epoch associated with the current producer session for the given PID + ProducerEpoch int + + // Committed should be set to true if the transaction was commited, false otherwise. + Committed bool +} + +// EndTxnResponse represents a resposne from a kafka broker to an end transaction request. +type EndTxnResponse struct { + // The amount of time that the broker throttled the request. + Throttle time.Duration + + // Error is non-nil if an error occureda and contains the kafka error code. + // Programs may use the standard errors.Is function to test the error + // against kafka error codes. + Error error +} + +// EndTxn sends an EndTxn request to a kafka broker and returns its response. +func (c *Client) EndTxn(ctx context.Context, req *EndTxnRequest) (*EndTxnResponse, error) { + m, err := c.roundTrip(ctx, req.Addr, &endtxn.Request{ + TransactionalID: req.TransactionalID, + ProducerID: int64(req.ProducerID), + ProducerEpoch: int16(req.ProducerEpoch), + Committed: req.Committed, + }) + if err != nil { + return nil, fmt.Errorf("kafka.(*Client).EndTxn: %w", err) + } + + r := m.(*endtxn.Response) + + res := &EndTxnResponse{ + Throttle: makeDuration(r.ThrottleTimeMs), + Error: makeError(r.ErrorCode, ""), + } + + return res, nil +} diff --git a/initproducerid_test.go b/initproducerid_test.go index 66e23da84..7a2b6b0a5 100644 --- a/initproducerid_test.go +++ b/initproducerid_test.go @@ -41,7 +41,7 @@ func TestClientInitProducerId(t *testing.T) { resp, err := client.InitProducerID(context.Background(), &InitProducerIDRequest{ Addr: transactionCoordinator, TransactionalID: tid, - TransactionTimeoutMs: 3000, + TransactionTimeoutMs: 30000, }) if err != nil { t.Fatal(err) @@ -57,7 +57,9 @@ func TestClientInitProducerId(t *testing.T) { resp, err = client.InitProducerID(context.Background(), &InitProducerIDRequest{ Addr: transactionCoordinator, TransactionalID: tid, - TransactionTimeoutMs: 3000, + TransactionTimeoutMs: 30000, + ProducerID: pid1, + ProducerEpoch: epoch1, }) if err != nil { t.Fatal(err) diff --git a/protocol/addpartitionstotxn/addpartitionstotxn.go b/protocol/addpartitionstotxn/addpartitionstotxn.go new file mode 100644 index 000000000..d9dd120d5 --- /dev/null +++ b/protocol/addpartitionstotxn/addpartitionstotxn.go @@ -0,0 +1,58 @@ +package addpartitionstotxn + +import "github.com/segmentio/kafka-go/protocol" + +func init() { + protocol.Register(&Request{}, &Response{}) +} + +type Request struct { + // We need at least one tagged field to indicate that this is a "flexible" message + // type. + _ struct{} `kafka:"min=v3,max=v3,tag"` + + TransactionalID string `kafka:"min=v0,max=v2|min=v3,max=v3,compact"` + ProducerID int64 `kafka:"min=v0,max=v3"` + ProducerEpoch int16 `kafka:"min=v0,max=v3"` + Topics []RequestTopic `kafka:"min=v0,max=v3"` +} + +type RequestTopic struct { + // We need at least one tagged field to indicate that this is a "flexible" message + // type. + _ struct{} `kafka:"min=v3,max=v3,tag"` + + Name string `kafka:"min=v0,max=v2|min=v3,max=v3,compact"` + Partitions []int32 `kafka:"min=v0,max=v3"` +} + +func (r *Request) ApiKey() protocol.ApiKey { return protocol.AddPartitionsToTxn } + +type Response struct { + // We need at least one tagged field to indicate that this is a "flexible" message + // type. + _ struct{} `kafka:"min=v3,max=v3,tag"` + + ThrottleTimeMs int32 `kafka:"min=v0,max=v3"` + Results []ResponseResult `kafka:"min=v0,max=v3"` +} + +type ResponseResult struct { + // We need at least one tagged field to indicate that this is a "flexible" message + // type. + _ struct{} `kafka:"min=v3,max=v3,tag"` + + Name string `kafka:"min=v0,max=v2|min=v3,max=v3,compact"` + Results []ResponsePartition `kafka:"min=v0,max=v3"` +} + +type ResponsePartition struct { + // We need at least one tagged field to indicate that this is a "flexible" message + // type. + _ struct{} `kafka:"min=v3,max=v3,tag"` + + PartitionIndex int32 `kafka:"min=v0,max=v3"` + ErrorCode int16 `kafka:"min=v0,max=v3"` +} + +func (r *Response) ApiKey() protocol.ApiKey { return protocol.AddPartitionsToTxn } diff --git a/protocol/addpartitionstotxn/addpartitionstotxn_test.go b/protocol/addpartitionstotxn/addpartitionstotxn_test.go new file mode 100644 index 000000000..91baaa13a --- /dev/null +++ b/protocol/addpartitionstotxn/addpartitionstotxn_test.go @@ -0,0 +1,60 @@ +package addpartitionstotxn_test + +import ( + "testing" + + "github.com/segmentio/kafka-go/protocol/addpartitionstotxn" + "github.com/segmentio/kafka-go/protocol/prototest" +) + +func TestAddPartitionsToTxnRequest(t *testing.T) { + for _, version := range []int16{0, 1, 2, 3} { + prototest.TestRequest(t, version, &addpartitionstotxn.Request{ + TransactionalID: "transaction-id-0", + ProducerID: 10, + ProducerEpoch: 100, + Topics: []addpartitionstotxn.RequestTopic{ + { + Name: "topic-1", + Partitions: []int32{0, 1, 2, 3}, + }, + { + Name: "topic-2", + Partitions: []int32{0, 1, 2}, + }, + }, + }) + } +} + +func TestAddPartitionsToTxnResponse(t *testing.T) { + for _, version := range []int16{0, 1, 2, 3} { + prototest.TestResponse(t, version, &addpartitionstotxn.Response{ + ThrottleTimeMs: 20, + Results: []addpartitionstotxn.ResponseResult{ + { + Name: "topic-1", + Results: []addpartitionstotxn.ResponsePartition{ + { + PartitionIndex: 0, + ErrorCode: 19, + }, + { + PartitionIndex: 1, + ErrorCode: 0, + }, + }, + }, + { + Name: "topic-2", + Results: []addpartitionstotxn.ResponsePartition{ + { + PartitionIndex: 0, + ErrorCode: 0, + }, + }, + }, + }, + }) + } +} diff --git a/protocol/endtxn/endtxn.go b/protocol/endtxn/endtxn.go new file mode 100644 index 000000000..fb00d9ae7 --- /dev/null +++ b/protocol/endtxn/endtxn.go @@ -0,0 +1,31 @@ +package endtxn + +import "github.com/segmentio/kafka-go/protocol" + +func init() { + protocol.Register(&Request{}, &Response{}) +} + +type Request struct { + // We need at least one tagged field to indicate that this is a "flexible" message + // type. + _ struct{} `kafka:"min=v3,max=v3,tag"` + + TransactionalID string `kafka:"min=v0,max=v2|min=v3,max=v3,compact"` + ProducerID int64 `kafka:"min=v0,max=v3"` + ProducerEpoch int16 `kafka:"min=v0,max=v3"` + Committed bool `kafka:"min=v0,max=v3"` +} + +func (r *Request) ApiKey() protocol.ApiKey { return protocol.EndTxn } + +type Response struct { + // We need at least one tagged field to indicate that this is a "flexible" message + // type. + _ struct{} `kafka:"min=v3,max=v3,tag"` + + ThrottleTimeMs int32 `kafka:"min=v0,max=v3"` + ErrorCode int16 `kafka:"min=v0,max=v3"` +} + +func (r *Response) ApiKey() protocol.ApiKey { return protocol.EndTxn } diff --git a/protocol/endtxn/endtxn_test.go b/protocol/endtxn/endtxn_test.go new file mode 100644 index 000000000..09f7c3e24 --- /dev/null +++ b/protocol/endtxn/endtxn_test.go @@ -0,0 +1,28 @@ +package endtxn_test + +import ( + "testing" + + "github.com/segmentio/kafka-go/protocol/endtxn" + "github.com/segmentio/kafka-go/protocol/prototest" +) + +func TestEndTxnRequest(t *testing.T) { + for _, version := range []int16{0, 1, 2, 3} { + prototest.TestRequest(t, version, &endtxn.Request{ + TransactionalID: "transactional-id-1", + ProducerID: 1, + ProducerEpoch: 100, + Committed: false, + }) + } +} + +func TestEndTxnResponse(t *testing.T) { + for _, version := range []int16{0, 1, 2, 3} { + prototest.TestResponse(t, version, &endtxn.Response{ + ThrottleTimeMs: 1000, + ErrorCode: 4, + }) + } +}