Merge pull request #397 from DataDog/jamie/kafkaadmin-topicstate

Jamie/kafkaadmin topicstate

jamiealquiza authored Apr 4, 2022
2 parents 253ecbd + da30ffe commit 4b45e32
Showing 6 changed files with 329 additions and 6 deletions.
4 changes: 4 additions & 0 deletions kafkaadmin/client.go
@@ -3,6 +3,7 @@ package kafkaadmin
import (
"fmt"
"strings"
"time"

"github.com/confluentinc/confluent-kafka-go/kafka"
)
@@ -13,6 +14,9 @@ var (
SecurityProtocolSet = map[string]struct{}{"PLAINTEXT": empty, "SSL": empty, "SASL_PLAINTEXT": empty, "SASL_SSL": empty}
// SASLMechanismSet is the set of mechanisms supported for client to broker authentication
SASLMechanismSet = map[string]struct{}{"PLAIN": empty, "SCRAM-SHA-256": empty, "SCRAM-SHA-512": empty}
// Default timeout for requests to Kafka if a context is passed in with no
// deadline set.
defaultTimeout = 5 * time.Second
)

type FactoryFunc func(conf *kafka.ConfigMap) (*kafka.AdminClient, error)
20 changes: 14 additions & 6 deletions kafkaadmin/errors.go
@@ -4,20 +4,28 @@ import (
"fmt"
)

+var (
+// ErrNoData is a generic error for no data available to be returned.
+ErrNoData = fmt.Errorf("no data returned")
+)
+
// ErrSetThrottle is a generic error for SetThrottle.
-type ErrSetThrottle struct {
-Message string
-}
+type ErrSetThrottle struct{ Message string }

func (e ErrSetThrottle) Error() string {
return fmt.Sprintf("failed to set throttles: %s", e.Message)
}

// ErrRemoveThrottle is a generic error for RemoveThrottle.
-type ErrRemoveThrottle struct {
-Message string
-}
+type ErrRemoveThrottle struct{ Message string }

func (e ErrRemoveThrottle) Error() string {
return fmt.Sprintf("failed to remove throttles: %s", e.Message)
}
+
+// ErrorFetchingMetadata is an error encountered fetching Kafka cluster metadata.
+type ErrorFetchingMetadata struct{ Message string }
+
+func (e ErrorFetchingMetadata) Error() string {
+return fmt.Sprintf("error fetching metadata: %s", e.Message)
+}
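Since ErrNoData is a sentinel and the other errors are plain value types, callers can branch with errors.Is and errors.As. A minimal caller-side sketch, assuming the kafka-kit v3 module path (an assumption; adjust to the version in use) and a KafkaAdmin constructed elsewhere:

package example

import (
	"context"
	"errors"
	"fmt"
	"log"

	// Module path is an assumption; adjust to the kafka-kit version in use.
	"github.com/DataDog/kafka-kit/v3/kafkaadmin"
)

func describeTopics(ctx context.Context, admin kafkaadmin.KafkaAdmin) {
	states, err := admin.DescribeTopics(ctx, []string{"test1"})
	if errors.Is(err, kafkaadmin.ErrNoData) {
		// Sentinel comparison: metadata was fetched but no topics matched.
		log.Fatal("no matching topics")
	}
	var efm kafkaadmin.ErrorFetchingMetadata
	if errors.As(err, &efm) {
		// Typed error: the metadata request itself failed.
		log.Fatalf("metadata fetch failed: %v", efm)
	}
	if err != nil {
		log.Fatal(err)
	}
	fmt.Printf("described %d topics\n", len(states))
}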
1 change: 1 addition & 0 deletions kafkaadmin/kafkaadmin.go
@@ -11,6 +11,7 @@ type KafkaAdmin interface {
// Topics.
CreateTopic(context.Context, CreateTopicConfig) error
DeleteTopic(context.Context, string) error
DescribeTopics(context.Context, []string) (TopicStates, error)
// Cluster.
SetThrottle(context.Context, SetThrottleConfig) error
RemoveThrottle(context.Context, RemoveThrottleConfig) error
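With DescribeTopics added to the interface, every implementation must now provide it. A compile-time assertion makes that explicit; this is a sketch, with Client as the concrete type inferred from the method receivers in topics.go:

// Fails to compile if Client stops satisfying KafkaAdmin, e.g. if a newly
// added interface method such as DescribeTopics is left unimplemented.
var _ KafkaAdmin = &Client{}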
56 changes: 56 additions & 0 deletions kafkaadmin/parsing.go
@@ -0,0 +1,56 @@
package kafkaadmin

import (
"fmt"
"regexp"
)

/*
Some helpers borrowed from topicmappr that aren't worth exporting (they're
also modified a bit for this library).
*/

var (
// Accepted characters in Kafka topic names.
topicNormalChar = regexp.MustCompile(`[a-zA-Z0-9_\-]`)
)

// stringsToRegex takes a []string of topic names and returns a []*regexp.Regexp.
// Values that contain only legal topic name characters are treated as literals
// and anchored as ^value$; values containing regex are compiled as given.
func stringsToRegex(names []string) ([]*regexp.Regexp, error) {
var out []*regexp.Regexp

// Update string literals to ^value$ regex.
for n, t := range names {
if !containsRegex(t) {
names[n] = fmt.Sprintf(`^%s$`, t)
}
}

// Compile regex patterns.
for _, t := range names {
r, err := regexp.Compile(t)
if err != nil {
return nil, fmt.Errorf("invalid regex pattern: %s", t)
}

out = append(out, r)
}

return out, nil
}

// containsRegex takes a topic name string and returns whether or not
// it should be interpreted as regex.
func containsRegex(t string) bool {
// Check each character of the topic name. If any character isn't a legal Kafka
// topic name character, we assume the name is regex.
for _, c := range t {
if !topicNormalChar.MatchString(string(c)) {
return true
}
}

return false
}
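A short in-package sketch of how these helpers behave (topic names here are illustrative). Note that stringsToRegex rewrites literal entries of its input slice in place before compiling:

func exampleMatching() {
	// "orders" contains only legal topic-name characters, so it's anchored
	// to ^orders$; "metrics.*" contains regex characters and compiles as-is.
	patterns, err := stringsToRegex([]string{"orders", "metrics.*"})
	if err != nil {
		panic(err)
	}

	fmt.Println(patterns[0].MatchString("orders"))        // true: exact match
	fmt.Println(patterns[0].MatchString("orders-retry"))  // false: anchored
	fmt.Println(patterns[1].MatchString("metrics.node1")) // true: regex match
}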
123 changes: 123 additions & 0 deletions kafkaadmin/topics.go
@@ -2,6 +2,8 @@ package kafkaadmin

import (
"context"
"regexp"
"time"

"github.com/confluentinc/confluent-kafka-go/kafka"
)
@@ -15,8 +17,43 @@ type CreateTopicConfig struct {
ReplicaAssignment ReplicaAssignment
}

// ReplicaAssignment is a [][]int32 of partition assignments. The outer slice
// index maps to the partition ID (i.e. index position 3 describes partition 3
// of the reference topic); the inner slice is an []int32 of broker assignments.
type ReplicaAssignment [][]int32
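For example, a hypothetical three-partition topic with two replicas per partition could be expressed as follows (broker IDs are illustrative):

// exampleAssignment: three partitions, two replicas each.
var exampleAssignment = ReplicaAssignment{
	{1001, 1002}, // partition 0 -> brokers 1001, 1002
	{1002, 1003}, // partition 1 -> brokers 1002, 1003
	{1003, 1001}, // partition 2 -> brokers 1003, 1001
}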

// TopicStates is a map of topic names to TopicState.
type TopicStates map[string]TopicState

// TopicState describes the current state of a topic.
type TopicState struct {
Name string
Partitions int32
ReplicationFactor int32
PartitionStates map[int]PartitionState
}

// PartitionState describes the state of a partition.
type PartitionState struct {
ID int32
Leader int32
Replicas []int32
ISR []int32
}

// NewTopicStates initializes a TopicStates.
func NewTopicStates() TopicStates {
return make(map[string]TopicState)
}

// NewTopicState initializes a TopicState.
func NewTopicState(name string) TopicState {
return TopicState{
Name: name,
PartitionStates: make(map[int]PartitionState),
}
}

// CreateTopic creates a topic.
func (c Client) CreateTopic(ctx context.Context, cfg CreateTopicConfig) error {
spec := kafka.TopicSpecification{
@@ -45,3 +82,89 @@ func (c Client) DeleteTopic(ctx context.Context, name string) error {
_, err := c.c.DeleteTopics(ctx, []string{name})
return err
}
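DeleteTopic is a thin wrapper over the underlying admin client; a minimal in-package usage sketch (topic name illustrative, and assuming "fmt" is imported):

func deleteExample(c Client) {
	// Deletion errors surface directly from the underlying confluent client.
	if err := c.DeleteTopic(context.Background(), "orders-deprecated"); err != nil {
		fmt.Printf("delete failed: %v\n", err)
	}
}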

// DescribeTopics takes a []string of topic names. Topic names can be literal
// names or regex patterns. A TopicStates is returned for all matching topics.
func (c Client) DescribeTopics(ctx context.Context, topics []string) (TopicStates, error) {
// Use the remaining deadline budget from the context if one is set, otherwise
// use the default timeout value.
var timeout time.Duration
if dl, set := ctx.Deadline(); set {
timeout = time.Until(dl)
} else {
timeout = defaultTimeout
}

// Request the cluster metadata.
md, err := c.c.GetMetadata(nil, true, int(timeout.Milliseconds()))
if err != nil {
return nil, ErrorFetchingMetadata{Message: err.Error()}
}

// Strip topics that don't match any of the specified names.
topicNamesRegex, err := stringsToRegex(topics)
if err != nil {
return nil, err
}

filterMatches(md, topicNamesRegex)

return topicStatesFromMetadata(md)
}
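A usage sketch from inside the package: a context deadline bounds the underlying metadata request, while a context without one falls back to the 5s defaultTimeout (topic names are illustrative, and "fmt" is assumed imported):

func describeExample(c Client) error {
	// A 10s deadline caps the GetMetadata call; with no deadline set,
	// DescribeTopics falls back to the package's defaultTimeout.
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	// Names may be literals or regex patterns; both forms shown.
	states, err := c.DescribeTopics(ctx, []string{"orders", "metrics.*"})
	if err != nil {
		return err
	}
	for name, state := range states {
		fmt.Printf("%s: partitions=%d RF=%d\n", name, state.Partitions, state.ReplicationFactor)
	}
	return nil
}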

func filterMatches(md *kafka.Metadata, re []*regexp.Regexp) {
for topic := range md.Topics {
var keep bool
for _, r := range re {
if r.MatchString(topic) {
keep = true
break
}
}
if !keep {
delete(md.Topics, topic)
}
}
}

func topicStatesFromMetadata(md *kafka.Metadata) (TopicStates, error) {
if len(md.Topics) == 0 {
return nil, ErrNoData
}

// Extract the topic metadata and populate it into the TopicStates.
var topicStates = NewTopicStates()

// For each topic in the global metadata, translate its metadata to a TopicState.
for topic, topicMeta := range md.Topics {
topicState := NewTopicState(topic)

var maxSeenReplicaLen int

// Scan the partitions and map the states.
for _, partn := range topicMeta.Partitions {
// Track the max seen replica set length to infer the replication factor. It
// may be worth checking the Kafka code here, since it's possible to explicitly
// configure partitions to have variable replica set sizes; perhaps an
// average makes more sense?
if maxSeenReplicaLen < len(partn.Replicas) {
maxSeenReplicaLen = len(partn.Replicas)
}

topicState.PartitionStates[int(partn.ID)] = PartitionState{
ID: partn.ID,
Leader: partn.Leader,
Replicas: partn.Replicas,
ISR: partn.Isrs,
}
}

// Set the general topic attributes.
topicState.ReplicationFactor = int32(maxSeenReplicaLen)
topicState.Partitions = int32(len(topicMeta.Partitions))

// Add the TopicState to the global TopicStates.
topicStates[topic] = topicState
}

return topicStates, nil
}
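Because PartitionState exposes both the assigned replicas and the ISR, callers can derive health signals directly from a DescribeTopics result. A hypothetical in-package helper (not part of this PR):

// underReplicated returns the IDs of partitions whose ISR is smaller than
// their assigned replica set.
func underReplicated(ts TopicState) []int32 {
	var ids []int32
	for _, ps := range ts.PartitionStates {
		if len(ps.ISR) < len(ps.Replicas) {
			ids = append(ids, ps.ID)
		}
	}
	return ids
}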
131 changes: 131 additions & 0 deletions kafkaadmin/topics_test.go
@@ -0,0 +1,131 @@
package kafkaadmin

import (
"testing"

"github.com/confluentinc/confluent-kafka-go/kafka"
"github.com/stretchr/testify/assert"
)

func TestTopicStatesFromMetadata(t *testing.T) {
// Mock metadata.
md := fakeKafkaMetadata()
// Get a TopicStates.
ts, err := topicStatesFromMetadata(md)
assert.Nil(t, err)

// Expected results.
expected := NewTopicStates()

// test1 topic.
test1state := fakeTopicState("test1", 2)
test1state.setPartitionState(0, []int32{1001, 1002}, []int32{1001, 1002})
test1state.setPartitionState(1, []int32{1002}, []int32{1002})
expected["test1"] = test1state

// test2 topic.
test2state := fakeTopicState("test2", 2)
test2state.setPartitionState(0, []int32{1003, 1002}, []int32{1003, 1002})
test2state.setPartitionState(1, []int32{1002, 1003}, []int32{1003, 1002})
expected["test2"] = test2state

assert.Equal(t, expected, ts)
}

// fakeTopicState takes a topic name and desired number of partitions and returns
// a TopicState. Note that the PartitionStates are only stubbed out with IDs;
// replicas, ISR, and leaders are to be filled in as needed by each test.
func fakeTopicState(name string, partitions int32) TopicState {
ts := NewTopicState(name)
ts.Partitions = partitions
ts.ReplicationFactor = 2
ts.PartitionStates = map[int]PartitionState{}
for i := int32(0); i < partitions; i++ {
ts.PartitionStates[int(i)] = PartitionState{
ID: i,
}
}

return ts
}

// setPartitionState takes a partition ID, the desired assigned replicas, and the
// ISR, and sets the partition state accordingly.
func (t TopicState) setPartitionState(id int32, replicas []int32, isr []int32) {
ps := t.PartitionStates[int(id)]
ps.Leader = isr[0]
ps.Replicas = replicas
ps.ISR = isr
t.PartitionStates[int(id)] = ps
}

func fakeKafkaMetadata() *kafka.Metadata {
var noErr = kafka.NewError(kafka.ErrNoError, "Success", false)

return &kafka.Metadata{
Brokers: []kafka.BrokerMetadata{
{
ID: 1001,
Host: "host-a",
Port: 9092,
},
{
ID: 1002,
Host: "host-b",
Port: 9092,
},
{
ID: 1003,
Host: "host-c",
Port: 9092,
},
},
Topics: map[string]kafka.TopicMetadata{
"test1": {
Topic: "test1",
Partitions: []kafka.PartitionMetadata{
{
ID: 0,
Error: noErr,
Leader: 1001,
Replicas: []int32{1001, 1002},
Isrs: []int32{1001, 1002},
},
{
ID: 1,
Error: noErr,
Leader: 1002,
Replicas: []int32{1002},
Isrs: []int32{1002},
},
},
Error: noErr,
},
"test2": {
Topic: "test2",
Partitions: []kafka.PartitionMetadata{
{
ID: 0,
Error: noErr,
Leader: 1003,
Replicas: []int32{1003, 1002},
Isrs: []int32{1003, 1002},
},
{
ID: 1,
Error: noErr,
Leader: 1003,
Replicas: []int32{1002, 1003},
Isrs: []int32{1003, 1002},
},
},
Error: noErr,
},
},
OriginatingBroker: kafka.BrokerMetadata{
ID: 1001,
Host: "host-a",
Port: 9092,
},
}
}
