Skip to content

Commit

Permalink
Merge pull request #387 from DataDog/mborst/topicmappr-library
Browse files Browse the repository at this point in the history
Topicmappr refactor
  • Loading branch information
jamiealquiza authored Jan 19, 2022
2 parents d12a568 + 295fcd6 commit 59c046e
Show file tree
Hide file tree
Showing 15 changed files with 692 additions and 824 deletions.
35 changes: 7 additions & 28 deletions cmd/topicmappr/commands/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,39 +23,20 @@ const (
var (
// Characters allowed in Kafka topic names
topicNormalChar = regexp.MustCompile(`[a-zA-Z0-9_\\-]`)

// Config holds global configs.
Config struct {
topics []*regexp.Regexp
topicsExclude []*regexp.Regexp
brokers []int
}
)

func bootstrap(cmd *cobra.Command) {
b, _ := cmd.Flags().GetString("brokers")
Config.brokers = brokerStringToSlice(b)

func sanitizeInput(cmd *cobra.Command) {
// Append trailing slash if not included.
op := cmd.Flag("out-path").Value.String()
if op != "" && !strings.HasSuffix(op, "/") {
cmd.Flags().Set("out-path", op+"/")
}

// Populate topic include / exclude regex.
if include, _ := cmd.Flags().GetString("topics"); include != "" {
Config.topics = TopicRegex(include)
}

if exclude, _ := cmd.Flags().GetString("topics-exclude"); exclude != "" {
Config.topicsExclude = TopicRegex(exclude)
}
}

// TopicRegex takes a string of csv values and returns a []*regexp.Regexp.
// topicRegex takes a string of csv values and returns a []*regexp.Regexp.
// The values are either a string literal and become ^value$ or are regex and
// compiled then added.
func TopicRegex(s string) []*regexp.Regexp {
func topicRegex(s string) []*regexp.Regexp {
var out []*regexp.Regexp

// Update string literals to ^value$ regex.
Expand Down Expand Up @@ -88,23 +69,21 @@ func TopicRegex(s string) []*regexp.Regexp {
// topic discovery` via ZooKeeper.
// - that the --placement flag was set to 'storage', which expects
// metrics metadata to be stored in ZooKeeper.
func initZooKeeper(cmd *cobra.Command) (kafkazk.Handler, error) {
func initZooKeeper(zkAddr, kafkaPrefix, metricsPrefix string) (kafkazk.Handler, error) {
// Suppress underlying ZK client noise.
log.SetOutput(ioutil.Discard)

zkAddr := cmd.Parent().Flag("zk-addr").Value.String()
timeout := 250 * time.Millisecond

zk, err := kafkazk.NewHandler(&kafkazk.Config{
Connect: zkAddr,
Prefix: cmd.Parent().Flag("zk-prefix").Value.String(),
MetricsPrefix: cmd.Flag("zk-metrics-prefix").Value.String(),
Prefix: kafkaPrefix,
MetricsPrefix: metricsPrefix,
})

if err != nil {
return nil, fmt.Errorf("Error connecting to ZooKeeper: %s", err)
}

timeout := 250 * time.Millisecond
time.Sleep(timeout)

if !zk.Ready() {
Expand Down
10 changes: 5 additions & 5 deletions cmd/topicmappr/commands/evac_leadership_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ var pMapIn = kafkazk.PartitionMap{
func TestRemoveProblemBroker(t *testing.T) {
problemBrokerId := 10001

pMapOut := EvacLeadership(pMapIn, []int{problemBrokerId}, []string{topic})
pMapOut := evacuateLeadership(pMapIn, []int{problemBrokerId}, []string{topic})

for _, partition := range pMapOut.Partitions {
if partition.Replicas[0] == problemBrokerId {
Expand All @@ -53,7 +53,7 @@ func TestRemoveProblemBroker(t *testing.T) {
func TestEvacTwoProblemBrokers(t *testing.T) {
problemBrokers := []int{10001, 10002}

pMapOut := EvacLeadership(pMapIn, problemBrokers, []string{topic})
pMapOut := evacuateLeadership(pMapIn, problemBrokers, []string{topic})

for _, partition := range pMapOut.Partitions {
if partition.Replicas[0] == problemBrokers[0] || partition.Replicas[0] == problemBrokers[1] {
Expand All @@ -63,7 +63,7 @@ func TestEvacTwoProblemBrokers(t *testing.T) {
}

func TestNoMatchingTopicToEvac(t *testing.T) {
pMapOut := EvacLeadership(pMapIn, []int{10001}, []string{"some other topic"})
pMapOut := evacuateLeadership(pMapIn, []int{10001}, []string{"some other topic"})

for i, partition := range pMapOut.Partitions {
for j, broker := range partition.Replicas {
Expand All @@ -81,7 +81,7 @@ func TestNoMatchingTopicToEvac(t *testing.T) {
// problemBrokers[10002] = &kafkazk.Broker{}
// problemBrokers[10003] = &kafkazk.Broker{}
//
// EvacLeadership(pMapIn, problemBrokers)
// evacuateLeadership(pMapIn, problemBrokers)
//
// t.Errorf("EvacLeadership should have errored out at this point.")
// t.Errorf("evacuateLeadership should have errored out at this point.")
//}
62 changes: 16 additions & 46 deletions cmd/topicmappr/commands/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,93 +2,63 @@ package commands

import (
"fmt"
"os"
"regexp"
"time"

"github.com/DataDog/kafka-kit/v3/kafkazk"

"github.com/spf13/cobra"
)

// checkMetaAge checks the age of the stored partition and broker storage
// metrics data against the tolerated metrics age parameter.
func checkMetaAge(cmd *cobra.Command, zk kafkazk.Handler) {
func checkMetaAge(zk kafkazk.Handler, maxAge int) error {
age, err := zk.MaxMetaAge()
if err != nil {
fmt.Printf("Error fetching metrics metadata: %s\n", err)
os.Exit(1)
return fmt.Errorf("Error fetching metrics metadata: %s\n", err)
}

tol, _ := cmd.Flags().GetInt("metrics-age")

if age > time.Duration(tol)*time.Minute {
fmt.Printf("Metrics metadata is older than allowed: %s\n", age)
os.Exit(1)
if age > time.Duration(maxAge)*time.Minute {
return fmt.Errorf("Metrics metadata is older than allowed: %s\n", age)
}
return nil
}

// getBrokerMeta returns a map of brokers and broker metadata for those
// registered in ZooKeeper. Optionally, metrics metadata persisted in ZooKeeper
// (via an external mechanism*) can be merged into the metadata.
func getBrokerMeta(cmd *cobra.Command, zk kafkazk.Handler, m bool) kafkazk.BrokerMetaMap {
brokerMeta, errs := zk.GetAllBrokerMeta(m)
// If no data is returned, report and exit. Otherwise, it's possible that
// complete data for a few brokers wasn't returned. We check in subsequent
// steps as to whether any brokers that matter are missing metrics.
if errs != nil && brokerMeta == nil {
for _, e := range errs {
fmt.Println(e)
}
os.Exit(1)
}

return brokerMeta
func getBrokerMeta(zk kafkazk.Handler, m bool) (kafkazk.BrokerMetaMap, []error) {
return zk.GetAllBrokerMeta(m)
}

// ensureBrokerMetrics takes a map of reference brokers and a map of discovered
// broker metadata. Any non-missing brokers in the broker map must be present
// in the broker metadata map and have a non-true MetricsIncomplete value.
func ensureBrokerMetrics(cmd *cobra.Command, bm kafkazk.BrokerMap, bmm kafkazk.BrokerMetaMap) {
var e bool
func ensureBrokerMetrics(bm kafkazk.BrokerMap, bmm kafkazk.BrokerMetaMap) []error {
errs := []error{}
for id, b := range bm {
// Missing brokers won't be found in the brokerMeta.
if !b.Missing && id != kafkazk.StubBrokerID && bmm[id].MetricsIncomplete {
e = true
fmt.Printf("Metrics not found for broker %d\n", id)
errs = append(errs, fmt.Errorf("Metrics not found for broker %d\n", id))
}
}

if e {
os.Exit(1)
}
return errs
}

// getPartitionMeta returns a map of topic, partition metadata persisted in
// ZooKeeper (via an external mechanism*). This is primarily partition size
// metrics data used for the storage placement strategy.
func getPartitionMeta(cmd *cobra.Command, zk kafkazk.Handler) kafkazk.PartitionMetaMap {
partitionMeta, err := zk.GetAllPartitionMeta()
if err != nil {
fmt.Println(err)
os.Exit(1)
}

return partitionMeta
func getPartitionMeta(zk kafkazk.Handler) (kafkazk.PartitionMetaMap, error) {
return zk.GetAllPartitionMeta()
}

// stripPendingDeletes takes a partition map and zk handler. It looks up any
// topics in a pending delete state and removes them from the provided partition
// map, returning a list of topics removed.
func stripPendingDeletes(pm *kafkazk.PartitionMap, zk kafkazk.Handler) []string {
func stripPendingDeletes(pm *kafkazk.PartitionMap, zk kafkazk.Handler) ([]string, error) {
// Get pending deletions.
pd, err := zk.GetPendingDeletion()
if err != nil {
fmt.Println("Error fetching topics pending deletion")
}

if len(pd) == 0 {
return []string{}
return []string{}, err
}

// Convert to a series of literal regex.
Expand All @@ -99,7 +69,7 @@ func stripPendingDeletes(pm *kafkazk.PartitionMap, zk kafkazk.Handler) []string
}

// Update the PartitionMap and return a list of removed topic names.
return removeTopics(pm, re)
return removeTopics(pm, re), err
}

// removeTopics takes a PartitionMap and []*regexp.Regexp of topic name patters.
Expand Down
Loading

0 comments on commit 59c046e

Please sign in to comment.