-
Notifications
You must be signed in to change notification settings - Fork 799
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Implementation of ListPartitionReassignments API (#1203)
* implemented ListPartitionReassignments API. * fix nullable * fix lint * fix tag.
- Loading branch information
Boris Granveaud
authored
Oct 16, 2023
1 parent
5b97cf9
commit f2d9e08
Showing
4 changed files
with
296 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,135 @@ | ||
package kafka | ||
|
||
import ( | ||
"context" | ||
"net" | ||
"time" | ||
|
||
"github.com/segmentio/kafka-go/protocol/listpartitionreassignments" | ||
) | ||
|
||
// ListPartitionReassignmentsRequest is a request to the ListPartitionReassignments API. | ||
type ListPartitionReassignmentsRequest struct { | ||
// Address of the kafka broker to send the request to. | ||
Addr net.Addr | ||
|
||
// Topics we want reassignments for, mapped by their name, or nil to list everything. | ||
Topics map[string]ListPartitionReassignmentsRequestTopic | ||
|
||
// Timeout is the amount of time to wait for the request to complete. | ||
Timeout time.Duration | ||
} | ||
|
||
// ListPartitionReassignmentsRequestTopic contains the requested partitions for a single | ||
// topic. | ||
type ListPartitionReassignmentsRequestTopic struct { | ||
// The partitions to list partition reassignments for. | ||
PartitionIndexes []int | ||
} | ||
|
||
// ListPartitionReassignmentsResponse is a response from the ListPartitionReassignments API. | ||
type ListPartitionReassignmentsResponse struct { | ||
// Error is set to a non-nil value including the code and message if a top-level | ||
// error was encountered. | ||
Error error | ||
|
||
// Topics contains results for each topic, mapped by their name. | ||
Topics map[string]ListPartitionReassignmentsResponseTopic | ||
} | ||
|
||
// ListPartitionReassignmentsResponseTopic contains the detailed result of | ||
// ongoing reassignments for a topic. | ||
type ListPartitionReassignmentsResponseTopic struct { | ||
// Partitions contains result for topic partitions. | ||
Partitions []ListPartitionReassignmentsResponsePartition | ||
} | ||
|
||
// ListPartitionReassignmentsResponsePartition contains the detailed result of | ||
// ongoing reassignments for a single partition. | ||
type ListPartitionReassignmentsResponsePartition struct { | ||
// PartitionIndex contains index of the partition. | ||
PartitionIndex int | ||
|
||
// Replicas contains the current replica set. | ||
Replicas []int | ||
|
||
// AddingReplicas contains the set of replicas we are currently adding. | ||
AddingReplicas []int | ||
|
||
// RemovingReplicas contains the set of replicas we are currently removing. | ||
RemovingReplicas []int | ||
} | ||
|
||
func (c *Client) ListPartitionReassignments( | ||
ctx context.Context, | ||
req *ListPartitionReassignmentsRequest, | ||
) (*ListPartitionReassignmentsResponse, error) { | ||
apiReq := &listpartitionreassignments.Request{ | ||
TimeoutMs: int32(req.Timeout.Milliseconds()), | ||
} | ||
|
||
for topicName, topicReq := range req.Topics { | ||
apiReq.Topics = append( | ||
apiReq.Topics, | ||
listpartitionreassignments.RequestTopic{ | ||
Name: topicName, | ||
PartitionIndexes: intToInt32Array(topicReq.PartitionIndexes), | ||
}, | ||
) | ||
} | ||
|
||
protoResp, err := c.roundTrip( | ||
ctx, | ||
req.Addr, | ||
apiReq, | ||
) | ||
if err != nil { | ||
return nil, err | ||
} | ||
apiResp := protoResp.(*listpartitionreassignments.Response) | ||
|
||
resp := &ListPartitionReassignmentsResponse{ | ||
Error: makeError(apiResp.ErrorCode, apiResp.ErrorMessage), | ||
Topics: make(map[string]ListPartitionReassignmentsResponseTopic), | ||
} | ||
|
||
for _, topicResult := range apiResp.Topics { | ||
respTopic := ListPartitionReassignmentsResponseTopic{} | ||
for _, partitionResult := range topicResult.Partitions { | ||
respTopic.Partitions = append( | ||
respTopic.Partitions, | ||
ListPartitionReassignmentsResponsePartition{ | ||
PartitionIndex: int(partitionResult.PartitionIndex), | ||
Replicas: int32ToIntArray(partitionResult.Replicas), | ||
AddingReplicas: int32ToIntArray(partitionResult.AddingReplicas), | ||
RemovingReplicas: int32ToIntArray(partitionResult.RemovingReplicas), | ||
}, | ||
) | ||
} | ||
resp.Topics[topicResult.Name] = respTopic | ||
} | ||
|
||
return resp, nil | ||
} | ||
|
||
func intToInt32Array(arr []int) []int32 { | ||
if arr == nil { | ||
return nil | ||
} | ||
res := make([]int32, len(arr)) | ||
for i := range arr { | ||
res[i] = int32(arr[i]) | ||
} | ||
return res | ||
} | ||
|
||
func int32ToIntArray(arr []int32) []int { | ||
if arr == nil { | ||
return nil | ||
} | ||
res := make([]int, len(arr)) | ||
for i := range arr { | ||
res[i] = int(arr[i]) | ||
} | ||
return res | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,50 @@ | ||
package kafka | ||
|
||
import ( | ||
"context" | ||
"testing" | ||
|
||
ktesting "github.com/segmentio/kafka-go/testing" | ||
) | ||
|
||
func TestClientListPartitionReassignments(t *testing.T) { | ||
if !ktesting.KafkaIsAtLeast("2.4.0") { | ||
return | ||
} | ||
|
||
ctx := context.Background() | ||
client, shutdown := newLocalClient() | ||
defer shutdown() | ||
|
||
topic := makeTopic() | ||
createTopic(t, topic, 2) | ||
defer deleteTopic(t, topic) | ||
|
||
// Can't really get an ongoing partition reassignment with local Kafka, so just do a superficial test here. | ||
resp, err := client.ListPartitionReassignments( | ||
ctx, | ||
&ListPartitionReassignmentsRequest{ | ||
Topics: map[string]ListPartitionReassignmentsRequestTopic{ | ||
topic: {PartitionIndexes: []int{0, 1}}, | ||
}, | ||
}, | ||
) | ||
|
||
if err != nil { | ||
t.Fatal(err) | ||
} | ||
if resp.Error != nil { | ||
t.Error( | ||
"Unexpected error in response", | ||
"expected", nil, | ||
"got", resp.Error, | ||
) | ||
} | ||
if len(resp.Topics) != 0 { | ||
t.Error( | ||
"Unexpected length of topic results", | ||
"expected", 0, | ||
"got", len(resp.Topics), | ||
) | ||
} | ||
} |
70 changes: 70 additions & 0 deletions
70
protocol/listpartitionreassignments/listpartitionreassignments.go
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,70 @@ | ||
package listpartitionreassignments | ||
|
||
import "github.com/segmentio/kafka-go/protocol" | ||
|
||
func init() { | ||
protocol.Register(&Request{}, &Response{}) | ||
} | ||
|
||
// Detailed API definition: https://kafka.apache.org/protocol#The_Messages_ListPartitionReassignments. | ||
|
||
type Request struct { | ||
// We need at least one tagged field to indicate that this is a "flexible" message | ||
// type. | ||
_ struct{} `kafka:"min=v0,max=v0,tag"` | ||
|
||
TimeoutMs int32 `kafka:"min=v0,max=v0"` | ||
Topics []RequestTopic `kafka:"min=v0,max=v0,nullable"` | ||
} | ||
|
||
type RequestTopic struct { | ||
// We need at least one tagged field to indicate that this is a "flexible" message | ||
// type. | ||
_ struct{} `kafka:"min=v0,max=v0,tag"` | ||
|
||
Name string `kafka:"min=v0,max=v0"` | ||
PartitionIndexes []int32 `kafka:"min=v0,max=v0"` | ||
} | ||
|
||
func (r *Request) ApiKey() protocol.ApiKey { | ||
return protocol.ListPartitionReassignments | ||
} | ||
|
||
func (r *Request) Broker(cluster protocol.Cluster) (protocol.Broker, error) { | ||
return cluster.Brokers[cluster.Controller], nil | ||
} | ||
|
||
type Response struct { | ||
// We need at least one tagged field to indicate that this is a "flexible" message | ||
// type. | ||
_ struct{} `kafka:"min=v0,max=v0,tag"` | ||
|
||
ThrottleTimeMs int32 `kafka:"min=v0,max=v0"` | ||
ErrorCode int16 `kafka:"min=v0,max=v0"` | ||
ErrorMessage string `kafka:"min=v0,max=v0,nullable"` | ||
Topics []ResponseTopic `kafka:"min=v0,max=v0"` | ||
} | ||
|
||
type ResponseTopic struct { | ||
// We need at least one tagged field to indicate that this is a "flexible" message | ||
// type. | ||
_ struct{} `kafka:"min=v0,max=v0,tag"` | ||
|
||
Name string `kafka:"min=v0,max=v0"` | ||
Partitions []ResponsePartition `kafka:"min=v0,max=v0"` | ||
} | ||
|
||
type ResponsePartition struct { | ||
// We need at least one tagged field to indicate that this is a "flexible" message | ||
// type. | ||
_ struct{} `kafka:"min=v0,max=v0,tag"` | ||
|
||
PartitionIndex int32 `kafka:"min=v0,max=v0"` | ||
Replicas []int32 `kafka:"min=v0,max=v0"` | ||
AddingReplicas []int32 `kafka:"min=v0,max=v0"` | ||
RemovingReplicas []int32 `kafka:"min=v0,max=v0"` | ||
} | ||
|
||
func (r *Response) ApiKey() protocol.ApiKey { | ||
return protocol.ListPartitionReassignments | ||
} |
41 changes: 41 additions & 0 deletions
41
protocol/listpartitionreassignments/listpartitionreassignments_test.go
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,41 @@ | ||
package listpartitionreassignments_test | ||
|
||
import ( | ||
"testing" | ||
|
||
"github.com/segmentio/kafka-go/protocol/listpartitionreassignments" | ||
"github.com/segmentio/kafka-go/protocol/prototest" | ||
) | ||
|
||
const ( | ||
v0 = 0 | ||
) | ||
|
||
func TestListPartitionReassignmentsRequest(t *testing.T) { | ||
prototest.TestRequest(t, v0, &listpartitionreassignments.Request{ | ||
Topics: []listpartitionreassignments.RequestTopic{ | ||
{ | ||
Name: "topic-1", | ||
PartitionIndexes: []int32{1, 2, 3}, | ||
}, | ||
}, | ||
}) | ||
} | ||
|
||
func TestListPartitionReassignmentsResponse(t *testing.T) { | ||
prototest.TestResponse(t, v0, &listpartitionreassignments.Response{ | ||
Topics: []listpartitionreassignments.ResponseTopic{ | ||
{ | ||
Name: "topic-1", | ||
Partitions: []listpartitionreassignments.ResponsePartition{ | ||
{ | ||
PartitionIndex: 1, | ||
Replicas: []int32{1, 2, 3}, | ||
AddingReplicas: []int32{4, 5, 6}, | ||
RemovingReplicas: []int32{7, 8, 9}, | ||
}, | ||
}, | ||
}, | ||
}, | ||
}) | ||
} |