Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Topicmappr refactor #387

Merged
merged 18 commits into from
Jan 19, 2022
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 8 additions & 10 deletions cmd/topicmappr/commands/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,18 +44,18 @@ func bootstrap(cmd *cobra.Command) {

// Populate topic include / exclude regex.
if include, _ := cmd.Flags().GetString("topics"); include != "" {
Config.topics = TopicRegex(include)
Config.topics = topicRegex(include)
}

if exclude, _ := cmd.Flags().GetString("topics-exclude"); exclude != "" {
Config.topicsExclude = TopicRegex(exclude)
Config.topicsExclude = topicRegex(exclude)
}
}

// TopicRegex takes a string of csv values and returns a []*regexp.Regexp.
// topicRegex takes a string of csv values and returns a []*regexp.Regexp.
// The values are either a string literal and become ^value$ or are regex and
// compiled then added.
func TopicRegex(s string) []*regexp.Regexp {
func topicRegex(s string) []*regexp.Regexp {
var out []*regexp.Regexp

// Update string literals to ^value$ regex.
Expand Down Expand Up @@ -88,23 +88,21 @@ func TopicRegex(s string) []*regexp.Regexp {
// topic discovery` via ZooKeeper.
// - that the --placement flag was set to 'storage', which expects
// metrics metadata to be stored in ZooKeeper.
func initZooKeeper(cmd *cobra.Command) (kafkazk.Handler, error) {
func initZooKeeper(zkAddr, kafkaPrefix, metricsPrefix string) (kafkazk.Handler, error) {
// Suppress underlying ZK client noise.
log.SetOutput(ioutil.Discard)

zkAddr := cmd.Parent().Flag("zk-addr").Value.String()
timeout := 250 * time.Millisecond

zk, err := kafkazk.NewHandler(&kafkazk.Config{
Connect: zkAddr,
Prefix: cmd.Parent().Flag("zk-prefix").Value.String(),
MetricsPrefix: cmd.Flag("zk-metrics-prefix").Value.String(),
Prefix: kafkaPrefix,
MetricsPrefix: metricsPrefix,
})

if err != nil {
return nil, fmt.Errorf("Error connecting to ZooKeeper: %s", err)
}

timeout := 250 * time.Millisecond
time.Sleep(timeout)

if !zk.Ready() {
Expand Down
10 changes: 5 additions & 5 deletions cmd/topicmappr/commands/evac_leadership_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ var pMapIn = kafkazk.PartitionMap{
func TestRemoveProblemBroker(t *testing.T) {
problemBrokerId := 10001

pMapOut := EvacLeadership(pMapIn, []int{problemBrokerId}, []string{topic})
pMapOut := evacuateLeadership(pMapIn, []int{problemBrokerId}, []string{topic})

for _, partition := range pMapOut.Partitions {
if partition.Replicas[0] == problemBrokerId {
Expand All @@ -53,7 +53,7 @@ func TestRemoveProblemBroker(t *testing.T) {
func TestEvacTwoProblemBrokers(t *testing.T) {
problemBrokers := []int{10001, 10002}

pMapOut := EvacLeadership(pMapIn, problemBrokers, []string{topic})
pMapOut := evacuateLeadership(pMapIn, problemBrokers, []string{topic})

for _, partition := range pMapOut.Partitions {
if partition.Replicas[0] == problemBrokers[0] || partition.Replicas[0] == problemBrokers[1] {
Expand All @@ -63,7 +63,7 @@ func TestEvacTwoProblemBrokers(t *testing.T) {
}

func TestNoMatchingTopicToEvac(t *testing.T) {
pMapOut := EvacLeadership(pMapIn, []int{10001}, []string{"some other topic"})
pMapOut := evacuateLeadership(pMapIn, []int{10001}, []string{"some other topic"})

for i, partition := range pMapOut.Partitions {
for j, broker := range partition.Replicas {
Expand All @@ -81,7 +81,7 @@ func TestNoMatchingTopicToEvac(t *testing.T) {
// problemBrokers[10002] = &kafkazk.Broker{}
// problemBrokers[10003] = &kafkazk.Broker{}
//
// EvacLeadership(pMapIn, problemBrokers)
// evacuateLeadership(pMapIn, problemBrokers)
//
// t.Errorf("EvacLeadership should have errored out at this point.")
// t.Errorf("evacuateLeadership should have errored out at this point.")
//}
14 changes: 5 additions & 9 deletions cmd/topicmappr/commands/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,22 +7,18 @@ import (
"time"

"github.com/DataDog/kafka-kit/v3/kafkazk"

"github.com/spf13/cobra"
)

// checkMetaAge checks the age of the stored partition and broker storage
// metrics data against the tolerated metrics age parameter.
func checkMetaAge(cmd *cobra.Command, zk kafkazk.Handler) {
func checkMetaAge(zk kafkazk.Handler, maxAge int) {
age, err := zk.MaxMetaAge()
if err != nil {
fmt.Printf("Error fetching metrics metadata: %s\n", err)
os.Exit(1)
}

tol, _ := cmd.Flags().GetInt("metrics-age")

if age > time.Duration(tol)*time.Minute {
if age > time.Duration(maxAge)*time.Minute {
fmt.Printf("Metrics metadata is older than allowed: %s\n", age)
os.Exit(1)
}
Expand All @@ -31,7 +27,7 @@ func checkMetaAge(cmd *cobra.Command, zk kafkazk.Handler) {
// getBrokerMeta returns a map of brokers and broker metadata for those
// registered in ZooKeeper. Optionally, metrics metadata persisted in ZooKeeper
// (via an external mechanism*) can be merged into the metadata.
func getBrokerMeta(cmd *cobra.Command, zk kafkazk.Handler, m bool) kafkazk.BrokerMetaMap {
func getBrokerMeta(zk kafkazk.Handler, m bool) kafkazk.BrokerMetaMap {
brokerMeta, errs := zk.GetAllBrokerMeta(m)
// If no data is returned, report and exit. Otherwise, it's possible that
// complete data for a few brokers wasn't returned. We check in subsequent
Expand All @@ -49,7 +45,7 @@ func getBrokerMeta(cmd *cobra.Command, zk kafkazk.Handler, m bool) kafkazk.Broke
// ensureBrokerMetrics takes a map of reference brokers and a map of discovered
// broker metadata. Any non-missing brokers in the broker map must be present
// in the broker metadata map and have a non-true MetricsIncomplete value.
func ensureBrokerMetrics(cmd *cobra.Command, bm kafkazk.BrokerMap, bmm kafkazk.BrokerMetaMap) {
func ensureBrokerMetrics(bm kafkazk.BrokerMap, bmm kafkazk.BrokerMetaMap) {
var e bool
for id, b := range bm {
// Missing brokers won't be found in the brokerMeta.
Expand All @@ -67,7 +63,7 @@ func ensureBrokerMetrics(cmd *cobra.Command, bm kafkazk.BrokerMap, bmm kafkazk.B
// getPartitionMeta returns a map of topic, partition metadata persisted in
// ZooKeeper (via an external mechanism*). This is primarily partition size
// metrics data used for the storage placement strategy.
func getPartitionMeta(cmd *cobra.Command, zk kafkazk.Handler) kafkazk.PartitionMetaMap {
func getPartitionMeta(zk kafkazk.Handler) kafkazk.PartitionMetaMap {
partitionMeta, err := zk.GetAllPartitionMeta()
if err != nil {
fmt.Println(err)
Expand Down
26 changes: 1 addition & 25 deletions cmd/topicmappr/commands/output.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,31 +124,7 @@ func printBrokerAssignmentStats(cmd *cobra.Command, pm1, pm2 *kafkazk.PartitionM
// input and wasn't marked for replacement (generally, users are doing storage placements
// particularly to balance out the storage of the input broker list).

// Filter function for brokers where the Replaced field is false.
nonReplaced := func(b *kafkazk.Broker) bool {
if b.Replace {
return false
}
return true
}

// Get all IDs in PartitionMap.
mappedIDs := map[int]struct{}{}
for _, partn := range pm1.Partitions {
for _, id := range partn.Replicas {
mappedIDs[id] = struct{}{}
}
}

// Filter function for brokers that are in the partition map.
mapped := func(b *kafkazk.Broker) bool {
if _, exist := mappedIDs[b.ID]; exist {
return true
}
return false
}

mb1, mb2 := bm1.Filter(mapped), bm2.Filter(nonReplaced)
mb1, mb2 := bm1.Filter(pm1.BrokersIn()), bm2.Filter(kafkazk.NotReplacedBrokersFn)

// Range before/after.
r1, r2 := mb1.StorageRange(), mb2.StorageRange()
Expand Down
56 changes: 0 additions & 56 deletions cmd/topicmappr/commands/reassignments.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
package commands

import (
"fmt"
"os"
"sync"

"github.com/DataDog/kafka-kit/v3/kafkazk"
Expand Down Expand Up @@ -131,57 +129,3 @@ func computeReassignmentBundles(params computeReassignmentBundlesParams) chan re

return results
}

// EvacLeadership ensures that, for the given set of topics, none of the given
// brokers remain the leader (first replica) of any partition. If a partition of
// a target topic has only replicas from the evac broker list, there is nowhere
// to move leadership and the process exits with an error.
func EvacLeadership(partitionMapIn kafkazk.PartitionMap, evacBrokers []int, evacTopics []string) *kafkazk.PartitionMap {
	// Work on a copy so the input map is left untouched.
	partitionMapOut := partitionMapIn.Copy()

	// Lookup set of topics targeted for leadership evacuation.
	topicsMap := map[string]struct{}{}
	for _, topic := range evacTopics {
		topicsMap[topic] = struct{}{}
	}

	// Lookup set of brokers being evacuated.
	brokersMap := map[int]struct{}{}
	for _, b := range evacBrokers {
		brokersMap[b] = struct{}{}
	}

	// TODO What if we only want to evacuate a subset of partitions?
	// For now, problem brokers is the bigger use case.

	// Swap leadership for every partition of a target topic whose current
	// leader is one of the evac brokers.
	for i, p := range partitionMapIn.Partitions {
		// Skip partitions that don't belong to one of the target topics.
		if _, targetTopic := topicsMap[p.Topic]; !targetTopic {
			continue
		}

		// Skip partitions whose leader isn't one of the evac brokers.
		if _, leaderIsEvac := brokersMap[p.Replicas[0]]; !leaderIsEvac {
			continue
		}

		// Find the first replica that isn't being evacuated. Scanning by index
		// (rather than comparing replica values against the last element) makes
		// the "no candidate found" detection exact even if replica IDs repeat.
		newLeaderIdx := -1
		for j, replica := range p.Replicas {
			if _, evac := brokersMap[replica]; !evac {
				newLeaderIdx = j
				break
			}
		}

		// Every replica for this partition is being leadership evacuated; we
		// cannot satisfy the request, so report and exit.
		if newLeaderIdx == -1 {
			fmt.Println("No replicas available to evacuate leadership to. All replicas present in EvacLeadership broker list.")
			os.Exit(1)
		}

		// Promote the chosen replica to leader in the output map, swapping it
		// with the current leader. Values are read from the unmodified input map.
		partitionMapOut.Partitions[i].Replicas[newLeaderIdx] = p.Replicas[0]
		partitionMapOut.Partitions[i].Replicas[0] = p.Replicas[newLeaderIdx]
	}

	return partitionMapOut
}
16 changes: 10 additions & 6 deletions cmd/topicmappr/commands/rebalance.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ func init() {
rebalanceCmd.Flags().Int("partition-size-threshold", 512, "Size in megabytes where partitions below this value will not be moved in a rebalance")
rebalanceCmd.Flags().Bool("locality-scoped", false, "Ensure that all partition movements are scoped by rack.id")
rebalanceCmd.Flags().Bool("verbose", false, "Verbose output")
rebalanceCmd.Flags().String("zk-metrics-prefix", "topicmappr", "ZooKeeper namespace prefix for Kafka metrics")
rebalanceCmd.Flags().Int("metrics-age", 60, "Kafka metrics age tolerance (in minutes)")
rebalanceCmd.Flags().Bool("optimize-leadership", false, "Rebalance all broker leader/follower ratios")

Expand All @@ -46,18 +45,23 @@ func rebalance(cmd *cobra.Command, _ []string) {
bootstrap(cmd)

// ZooKeeper init.
zk, err := initZooKeeper(cmd)
zkAddr := cmd.Parent().Flag("zk-addr").Value.String()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see you're experiencing the joy of extracting cobra flag vals

kafkaPrefix := cmd.Parent().Flag("zk-prefix").Value.String()
metricsPrefix := cmd.Flag("zk-metrics-prefix").Value.String()
zk, err := initZooKeeper(zkAddr, kafkaPrefix, metricsPrefix)
if err != nil {
fmt.Println(err)
os.Exit(1)
}

defer zk.Close()

maxMetadataAge, _ := cmd.Flags().GetInt("metrics-age")

// Get broker and partition metadata.
checkMetaAge(cmd, zk)
brokerMeta := getBrokerMeta(cmd, zk, true)
partitionMeta := getPartitionMeta(cmd, zk)
checkMetaAge(zk, maxMetadataAge)
brokerMeta := getBrokerMeta(zk, true)
partitionMeta := getPartitionMeta(zk)

// Get the current partition map.
partitionMapIn, err := kafkazk.PartitionMapFromZK(Config.topics, zk)
Expand Down Expand Up @@ -175,7 +179,7 @@ func validateBrokersForRebalance(cmd *cobra.Command, brokers kafkazk.BrokerMap,
}

// Check if any referenced brokers are marked as having missing/partial metrics data.
ensureBrokerMetrics(cmd, brokers, bm)
ensureBrokerMetrics(brokers, bm)

switch {
case c.Missing > 0, c.OldMissing > 0, c.Replace > 0:
Expand Down
19 changes: 11 additions & 8 deletions cmd/topicmappr/commands/rebuild.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ func init() {
rebuildCmd.Flags().String("optimize", "distribution", "Optimization priority for the storage placement strategy: [distribution, storage]")
rebuildCmd.Flags().Float64("partition-size-factor", 1.0, "Factor by which to multiply partition sizes when using storage placement")
rebuildCmd.Flags().String("brokers", "", "Broker list to scope all partition placements to ('-1' for all currently mapped brokers, '-2' for all brokers in cluster)")
rebuildCmd.Flags().String("zk-metrics-prefix", "topicmappr", "ZooKeeper namespace prefix for Kafka metrics (when using storage placement)")
rebuildCmd.Flags().Int("metrics-age", 60, "Kafka metrics age tolerance (in minutes) (when using storage placement)")
rebuildCmd.Flags().Bool("skip-no-ops", false, "Skip no-op partition assigments")
rebuildCmd.Flags().Bool("optimize-leadership", false, "Rebalance all broker leader/follower ratios")
Expand Down Expand Up @@ -91,7 +90,10 @@ func rebuild(cmd *cobra.Command, _ []string) {
var zk kafkazk.Handler
if m || len(Config.topics) > 0 || p == "storage" {
var err error
zk, err = initZooKeeper(cmd)
zkAddr := cmd.Parent().Flag("zk-addr").Value.String()
kafkaPrefix := cmd.Parent().Flag("zk-prefix").Value.String()
metricsPrefix := cmd.Flag("zk-metrics-prefix").Value.String()
zk, err = initZooKeeper(zkAddr, kafkaPrefix, metricsPrefix)
if err != nil {
fmt.Println(err)
os.Exit(1)
Expand All @@ -103,7 +105,7 @@ func rebuild(cmd *cobra.Command, _ []string) {
var evacTopics []string
var err error
if let != "" {
evacTopics, err = zk.GetTopics(TopicRegex(let))
evacTopics, err = zk.GetTopics(topicRegex(let))
if err != nil {
fmt.Println(err)
os.Exit(1)
Expand Down Expand Up @@ -132,19 +134,20 @@ func rebuild(cmd *cobra.Command, _ []string) {
// Fetch broker metadata.
var withMetrics bool
if cmd.Flag("placement").Value.String() == "storage" {
checkMetaAge(cmd, zk)
maxMetadataAge, _ := cmd.Flags().GetInt("metrics-age")
checkMetaAge(zk, maxMetadataAge)
withMetrics = true
}

var brokerMeta kafkazk.BrokerMetaMap
if m, _ := cmd.Flags().GetBool("use-meta"); m {
brokerMeta = getBrokerMeta(cmd, zk, withMetrics)
brokerMeta = getBrokerMeta(zk, withMetrics)
}

// Fetch partition metadata.
var partitionMeta kafkazk.PartitionMetaMap
if cmd.Flag("placement").Value.String() == "storage" {
partitionMeta = getPartitionMeta(cmd, zk)
partitionMeta = getPartitionMeta(zk)
}

// Build a partition map either from literal map text input or by fetching the
Expand All @@ -169,7 +172,7 @@ func rebuild(cmd *cobra.Command, _ []string) {
// Check if any referenced brokers are marked as having
// missing/partial metrics data.
if m, _ := cmd.Flags().GetBool("use-meta"); m {
ensureBrokerMetrics(cmd, brokers, brokerMeta)
ensureBrokerMetrics(brokers, brokerMeta)
}

// Create substitution affinities.
Expand Down Expand Up @@ -212,7 +215,7 @@ func rebuild(cmd *cobra.Command, _ []string) {
phasedMap = phasedReassignment(originalMap, partitionMapOut)
}

partitionMapOut = EvacLeadership(*partitionMapOut, evacBrokers, evacTopics)
partitionMapOut = evacuateLeadership(*partitionMapOut, evacBrokers, evacTopics)

// Print map change results.
printMapChanges(originalMap, partitionMapOut)
Expand Down
Loading