From 716dbc524d254fbd9262e2debe48f62ea2556988 Mon Sep 17 00:00:00 2001 From: Michael Borst Date: Wed, 1 Dec 2021 14:33:15 +0100 Subject: [PATCH 01/18] [kafkazk] use Filter for Above/BelowMean --- kafkazk/stats.go | 36 ++++++++++++++++++++---------------- 1 file changed, 20 insertions(+), 16 deletions(-) diff --git a/kafkazk/stats.go b/kafkazk/stats.go index e89ea1f..59ce199 100644 --- a/kafkazk/stats.go +++ b/kafkazk/stats.go @@ -216,24 +216,26 @@ func (b BrokerMap) Mean() float64 { return t / c } +// AboveMeanFn returns a BrokerFilterFn that filters brokers that are above the +// mean by d percent (0.00 < d). The mean type is provided as a function f. +func AboveMeanFn(d float64, f func() float64) BrokerFilterFn { + m := f() + return func(b *Broker) bool { return (b.StorageFree-m)/m > d } +} + // AboveMean returns a sorted []int of broker IDs that are above the mean // by d percent (0.00 < d). The mean type is provided as a function f. func (b BrokerMap) AboveMean(d float64, f func() float64) []int { - m := f() var ids []int if d <= 0.00 { return ids } - for _, br := range b { - if br.ID == StubBrokerID { - continue - } + filtered := b.Filter(AboveMeanFn(d, f)) - if (br.StorageFree-m)/m > d { - ids = append(ids, br.ID) - } + for _, br := range filtered { + ids = append(ids, br.ID) } sort.Ints(ids) @@ -241,24 +243,26 @@ func (b BrokerMap) AboveMean(d float64, f func() float64) []int { return ids } +// BelowMeanFn returns a BrokerFilterFn that filters brokers that are below the +// mean by d percent (0.00 < d). The mean type is provided as a function f. +func BelowMeanFn(d float64, f func() float64) BrokerFilterFn { + m := f() + return func(b *Broker) bool { return (m-b.StorageFree)/m > d } +} + // BelowMean returns a sorted []int of broker IDs that are below the mean // by d percent (0.00 < d). The mean type is provided as a function f. func (b BrokerMap) BelowMean(d float64, f func() float64) []int { - m := f() var ids []int if d <= 0.00 { return ids } - for _, br := range b { - if br.ID == StubBrokerID { - continue - } + filtered := b.Filter(BelowMeanFn(d, f)) - if (m-br.StorageFree)/m > d { - ids = append(ids, br.ID) - } + for _, br := range filtered { + ids = append(ids, br.ID) } sort.Ints(ids) From 531f7d585b090d75847bc2ba665ccb0177acad0d Mon Sep 17 00:00:00 2001 From: Michael Borst Date: Thu, 2 Dec 2021 14:55:44 +0100 Subject: [PATCH 02/18] [kafkazk] use predefined filters --- cmd/topicmappr/commands/output.go | 26 +---------------- cmd/topicmappr/commands/rebuild_steps.go | 6 ++-- kafkazk/brokers.go | 4 ++- kafkazk/brokers_test.go | 3 +- kafkazk/constraints.go | 9 +----- kafkazk/partitions.go | 36 +++++++++++++----------- 6 files changed, 28 insertions(+), 56 deletions(-) diff --git a/cmd/topicmappr/commands/output.go b/cmd/topicmappr/commands/output.go index 325f7c0..435c0e4 100644 --- a/cmd/topicmappr/commands/output.go +++ b/cmd/topicmappr/commands/output.go @@ -124,31 +124,7 @@ func printBrokerAssignmentStats(cmd *cobra.Command, pm1, pm2 *kafkazk.PartitionM // input and wasn't marked for replacement (generally, users are doing storage placements // particularly to balance out the storage of the input broker list). - // Filter function for brokers where the Replaced field is false. - nonReplaced := func(b *kafkazk.Broker) bool { - if b.Replace { - return false - } - return true - } - - // Get all IDs in PartitionMap. - mappedIDs := map[int]struct{}{} - for _, partn := range pm1.Partitions { - for _, id := range partn.Replicas { - mappedIDs[id] = struct{}{} - } - } - - // Filter function for brokers that are in the partition map. - mapped := func(b *kafkazk.Broker) bool { - if _, exist := mappedIDs[b.ID]; exist { - return true - } - return false - } - - mb1, mb2 := bm1.Filter(mapped), bm2.Filter(nonReplaced) + mb1, mb2 := bm1.Filter(pm1.BrokersIn()), bm2.Filter(kafkazk.NotReplacedBrokersFn) // Range before/after. r1, r2 := mb1.StorageRange(), mb2.StorageRange() diff --git a/cmd/topicmappr/commands/rebuild_steps.go b/cmd/topicmappr/commands/rebuild_steps.go index c0ab672..8dd2508 100644 --- a/cmd/topicmappr/commands/rebuild_steps.go +++ b/cmd/topicmappr/commands/rebuild_steps.go @@ -217,8 +217,7 @@ func buildMap(cmd *cobra.Command, pm *kafkazk.PartitionMap, pmm kafkazk.Partitio // If the storage placement strategy is being used, // update the broker StorageFree values. if placement == "storage" { - allBrokers := func(b *kafkazk.Broker) bool { return true } - err := rebuildParams.BM.SubStorage(pm, pmm, allBrokers) + err := rebuildParams.BM.SubStorage(pm, pmm, kafkazk.AllBrokersFn) if err != nil { fmt.Println(err) os.Exit(1) @@ -231,8 +230,7 @@ func buildMap(cmd *cobra.Command, pm *kafkazk.PartitionMap, pmm kafkazk.Partitio // Update the StorageFree only on brokers marked for replacement. if placement == "storage" { - replacedBrokers := func(b *kafkazk.Broker) bool { return b.Replace } - err := rebuildParams.BM.SubStorage(pm, pmm, replacedBrokers) + err := rebuildParams.BM.SubStorage(pm, pmm, kafkazk.ReplacedBrokersFn) if err != nil { fmt.Println(err) os.Exit(1) diff --git a/kafkazk/brokers.go b/kafkazk/brokers.go index e181273..2f3a671 100644 --- a/kafkazk/brokers.go +++ b/kafkazk/brokers.go @@ -160,6 +160,8 @@ type BrokerFilterFn func(*Broker) bool // AllBrokersFn returns all brokers. var AllBrokersFn BrokerFilterFn = func(b *Broker) bool { return true } +var NotReplacedBrokersFn = func(b *Broker) bool { return !b.Replace } +var ReplacedBrokersFn = func(b *Broker) bool { return b.Replace } // SortPseudoShuffle takes a BrokerList and performs a sort by count. // For each sequence of brokers with equal counts, the sub-slice is @@ -343,7 +345,7 @@ func (b BrokerMap) Update(bl []int, bm BrokerMetaMap) (*BrokerStatus, <-chan str // SubStorageAll takes a PartitionMap, PartitionMetaMap, and a function. For all // brokers that return true as an input to function f, the size of all partitions // held is added back to the broker StorageFree value. -func (b BrokerMap) SubStorage(pm *PartitionMap, pmm PartitionMetaMap, f func(*Broker) bool) error { +func (b BrokerMap) SubStorage(pm *PartitionMap, pmm PartitionMetaMap, f BrokerFilterFn) error { // Get the size of each partition. for _, partn := range pm.Partitions { size, err := pmm.Size(partn) diff --git a/kafkazk/brokers_test.go b/kafkazk/brokers_test.go index 95110f2..1e96512 100644 --- a/kafkazk/brokers_test.go +++ b/kafkazk/brokers_test.go @@ -270,8 +270,7 @@ func TestSubStorageReplacements(t *testing.T) { bm[1003].Replace = true - replacedBrokers := func(b *Broker) bool { return b.Replace } - err := bm.SubStorage(pm, pmm, replacedBrokers) + err := bm.SubStorage(pm, pmm, ReplacedBrokersFn) if err != nil { t.Fatal(err) } diff --git a/kafkazk/constraints.go b/kafkazk/constraints.go index 4a1851c..e4bef8f 100644 --- a/kafkazk/constraints.go +++ b/kafkazk/constraints.go @@ -125,14 +125,7 @@ func (c *Constraints) Add(b *Broker) { func (c *Constraints) MergeConstraints(bl BrokerList) { // Don't merge in attributes // from nodes that will be removed. - var f BrokerFilterFn = func(b *Broker) bool { - if b.Replace { - return false - } - return true - } - - for _, b := range bl.Filter(f) { + for _, b := range bl.Filter(NotReplacedBrokersFn) { if b.Locality != "" { c.locality[b.Locality] = true } diff --git a/kafkazk/partitions.go b/kafkazk/partitions.go index 57d530c..fc52cde 100644 --- a/kafkazk/partitions.go +++ b/kafkazk/partitions.go @@ -267,6 +267,24 @@ func (pm *PartitionMap) Rebuild(params RebuildParams) (*PartitionMap, []error) { return newMap, errs } +// BrokersIn returns a BrokerFilterFn that filters for brokers in the +// PartitionMap +func (pm PartitionMap) BrokersIn() BrokerFilterFn { + mappedIDs := map[int]struct{}{} + for _, partn := range pm.Partitions { + for _, id := range partn.Replicas { + mappedIDs[id] = struct{}{} + } + } + + return func(b *Broker) bool { + if _, exist := mappedIDs[b.ID]; exist { + return true + } + return false + } +} + // placeByPosition builds a PartitionMap by doing placements for all // partitions, one broker index at a time. For instance, if all partitions // required a broker set length of 3 (aka a replication factor of 3), we'd @@ -279,14 +297,7 @@ func placeByPosition(params RebuildParams) (*PartitionMap, []error) { // We need a filtered list for usage sorting and exclusion // of nodes marked for removal. - f := func(b *Broker) bool { - if b.Replace { - return false - } - return true - } - - bl := params.BM.Filter(f).List() + bl := params.BM.Filter(NotReplacedBrokersFn).List() var errs []error var pass int @@ -421,14 +432,7 @@ func placeByPartition(params RebuildParams) (*PartitionMap, []error) { // We need a filtered list for usage sorting and exclusion // of nodes marked for removal. - f := func(b *Broker) bool { - if b.Replace { - return false - } - return true - } - - bl := params.BM.Filter(f).List() + bl := params.BM.Filter(NotReplacedBrokersFn).List() var errs []error From c96e85e8efa86b7f07a1121ba504a6b51f3aa448 Mon Sep 17 00:00:00 2001 From: Michael Borst Date: Tue, 11 Jan 2022 21:28:56 +0100 Subject: [PATCH 03/18] [tm] make topicRegex private --- cmd/topicmappr/commands/config.go | 8 ++++---- cmd/topicmappr/commands/rebuild.go | 2 +- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/cmd/topicmappr/commands/config.go b/cmd/topicmappr/commands/config.go index 6d2aa67..30b96bb 100644 --- a/cmd/topicmappr/commands/config.go +++ b/cmd/topicmappr/commands/config.go @@ -44,18 +44,18 @@ func bootstrap(cmd *cobra.Command) { // Populate topic include / exclude regex. if include, _ := cmd.Flags().GetString("topics"); include != "" { - Config.topics = TopicRegex(include) + Config.topics = topicRegex(include) } if exclude, _ := cmd.Flags().GetString("topics-exclude"); exclude != "" { - Config.topicsExclude = TopicRegex(exclude) + Config.topicsExclude = topicRegex(exclude) } } -// TopicRegex takes a string of csv values and returns a []*regexp.Regexp. +// topicRegex takes a string of csv values and returns a []*regexp.Regexp. // The values are either a string literal and become ^value$ or are regex and // compiled then added. -func TopicRegex(s string) []*regexp.Regexp { +func topicRegex(s string) []*regexp.Regexp { var out []*regexp.Regexp // Update string literals to ^value$ regex. diff --git a/cmd/topicmappr/commands/rebuild.go b/cmd/topicmappr/commands/rebuild.go index 6cd925c..dfe6315 100644 --- a/cmd/topicmappr/commands/rebuild.go +++ b/cmd/topicmappr/commands/rebuild.go @@ -103,7 +103,7 @@ func rebuild(cmd *cobra.Command, _ []string) { var evacTopics []string var err error if let != "" { - evacTopics, err = zk.GetTopics(TopicRegex(let)) + evacTopics, err = zk.GetTopics(topicRegex(let)) if err != nil { fmt.Println(err) os.Exit(1) From cc0e3629dc972abd4c8b97bac2b229ba49b33387 Mon Sep 17 00:00:00 2001 From: Michael Borst Date: Tue, 11 Jan 2022 21:30:30 +0100 Subject: [PATCH 04/18] [tm] move and rename evacuateLeadership --- .../commands/evac_leadership_test.go | 10 ++-- cmd/topicmappr/commands/reassignments.go | 56 ------------------- cmd/topicmappr/commands/rebuild.go | 2 +- cmd/topicmappr/commands/rebuild_steps.go | 54 ++++++++++++++++++ 4 files changed, 60 insertions(+), 62 deletions(-) diff --git a/cmd/topicmappr/commands/evac_leadership_test.go b/cmd/topicmappr/commands/evac_leadership_test.go index c524ff7..a1f344d 100644 --- a/cmd/topicmappr/commands/evac_leadership_test.go +++ b/cmd/topicmappr/commands/evac_leadership_test.go @@ -41,7 +41,7 @@ var pMapIn = kafkazk.PartitionMap{ func TestRemoveProblemBroker(t *testing.T) { problemBrokerId := 10001 - pMapOut := EvacLeadership(pMapIn, []int{problemBrokerId}, []string{topic}) + pMapOut := evacuateLeadership(pMapIn, []int{problemBrokerId}, []string{topic}) for _, partition := range pMapOut.Partitions { if partition.Replicas[0] == problemBrokerId { @@ -53,7 +53,7 @@ func TestRemoveProblemBroker(t *testing.T) { func TestEvacTwoProblemBrokers(t *testing.T) { problemBrokers := []int{10001, 10002} - pMapOut := EvacLeadership(pMapIn, problemBrokers, []string{topic}) + pMapOut := evacuateLeadership(pMapIn, problemBrokers, []string{topic}) for _, partition := range pMapOut.Partitions { if partition.Replicas[0] == problemBrokers[0] || partition.Replicas[0] == problemBrokers[1] { @@ -63,7 +63,7 @@ func TestEvacTwoProblemBrokers(t *testing.T) { } func TestNoMatchingTopicToEvac(t *testing.T) { - pMapOut := EvacLeadership(pMapIn, []int{10001}, []string{"some other topic"}) + pMapOut := evacuateLeadership(pMapIn, []int{10001}, []string{"some other topic"}) for i, partition := range pMapOut.Partitions { for j, broker := range partition.Replicas { @@ -81,7 +81,7 @@ func TestNoMatchingTopicToEvac(t *testing.T) { // problemBrokers[10002] = &kafkazk.Broker{} // problemBrokers[10003] = &kafkazk.Broker{} // -// EvacLeadership(pMapIn, problemBrokers) +// evacuateLeadership(pMapIn, problemBrokers) // -// t.Errorf("EvacLeadership should have errored out at this point.") +// t.Errorf("evacuateLeadership should have errored out at this point.") //} diff --git a/cmd/topicmappr/commands/reassignments.go b/cmd/topicmappr/commands/reassignments.go index 627bada..d0d9ef5 100644 --- a/cmd/topicmappr/commands/reassignments.go +++ b/cmd/topicmappr/commands/reassignments.go @@ -1,8 +1,6 @@ package commands import ( - "fmt" - "os" "sync" "github.com/DataDog/kafka-kit/v3/kafkazk" @@ -131,57 +129,3 @@ func computeReassignmentBundles(params computeReassignmentBundlesParams) chan re return results } - -// EvacLeadership For the given set of topics, makes sure that the given brokers are not -// leaders of any partitions. If we have any partitions that only have replicas from the -// evac broker list, we will fail. -func EvacLeadership(partitionMapIn kafkazk.PartitionMap, evacBrokers []int, evacTopics []string) *kafkazk.PartitionMap { - // evacuation algo starts here - partitionMapOut := partitionMapIn.Copy() - - // make a lookup map of topics - topicsMap := map[string]struct{}{} - for _, topic := range evacTopics { - topicsMap[topic] = struct{}{} - } - - // make a lookup map of topics - brokersMap := map[int]struct{}{} - for _, b := range evacBrokers { - brokersMap[b] = struct{}{} - } - - // TODO What if we only want to evacuate a subset of partitions? - // For now, problem brokers is the bigger use case. - - // swap leadership for all broker/partition/topic combos - for i, p := range partitionMapIn.Partitions { - // check the topic is one of the target topics - if _, correctTopic := topicsMap[p.Topic]; !correctTopic { - continue - } - - // check the leader to see if its one of the evac brokers - if _, contains := brokersMap[p.Replicas[0]]; !contains { - continue - } - - for j, replica := range p.Replicas { - // If one of the replicas is not being leadership evacuated, use that one and swap. - if _, contains := brokersMap[replica]; !contains { - newLeader := p.Replicas[j] - partitionMapOut.Partitions[i].Replicas[j] = p.Replicas[0] - partitionMapOut.Partitions[i].Replicas[0] = newLeader - break - } - - // If we've tried every replica, but they are all being leader evac'd. - if replica == p.Replicas[len(p.Replicas)-1] { - fmt.Println("No replicas available to evacuate leadership to. All replicas present in EvacLeadership broker list.") - os.Exit(1) - } - } - } - - return partitionMapOut -} diff --git a/cmd/topicmappr/commands/rebuild.go b/cmd/topicmappr/commands/rebuild.go index dfe6315..1af627d 100644 --- a/cmd/topicmappr/commands/rebuild.go +++ b/cmd/topicmappr/commands/rebuild.go @@ -212,7 +212,7 @@ func rebuild(cmd *cobra.Command, _ []string) { phasedMap = phasedReassignment(originalMap, partitionMapOut) } - partitionMapOut = EvacLeadership(*partitionMapOut, evacBrokers, evacTopics) + partitionMapOut = evacuateLeadership(*partitionMapOut, evacBrokers, evacTopics) // Print map change results. printMapChanges(originalMap, partitionMapOut) diff --git a/cmd/topicmappr/commands/rebuild_steps.go b/cmd/topicmappr/commands/rebuild_steps.go index 8dd2508..c2ed5b2 100644 --- a/cmd/topicmappr/commands/rebuild_steps.go +++ b/cmd/topicmappr/commands/rebuild_steps.go @@ -284,3 +284,57 @@ func notInReplicaSet(id int, rs []int) bool { return true } + +// evacuateLeadership For the given set of topics, makes sure that the given brokers are not +// leaders of any partitions. If we have any partitions that only have replicas from the +// evac broker list, we will fail. +func evacuateLeadership(partitionMapIn kafkazk.PartitionMap, evacBrokers []int, evacTopics []string) *kafkazk.PartitionMap { + // evacuation algo starts here + partitionMapOut := partitionMapIn.Copy() + + // make a lookup map of topics + topicsMap := map[string]struct{}{} + for _, topic := range evacTopics { + topicsMap[topic] = struct{}{} + } + + // make a lookup map of topics + brokersMap := map[int]struct{}{} + for _, b := range evacBrokers { + brokersMap[b] = struct{}{} + } + + // TODO What if we only want to evacuate a subset of partitions? + // For now, problem brokers is the bigger use case. + + // swap leadership for all broker/partition/topic combos + for i, p := range partitionMapIn.Partitions { + // check the topic is one of the target topics + if _, correctTopic := topicsMap[p.Topic]; !correctTopic { + continue + } + + // check the leader to see if its one of the evac brokers + if _, contains := brokersMap[p.Replicas[0]]; !contains { + continue + } + + for j, replica := range p.Replicas { + // If one of the replicas is not being leadership evacuated, use that one and swap. + if _, contains := brokersMap[replica]; !contains { + newLeader := p.Replicas[j] + partitionMapOut.Partitions[i].Replicas[j] = p.Replicas[0] + partitionMapOut.Partitions[i].Replicas[0] = newLeader + break + } + + // If we've tried every replica, but they are all being leader evac'd. + if replica == p.Replicas[len(p.Replicas)-1] { + fmt.Println("No replicas available to evacuate leadership to. All replicas present in EvacLeadership broker list.") + os.Exit(1) + } + } + } + + return partitionMapOut +} From 5d4b970a894bd9c2e1b1fbc9a435754338552f7c Mon Sep 17 00:00:00 2001 From: Michael Borst Date: Tue, 11 Jan 2022 21:42:32 +0100 Subject: [PATCH 05/18] [tm] make error message consistent --- cmd/topicmappr/commands/rebuild_steps.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmd/topicmappr/commands/rebuild_steps.go b/cmd/topicmappr/commands/rebuild_steps.go index c2ed5b2..8264b9d 100644 --- a/cmd/topicmappr/commands/rebuild_steps.go +++ b/cmd/topicmappr/commands/rebuild_steps.go @@ -330,7 +330,7 @@ func evacuateLeadership(partitionMapIn kafkazk.PartitionMap, evacBrokers []int, // If we've tried every replica, but they are all being leader evac'd. if replica == p.Replicas[len(p.Replicas)-1] { - fmt.Println("No replicas available to evacuate leadership to. All replicas present in EvacLeadership broker list.") + fmt.Println("[ERROR] trying to evict all replicas at once") os.Exit(1) } } From ff4a6bdcf68425628f123e832b3bebe130dae929 Mon Sep 17 00:00:00 2001 From: Michael Borst Date: Wed, 1 Dec 2021 09:15:57 +0100 Subject: [PATCH 06/18] [tm] move zk-metrics-prefix to root command --- cmd/topicmappr/commands/rebalance.go | 1 - cmd/topicmappr/commands/rebuild.go | 1 - cmd/topicmappr/commands/root.go | 1 + cmd/topicmappr/commands/scale.go | 1 - 4 files changed, 1 insertion(+), 3 deletions(-) diff --git a/cmd/topicmappr/commands/rebalance.go b/cmd/topicmappr/commands/rebalance.go index d0f304c..790530a 100644 --- a/cmd/topicmappr/commands/rebalance.go +++ b/cmd/topicmappr/commands/rebalance.go @@ -33,7 +33,6 @@ func init() { rebalanceCmd.Flags().Int("partition-size-threshold", 512, "Size in megabytes where partitions below this value will not be moved in a rebalance") rebalanceCmd.Flags().Bool("locality-scoped", false, "Ensure that all partition movements are scoped by rack.id") rebalanceCmd.Flags().Bool("verbose", false, "Verbose output") - rebalanceCmd.Flags().String("zk-metrics-prefix", "topicmappr", "ZooKeeper namespace prefix for Kafka metrics") rebalanceCmd.Flags().Int("metrics-age", 60, "Kafka metrics age tolerance (in minutes)") rebalanceCmd.Flags().Bool("optimize-leadership", false, "Rebalance all broker leader/follower ratios") diff --git a/cmd/topicmappr/commands/rebuild.go b/cmd/topicmappr/commands/rebuild.go index 1af627d..9aa13c7 100644 --- a/cmd/topicmappr/commands/rebuild.go +++ b/cmd/topicmappr/commands/rebuild.go @@ -38,7 +38,6 @@ func init() { rebuildCmd.Flags().String("optimize", "distribution", "Optimization priority for the storage placement strategy: [distribution, storage]") rebuildCmd.Flags().Float64("partition-size-factor", 1.0, "Factor by which to multiply partition sizes when using storage placement") rebuildCmd.Flags().String("brokers", "", "Broker list to scope all partition placements to ('-1' for all currently mapped brokers, '-2' for all brokers in cluster)") - rebuildCmd.Flags().String("zk-metrics-prefix", "topicmappr", "ZooKeeper namespace prefix for Kafka metrics (when using storage placement)") rebuildCmd.Flags().Int("metrics-age", 60, "Kafka metrics age tolerance (in minutes) (when using storage placement)") rebuildCmd.Flags().Bool("skip-no-ops", false, "Skip no-op partition assigments") rebuildCmd.Flags().Bool("optimize-leadership", false, "Rebalance all broker leader/follower ratios") diff --git a/cmd/topicmappr/commands/root.go b/cmd/topicmappr/commands/root.go index 6ddc5da..ed17c02 100644 --- a/cmd/topicmappr/commands/root.go +++ b/cmd/topicmappr/commands/root.go @@ -25,5 +25,6 @@ func Execute() { func init() { rootCmd.PersistentFlags().String("zk-addr", "localhost:2181", "ZooKeeper connect string") rootCmd.PersistentFlags().String("zk-prefix", "", "ZooKeeper prefix (if Kafka is configured with a chroot path prefix)") + rootCmd.PersistentFlags().String("zk-metrics-prefix", "topicmappr", "ZooKeeper namespace prefix for Kafka metrics") rootCmd.PersistentFlags().Bool("ignore-warns", false, "Produce a map even if warnings are encountered") } diff --git a/cmd/topicmappr/commands/scale.go b/cmd/topicmappr/commands/scale.go index befc87d..c16c9bf 100644 --- a/cmd/topicmappr/commands/scale.go +++ b/cmd/topicmappr/commands/scale.go @@ -30,7 +30,6 @@ func init() { scaleCmd.Flags().Int("partition-size-threshold", 512, "Size in megabytes where partitions below this value will not be moved in a scale") scaleCmd.Flags().Bool("locality-scoped", false, "Ensure that all partition movements are scoped by rack.id") scaleCmd.Flags().Bool("verbose", false, "Verbose output") - scaleCmd.Flags().String("zk-metrics-prefix", "topicmappr", "ZooKeeper namespace prefix for Kafka metrics") scaleCmd.Flags().Int("metrics-age", 60, "Kafka metrics age tolerance (in minutes)") scaleCmd.Flags().Bool("optimize-leadership", false, "Scale all broker leader/follower ratios") From 10d0d9b58610f3cc6156ab37610e5c7be6e10939 Mon Sep 17 00:00:00 2001 From: Michael Borst Date: Fri, 26 Nov 2021 14:54:48 +0100 Subject: [PATCH 07/18] [tm] remove cobra from metadata utils --- cmd/topicmappr/commands/metadata.go | 14 +++++--------- cmd/topicmappr/commands/rebalance.go | 10 ++++++---- cmd/topicmappr/commands/rebuild.go | 9 +++++---- cmd/topicmappr/commands/scale.go | 9 +++++---- 4 files changed, 21 insertions(+), 21 deletions(-) diff --git a/cmd/topicmappr/commands/metadata.go b/cmd/topicmappr/commands/metadata.go index d029e4e..842bb77 100644 --- a/cmd/topicmappr/commands/metadata.go +++ b/cmd/topicmappr/commands/metadata.go @@ -7,22 +7,18 @@ import ( "time" "github.com/DataDog/kafka-kit/v3/kafkazk" - - "github.com/spf13/cobra" ) // checkMetaAge checks the age of the stored partition and broker storage // metrics data against the tolerated metrics age parameter. -func checkMetaAge(cmd *cobra.Command, zk kafkazk.Handler) { +func checkMetaAge(zk kafkazk.Handler, maxAge int) { age, err := zk.MaxMetaAge() if err != nil { fmt.Printf("Error fetching metrics metadata: %s\n", err) os.Exit(1) } - tol, _ := cmd.Flags().GetInt("metrics-age") - - if age > time.Duration(tol)*time.Minute { + if age > time.Duration(maxAge)*time.Minute { fmt.Printf("Metrics metadata is older than allowed: %s\n", age) os.Exit(1) } @@ -31,7 +27,7 @@ func checkMetaAge(cmd *cobra.Command, zk kafkazk.Handler) { // getBrokerMeta returns a map of brokers and broker metadata for those // registered in ZooKeeper. Optionally, metrics metadata persisted in ZooKeeper // (via an external mechanism*) can be merged into the metadata. -func getBrokerMeta(cmd *cobra.Command, zk kafkazk.Handler, m bool) kafkazk.BrokerMetaMap { +func getBrokerMeta(zk kafkazk.Handler, m bool) kafkazk.BrokerMetaMap { brokerMeta, errs := zk.GetAllBrokerMeta(m) // If no data is returned, report and exit. Otherwise, it's possible that // complete data for a few brokers wasn't returned. We check in subsequent @@ -49,7 +45,7 @@ func getBrokerMeta(cmd *cobra.Command, zk kafkazk.Handler, m bool) kafkazk.Broke // ensureBrokerMetrics takes a map of reference brokers and a map of discovered // broker metadata. Any non-missing brokers in the broker map must be present // in the broker metadata map and have a non-true MetricsIncomplete value. -func ensureBrokerMetrics(cmd *cobra.Command, bm kafkazk.BrokerMap, bmm kafkazk.BrokerMetaMap) { +func ensureBrokerMetrics(bm kafkazk.BrokerMap, bmm kafkazk.BrokerMetaMap) { var e bool for id, b := range bm { // Missing brokers won't be found in the brokerMeta. @@ -67,7 +63,7 @@ func ensureBrokerMetrics(cmd *cobra.Command, bm kafkazk.BrokerMap, bmm kafkazk.B // getPartitionMeta returns a map of topic, partition metadata persisted in // ZooKeeper (via an external mechanism*). This is primarily partition size // metrics data used for the storage placement strategy. -func getPartitionMeta(cmd *cobra.Command, zk kafkazk.Handler) kafkazk.PartitionMetaMap { +func getPartitionMeta(zk kafkazk.Handler) kafkazk.PartitionMetaMap { partitionMeta, err := zk.GetAllPartitionMeta() if err != nil { fmt.Println(err) diff --git a/cmd/topicmappr/commands/rebalance.go b/cmd/topicmappr/commands/rebalance.go index 790530a..44cfefc 100644 --- a/cmd/topicmappr/commands/rebalance.go +++ b/cmd/topicmappr/commands/rebalance.go @@ -53,10 +53,12 @@ func rebalance(cmd *cobra.Command, _ []string) { defer zk.Close() + maxMetadataAge, _ := cmd.Flags().GetInt("metrics-age") + // Get broker and partition metadata. - checkMetaAge(cmd, zk) - brokerMeta := getBrokerMeta(cmd, zk, true) - partitionMeta := getPartitionMeta(cmd, zk) + checkMetaAge(zk, maxMetadataAge) + brokerMeta := getBrokerMeta(zk, true) + partitionMeta := getPartitionMeta(zk) // Get the current partition map. partitionMapIn, err := kafkazk.PartitionMapFromZK(Config.topics, zk) @@ -174,7 +176,7 @@ func validateBrokersForRebalance(cmd *cobra.Command, brokers kafkazk.BrokerMap, } // Check if any referenced brokers are marked as having missing/partial metrics data. - ensureBrokerMetrics(cmd, brokers, bm) + ensureBrokerMetrics(brokers, bm) switch { case c.Missing > 0, c.OldMissing > 0, c.Replace > 0: diff --git a/cmd/topicmappr/commands/rebuild.go b/cmd/topicmappr/commands/rebuild.go index 9aa13c7..b0feb3e 100644 --- a/cmd/topicmappr/commands/rebuild.go +++ b/cmd/topicmappr/commands/rebuild.go @@ -131,19 +131,20 @@ func rebuild(cmd *cobra.Command, _ []string) { // Fetch broker metadata. var withMetrics bool if cmd.Flag("placement").Value.String() == "storage" { - checkMetaAge(cmd, zk) + maxMetadataAge, _ := cmd.Flags().GetInt("metrics-age") + checkMetaAge(zk, maxMetadataAge) withMetrics = true } var brokerMeta kafkazk.BrokerMetaMap if m, _ := cmd.Flags().GetBool("use-meta"); m { - brokerMeta = getBrokerMeta(cmd, zk, withMetrics) + brokerMeta = getBrokerMeta(zk, withMetrics) } // Fetch partition metadata. var partitionMeta kafkazk.PartitionMetaMap if cmd.Flag("placement").Value.String() == "storage" { - partitionMeta = getPartitionMeta(cmd, zk) + partitionMeta = getPartitionMeta(zk) } // Build a partition map either from literal map text input or by fetching the @@ -168,7 +169,7 @@ func rebuild(cmd *cobra.Command, _ []string) { // Check if any referenced brokers are marked as having // missing/partial metrics data. if m, _ := cmd.Flags().GetBool("use-meta"); m { - ensureBrokerMetrics(cmd, brokers, brokerMeta) + ensureBrokerMetrics(brokers, brokerMeta) } // Create substitution affinities. diff --git a/cmd/topicmappr/commands/scale.go b/cmd/topicmappr/commands/scale.go index c16c9bf..7122150 100644 --- a/cmd/topicmappr/commands/scale.go +++ b/cmd/topicmappr/commands/scale.go @@ -51,9 +51,10 @@ func scale(cmd *cobra.Command, _ []string) { defer zk.Close() // Get broker and partition metadata. - checkMetaAge(cmd, zk) - brokerMeta := getBrokerMeta(cmd, zk, true) - partitionMeta := getPartitionMeta(cmd, zk) + maxMetadataAge, _ := cmd.Flags().GetInt("metrics-age") + checkMetaAge(zk, maxMetadataAge) + brokerMeta := getBrokerMeta(zk, true) + partitionMeta := getPartitionMeta(zk) // Get the current partition map. partitionMapIn, err := kafkazk.PartitionMapFromZK(Config.topics, zk) @@ -173,7 +174,7 @@ func validateBrokersForScale(cmd *cobra.Command, brokers kafkazk.BrokerMap, bm k } // Check if any referenced brokers are marked as having missing/partial metrics data. - ensureBrokerMetrics(cmd, brokers, bm) + ensureBrokerMetrics(brokers, bm) switch { case c.Missing > 0, c.OldMissing > 0, c.Replace > 0: From 9e43c6269a673eeaea973210f03030879a04c054 Mon Sep 17 00:00:00 2001 From: Michael Borst Date: Fri, 26 Nov 2021 15:38:45 +0100 Subject: [PATCH 08/18] [tm] remove cobra from initZookeeper --- cmd/topicmappr/commands/config.go | 10 ++++------ cmd/topicmappr/commands/rebalance.go | 5 ++++- cmd/topicmappr/commands/rebuild.go | 5 ++++- cmd/topicmappr/commands/scale.go | 5 ++++- 4 files changed, 16 insertions(+), 9 deletions(-) diff --git a/cmd/topicmappr/commands/config.go b/cmd/topicmappr/commands/config.go index 30b96bb..e61b03b 100644 --- a/cmd/topicmappr/commands/config.go +++ b/cmd/topicmappr/commands/config.go @@ -88,23 +88,21 @@ func topicRegex(s string) []*regexp.Regexp { // topic discovery` via ZooKeeper. // - that the --placement flag was set to 'storage', which expects // metrics metadata to be stored in ZooKeeper. -func initZooKeeper(cmd *cobra.Command) (kafkazk.Handler, error) { +func initZooKeeper(zkAddr, kafkaPrefix, metricsPrefix string) (kafkazk.Handler, error) { // Suppress underlying ZK client noise. log.SetOutput(ioutil.Discard) - zkAddr := cmd.Parent().Flag("zk-addr").Value.String() - timeout := 250 * time.Millisecond - zk, err := kafkazk.NewHandler(&kafkazk.Config{ Connect: zkAddr, - Prefix: cmd.Parent().Flag("zk-prefix").Value.String(), - MetricsPrefix: cmd.Flag("zk-metrics-prefix").Value.String(), + Prefix: kafkaPrefix, + MetricsPrefix: metricsPrefix, }) if err != nil { return nil, fmt.Errorf("Error connecting to ZooKeeper: %s", err) } + timeout := 250 * time.Millisecond time.Sleep(timeout) if !zk.Ready() { diff --git a/cmd/topicmappr/commands/rebalance.go b/cmd/topicmappr/commands/rebalance.go index 44cfefc..3ab71df 100644 --- a/cmd/topicmappr/commands/rebalance.go +++ b/cmd/topicmappr/commands/rebalance.go @@ -45,7 +45,10 @@ func rebalance(cmd *cobra.Command, _ []string) { bootstrap(cmd) // ZooKeeper init. - zk, err := initZooKeeper(cmd) + zkAddr := cmd.Parent().Flag("zk-addr").Value.String() + kafkaPrefix := cmd.Parent().Flag("zk-prefix").Value.String() + metricsPrefix := cmd.Flag("zk-metrics-prefix").Value.String() + zk, err := initZooKeeper(zkAddr, kafkaPrefix, metricsPrefix) if err != nil { fmt.Println(err) os.Exit(1) diff --git a/cmd/topicmappr/commands/rebuild.go b/cmd/topicmappr/commands/rebuild.go index b0feb3e..cef4331 100644 --- a/cmd/topicmappr/commands/rebuild.go +++ b/cmd/topicmappr/commands/rebuild.go @@ -90,7 +90,10 @@ func rebuild(cmd *cobra.Command, _ []string) { var zk kafkazk.Handler if m || len(Config.topics) > 0 || p == "storage" { var err error - zk, err = initZooKeeper(cmd) + zkAddr := cmd.Parent().Flag("zk-addr").Value.String() + kafkaPrefix := cmd.Parent().Flag("zk-prefix").Value.String() + metricsPrefix := cmd.Flag("zk-metrics-prefix").Value.String() + zk, err = initZooKeeper(zkAddr, kafkaPrefix, metricsPrefix) if err != nil { fmt.Println(err) os.Exit(1) diff --git a/cmd/topicmappr/commands/scale.go b/cmd/topicmappr/commands/scale.go index 7122150..0a24199 100644 --- a/cmd/topicmappr/commands/scale.go +++ b/cmd/topicmappr/commands/scale.go @@ -42,7 +42,10 @@ func scale(cmd *cobra.Command, _ []string) { bootstrap(cmd) // ZooKeeper init. - zk, err := initZooKeeper(cmd) + zkAddr := cmd.Parent().Flag("zk-addr").Value.String() + kafkaPrefix := cmd.Parent().Flag("zk-prefix").Value.String() + metricsPrefix := cmd.Flag("zk-metrics-prefix").Value.String() + zk, err := initZooKeeper(zkAddr, kafkaPrefix, metricsPrefix) if err != nil { fmt.Println(err) os.Exit(1) From bc3428f0aa44b5555663b4f2f36e30f37162114d Mon Sep 17 00:00:00 2001 From: Michael Borst Date: Fri, 26 Nov 2021 20:01:56 +0100 Subject: [PATCH 09/18] [tm] handle metadata errors one level up --- cmd/topicmappr/commands/metadata.go | 56 +++++++----------------- cmd/topicmappr/commands/rebalance.go | 36 +++++++++++---- cmd/topicmappr/commands/rebuild.go | 25 +++++++++-- cmd/topicmappr/commands/rebuild_steps.go | 5 ++- cmd/topicmappr/commands/scale.go | 33 +++++++++++--- 5 files changed, 95 insertions(+), 60 deletions(-) diff --git a/cmd/topicmappr/commands/metadata.go b/cmd/topicmappr/commands/metadata.go index 842bb77..bd28a72 100644 --- a/cmd/topicmappr/commands/metadata.go +++ b/cmd/topicmappr/commands/metadata.go @@ -2,7 +2,6 @@ package commands import ( "fmt" - "os" "regexp" "time" @@ -11,80 +10,55 @@ import ( // checkMetaAge checks the age of the stored partition and broker storage // metrics data against the tolerated metrics age parameter. -func checkMetaAge(zk kafkazk.Handler, maxAge int) { +func checkMetaAge(zk kafkazk.Handler, maxAge int) error { age, err := zk.MaxMetaAge() if err != nil { - fmt.Printf("Error fetching metrics metadata: %s\n", err) - os.Exit(1) + return fmt.Errorf("Error fetching metrics metadata: %s\n", err) } if age > time.Duration(maxAge)*time.Minute { - fmt.Printf("Metrics metadata is older than allowed: %s\n", age) - os.Exit(1) + return fmt.Errorf("Metrics metadata is older than allowed: %s\n", age) } + return nil } // getBrokerMeta returns a map of brokers and broker metadata for those // registered in ZooKeeper. Optionally, metrics metadata persisted in ZooKeeper // (via an external mechanism*) can be merged into the metadata. -func getBrokerMeta(zk kafkazk.Handler, m bool) kafkazk.BrokerMetaMap { - brokerMeta, errs := zk.GetAllBrokerMeta(m) - // If no data is returned, report and exit. Otherwise, it's possible that - // complete data for a few brokers wasn't returned. We check in subsequent - // steps as to whether any brokers that matter are missing metrics. - if errs != nil && brokerMeta == nil { - for _, e := range errs { - fmt.Println(e) - } - os.Exit(1) - } - - return brokerMeta +func getBrokerMeta(zk kafkazk.Handler, m bool) (kafkazk.BrokerMetaMap, []error) { + return zk.GetAllBrokerMeta(m) } // ensureBrokerMetrics takes a map of reference brokers and a map of discovered // broker metadata. Any non-missing brokers in the broker map must be present // in the broker metadata map and have a non-true MetricsIncomplete value. -func ensureBrokerMetrics(bm kafkazk.BrokerMap, bmm kafkazk.BrokerMetaMap) { - var e bool +func ensureBrokerMetrics(bm kafkazk.BrokerMap, bmm kafkazk.BrokerMetaMap) []error { + errs := []error{} for id, b := range bm { // Missing brokers won't be found in the brokerMeta. if !b.Missing && id != kafkazk.StubBrokerID && bmm[id].MetricsIncomplete { - e = true - fmt.Printf("Metrics not found for broker %d\n", id) + errs = append(errs, fmt.Errorf("Metrics not found for broker %d\n", id)) } } - - if e { - os.Exit(1) - } + return errs } // getPartitionMeta returns a map of topic, partition metadata persisted in // ZooKeeper (via an external mechanism*). This is primarily partition size // metrics data used for the storage placement strategy. -func getPartitionMeta(zk kafkazk.Handler) kafkazk.PartitionMetaMap { - partitionMeta, err := zk.GetAllPartitionMeta() - if err != nil { - fmt.Println(err) - os.Exit(1) - } - - return partitionMeta +func getPartitionMeta(zk kafkazk.Handler) (kafkazk.PartitionMetaMap, error) { + return zk.GetAllPartitionMeta() } // stripPendingDeletes takes a partition map and zk handler. It looks up any // topics in a pending delete state and removes them from the provided partition // map, returning a list of topics removed. -func stripPendingDeletes(pm *kafkazk.PartitionMap, zk kafkazk.Handler) []string { +func stripPendingDeletes(pm *kafkazk.PartitionMap, zk kafkazk.Handler) ([]string, error) { // Get pending deletions. pd, err := zk.GetPendingDeletion() - if err != nil { - fmt.Println("Error fetching topics pending deletion") - } if len(pd) == 0 { - return []string{} + return []string{}, err } // Convert to a series of literal regex. @@ -95,7 +69,7 @@ func stripPendingDeletes(pm *kafkazk.PartitionMap, zk kafkazk.Handler) []string } // Update the PartitionMap and return a list of removed topic names. - return removeTopics(pm, re) + return removeTopics(pm, re), err } // removeTopics takes a PartitionMap and []*regexp.Regexp of topic name patters. diff --git a/cmd/topicmappr/commands/rebalance.go b/cmd/topicmappr/commands/rebalance.go index 3ab71df..9ceca56 100644 --- a/cmd/topicmappr/commands/rebalance.go +++ b/cmd/topicmappr/commands/rebalance.go @@ -56,12 +56,24 @@ func rebalance(cmd *cobra.Command, _ []string) { defer zk.Close() - maxMetadataAge, _ := cmd.Flags().GetInt("metrics-age") - // Get broker and partition metadata. - checkMetaAge(zk, maxMetadataAge) - brokerMeta := getBrokerMeta(zk, true) - partitionMeta := getPartitionMeta(zk) + maxMetadataAge, _ := cmd.Flags().GetInt("metrics-age") + if err := checkMetaAge(zk, maxMetadataAge); err != nil { + fmt.Println(err) + os.Exit(1) + } + brokerMeta, errs := getBrokerMeta(zk, true) + if errs != nil && brokerMeta == nil { + for _, e := range errs { + fmt.Println(e) + } + os.Exit(1) + } + partitionMeta, err := getPartitionMeta(zk) + if err != nil { + fmt.Println(err) + os.Exit(1) + } // Get the current partition map. partitionMapIn, err := kafkazk.PartitionMapFromZK(Config.topics, zk) @@ -71,7 +83,10 @@ func rebalance(cmd *cobra.Command, _ []string) { } // Exclude any topics that are pending deletion. - pending := stripPendingDeletes(partitionMapIn, zk) + pending, err := stripPendingDeletes(partitionMapIn, zk) + if err != nil { + fmt.Println("Error fetching topics pending deletion") + } // Exclude any explicit exclusions. excluded := removeTopics(partitionMapIn, Config.topicsExclude) @@ -150,7 +165,7 @@ func rebalance(cmd *cobra.Command, _ []string) { printMapChanges(partitionMapIn, partitionMapOut) // Print broker assignment statistics. - errs := printBrokerAssignmentStats(cmd, partitionMapIn, partitionMapOut, brokersIn, brokersOut) + errs = printBrokerAssignmentStats(cmd, partitionMapIn, partitionMapOut, brokersIn, brokersOut) // Handle errors that are possible to be overridden by the user (aka 'WARN' // in topicmappr console output). @@ -179,7 +194,12 @@ func validateBrokersForRebalance(cmd *cobra.Command, brokers kafkazk.BrokerMap, } // Check if any referenced brokers are marked as having missing/partial metrics data. - ensureBrokerMetrics(brokers, bm) + if errs := ensureBrokerMetrics(brokers, bm); len(errs) > 0 { + for _, e := range errs { + fmt.Println(e) + } + os.Exit(1) + } switch { case c.Missing > 0, c.OldMissing > 0, c.Replace > 0: diff --git a/cmd/topicmappr/commands/rebuild.go b/cmd/topicmappr/commands/rebuild.go index cef4331..9284f8c 100644 --- a/cmd/topicmappr/commands/rebuild.go +++ b/cmd/topicmappr/commands/rebuild.go @@ -135,19 +135,31 @@ func rebuild(cmd *cobra.Command, _ []string) { var withMetrics bool if cmd.Flag("placement").Value.String() == "storage" { maxMetadataAge, _ := cmd.Flags().GetInt("metrics-age") - checkMetaAge(zk, maxMetadataAge) + if err := checkMetaAge(zk, maxMetadataAge); err != nil { + fmt.Println(err) + os.Exit(1) + } withMetrics = true } var brokerMeta kafkazk.BrokerMetaMap + var errs []error if m, _ := cmd.Flags().GetBool("use-meta"); m { - brokerMeta = getBrokerMeta(zk, withMetrics) + if brokerMeta, errs = getBrokerMeta(zk, withMetrics); errs != nil && brokerMeta == nil { + for _, e := range errs { + fmt.Println(e) + } + os.Exit(1) + } } // Fetch partition metadata. var partitionMeta kafkazk.PartitionMetaMap if cmd.Flag("placement").Value.String() == "storage" { - partitionMeta = getPartitionMeta(zk) + if partitionMeta, err = getPartitionMeta(zk); err != nil { + fmt.Println(err) + os.Exit(1) + } } // Build a partition map either from literal map text input or by fetching the @@ -172,7 +184,12 @@ func rebuild(cmd *cobra.Command, _ []string) { // Check if any referenced brokers are marked as having // missing/partial metrics data. if m, _ := cmd.Flags().GetBool("use-meta"); m { - ensureBrokerMetrics(brokers, brokerMeta) + if errs := ensureBrokerMetrics(brokers, brokerMeta); len(errs) > 0 { + for _, e := range errs { + fmt.Println(e) + } + os.Exit(1) + } } // Create substitution affinities. diff --git a/cmd/topicmappr/commands/rebuild_steps.go b/cmd/topicmappr/commands/rebuild_steps.go index 8264b9d..bbc85ad 100644 --- a/cmd/topicmappr/commands/rebuild_steps.go +++ b/cmd/topicmappr/commands/rebuild_steps.go @@ -44,7 +44,10 @@ func getPartitionMap(cmd *cobra.Command, zk kafkazk.Handler) (*kafkazk.Partition } // Exclude any topics that are pending deletion. - pd := stripPendingDeletes(pm, zk) + pd, err := stripPendingDeletes(pm, zk) + if err != nil { + fmt.Println("Error fetching topics pending deletion") + } // Exclude topics explicitly listed. et := removeTopics(pm, Config.topicsExclude) diff --git a/cmd/topicmappr/commands/scale.go b/cmd/topicmappr/commands/scale.go index 0a24199..7307d7f 100644 --- a/cmd/topicmappr/commands/scale.go +++ b/cmd/topicmappr/commands/scale.go @@ -55,9 +55,22 @@ func scale(cmd *cobra.Command, _ []string) { // Get broker and partition metadata. maxMetadataAge, _ := cmd.Flags().GetInt("metrics-age") - checkMetaAge(zk, maxMetadataAge) - brokerMeta := getBrokerMeta(zk, true) - partitionMeta := getPartitionMeta(zk) + if err := checkMetaAge(zk, maxMetadataAge); err != nil { + fmt.Println(err) + os.Exit(1) + } + brokerMeta, errs := getBrokerMeta(zk, true) + if errs != nil && brokerMeta == nil { + for _, e := range errs { + fmt.Println(e) + } + os.Exit(1) + } + partitionMeta, err := getPartitionMeta(zk) + if err != nil { + fmt.Println(err) + os.Exit(1) + } // Get the current partition map. partitionMapIn, err := kafkazk.PartitionMapFromZK(Config.topics, zk) @@ -67,7 +80,10 @@ func scale(cmd *cobra.Command, _ []string) { } // Exclude any topics that are pending deletion. - pending := stripPendingDeletes(partitionMapIn, zk) + pending, err := stripPendingDeletes(partitionMapIn, zk) + if err != nil { + fmt.Println("Error fetching topics pending deletion") + } // Exclude any explicit exclusions. excluded := removeTopics(partitionMapIn, Config.topicsExclude) @@ -146,7 +162,7 @@ func scale(cmd *cobra.Command, _ []string) { printMapChanges(partitionMapIn, partitionMapOut) // Print broker assignment statistics. - errs := printBrokerAssignmentStats(cmd, partitionMapIn, partitionMapOut, brokersIn, brokersOut) + errs = printBrokerAssignmentStats(cmd, partitionMapIn, partitionMapOut, brokersIn, brokersOut) // Handle errors that are possible // to be overridden by the user (aka @@ -177,7 +193,12 @@ func validateBrokersForScale(cmd *cobra.Command, brokers kafkazk.BrokerMap, bm k } // Check if any referenced brokers are marked as having missing/partial metrics data. - ensureBrokerMetrics(brokers, bm) + if errs := ensureBrokerMetrics(brokers, bm); len(errs) > 0 { + for _, e := range errs { + fmt.Println(e) + } + os.Exit(1) + } switch { case c.Missing > 0, c.OldMissing > 0, c.Replace > 0: From 4f54f850b932af0ea42ad9ea26abab8845447caa Mon Sep 17 00:00:00 2001 From: Michael Borst Date: Fri, 26 Nov 2021 20:12:30 +0100 Subject: [PATCH 10/18] [tm] remove duplicate sort PartitionMap.Topics() is already sorted. --- cmd/topicmappr/commands/output.go | 1 - 1 file changed, 1 deletion(-) diff --git a/cmd/topicmappr/commands/output.go b/cmd/topicmappr/commands/output.go index 435c0e4..318336e 100644 --- a/cmd/topicmappr/commands/output.go +++ b/cmd/topicmappr/commands/output.go @@ -23,7 +23,6 @@ func (e errors) Swap(i, j int) { e[i], e[j] = e[j], e[i] } // referenced in the map. func printTopics(pm *kafkazk.PartitionMap) { topics := pm.Topics() - sort.Strings(topics) fmt.Printf("\nTopics:\n") for _, t := range topics { From c02c32da5d385659a58c4add2ddf8031189a916a Mon Sep 17 00:00:00 2001 From: Michael Borst Date: Mon, 29 Nov 2021 12:52:34 +0100 Subject: [PATCH 11/18] [tm] do not pass cmd into writeMaps --- cmd/topicmappr/commands/output.go | 5 +---- cmd/topicmappr/commands/rebalance.go | 4 +++- cmd/topicmappr/commands/rebuild.go | 4 +++- cmd/topicmappr/commands/scale.go | 4 +++- 4 files changed, 10 insertions(+), 7 deletions(-) diff --git a/cmd/topicmappr/commands/output.go b/cmd/topicmappr/commands/output.go index 318336e..5a233c5 100644 --- a/cmd/topicmappr/commands/output.go +++ b/cmd/topicmappr/commands/output.go @@ -200,7 +200,7 @@ func skipReassignmentNoOps(pm1, pm2 *kafkazk.PartitionMap) (*kafkazk.PartitionMa } // writeMaps takes a PartitionMap and writes out files. -func writeMaps(cmd *cobra.Command, pm *kafkazk.PartitionMap, phasedPM *kafkazk.PartitionMap) { +func writeMaps(outPath, outFile string, pm *kafkazk.PartitionMap, phasedPM *kafkazk.PartitionMap) { if len(pm.Partitions) == 0 { fmt.Println("\nNo partition reassignments, skipping map generation") return @@ -213,9 +213,6 @@ func writeMaps(cmd *cobra.Command, pm *kafkazk.PartitionMap, phasedPM *kafkazk.P phaseSuffix[1] = "-phase2" } - outPath := cmd.Flag("out-path").Value.String() - outFile := cmd.Flag("out-file").Value.String() - // Break map up by topic. tm := map[string]*kafkazk.PartitionMap{} diff --git a/cmd/topicmappr/commands/rebalance.go b/cmd/topicmappr/commands/rebalance.go index 9ceca56..4899cd1 100644 --- a/cmd/topicmappr/commands/rebalance.go +++ b/cmd/topicmappr/commands/rebalance.go @@ -175,7 +175,9 @@ func rebalance(cmd *cobra.Command, _ []string) { partitionMapIn, partitionMapOut = skipReassignmentNoOps(partitionMapIn, partitionMapOut) // Write maps. - writeMaps(cmd, partitionMapOut, nil) + outPath := cmd.Flag("out-path").Value.String() + outFile := cmd.Flag("out-file").Value.String() + writeMaps(outPath, outFile, partitionMapOut, nil) } func validateBrokersForRebalance(cmd *cobra.Command, brokers kafkazk.BrokerMap, bm kafkazk.BrokerMetaMap) []int { diff --git a/cmd/topicmappr/commands/rebuild.go b/cmd/topicmappr/commands/rebuild.go index 9284f8c..1b8f0e8 100644 --- a/cmd/topicmappr/commands/rebuild.go +++ b/cmd/topicmappr/commands/rebuild.go @@ -248,5 +248,7 @@ func rebuild(cmd *cobra.Command, _ []string) { originalMap, partitionMapOut = skipReassignmentNoOps(originalMap, partitionMapOut) } - writeMaps(cmd, partitionMapOut, phasedMap) + outPath := cmd.Flag("out-path").Value.String() + outFile := cmd.Flag("out-file").Value.String() + writeMaps(outPath, outFile, partitionMapOut, phasedMap) } diff --git a/cmd/topicmappr/commands/scale.go b/cmd/topicmappr/commands/scale.go index 7307d7f..523a1c2 100644 --- a/cmd/topicmappr/commands/scale.go +++ b/cmd/topicmappr/commands/scale.go @@ -174,7 +174,9 @@ func scale(cmd *cobra.Command, _ []string) { partitionMapIn, partitionMapOut = skipReassignmentNoOps(partitionMapIn, partitionMapOut) // Write maps. - writeMaps(cmd, partitionMapOut, nil) + outPath := cmd.Flag("out-path").Value.String() + outFile := cmd.Flag("out-file").Value.String() + writeMaps(outPath, outFile, partitionMapOut, nil) } func validateBrokersForScale(cmd *cobra.Command, brokers kafkazk.BrokerMap, bm kafkazk.BrokerMetaMap) []int { From f34f8875396b664fd7d5172063384dec01106c42 Mon Sep 17 00:00:00 2001 From: Michael Borst Date: Tue, 30 Nov 2021 21:34:28 +0100 Subject: [PATCH 12/18] [tm/rebuild] use params instead of cobra.cmd --- cmd/topicmappr/commands/rebuild.go | 157 +++++++++++++++-------- cmd/topicmappr/commands/rebuild_steps.go | 61 ++++----- 2 files changed, 128 insertions(+), 90 deletions(-) diff --git a/cmd/topicmappr/commands/rebuild.go b/cmd/topicmappr/commands/rebuild.go index 1b8f0e8..3fc8d4b 100644 --- a/cmd/topicmappr/commands/rebuild.go +++ b/cmd/topicmappr/commands/rebuild.go @@ -3,6 +3,7 @@ package commands import ( "fmt" "os" + "regexp" "github.com/DataDog/kafka-kit/v3/kafkazk" @@ -51,45 +52,104 @@ func init() { rebuildCmd.MarkFlagRequired("brokers") } -func rebuild(cmd *cobra.Command, _ []string) { - // Sanity check params. - t, _ := cmd.Flags().GetString("topics") - ms, _ := cmd.Flags().GetString("map-string") - p := cmd.Flag("placement").Value.String() - o := cmd.Flag("optimize").Value.String() - fr, _ := cmd.Flags().GetBool("force-rebuild") - sa, _ := cmd.Flags().GetBool("sub-affinity") - m, _ := cmd.Flags().GetBool("use-meta") +type rebuildParams struct { + brokers []int + forceRebuild bool + mapString string + maxMetadataAge int + minRackIds int + optimize string + optimizeLeadership bool + partitionSizeFactor float64 + phasedReassignment bool + placement string + replication int + skipNoOps bool + subAffinity bool + topics []*regexp.Regexp + topicsExclude []*regexp.Regexp + useMetadata bool + leaderEvacTopics []*regexp.Regexp + leaderEvacBrokers []int +} +func rebuildParamsFromCmd(cmd *cobra.Command) (c rebuildParams) { + brokers, _ := cmd.Flags().GetString("brokers") + c.brokers = brokerStringToSlice(brokers) + forceRebuild, _ := cmd.Flags().GetBool("force-rebuild") + c.forceRebuild = forceRebuild + mapString, _ := cmd.Flags().GetString("map-string") + c.mapString = mapString + maxMetadataAge, _ := cmd.Flags().GetInt("metrics-age") + c.maxMetadataAge = maxMetadataAge + minRackIds, _ := cmd.Flags().GetInt("min-rack-ids") + c.minRackIds = minRackIds + optimize, _ := cmd.Flags().GetString("optimize") + c.optimize = optimize + optimizeLeadership, _ := cmd.Flags().GetBool("optimize-leadership") + c.optimizeLeadership = optimizeLeadership + partitionSizeFactor, _ := cmd.Flags().GetFloat64("partition-size-factor") + c.partitionSizeFactor = partitionSizeFactor + phasedReassignment, _ := cmd.Flags().GetBool("phased-reassignment") + c.phasedReassignment = phasedReassignment + placement, _ := cmd.Flags().GetString("placement") + c.placement = placement + replication, _ := cmd.Flags().GetInt("replication") + c.replication = replication + skipNoOps, _ := cmd.Flags().GetBool("skip-no-ops") + c.skipNoOps = skipNoOps + subAffinity, _ := cmd.Flags().GetBool("sub-affinity") + c.subAffinity = subAffinity + topics, _ := cmd.Flags().GetString("topics") + c.topics = topicRegex(topics) + topicsExclude, _ := cmd.Flags().GetString("topics-exclude") + c.topicsExclude = topicRegex(topicsExclude) + useMetadata, _ := cmd.Flags().GetBool("use-meta") + c.useMetadata = useMetadata let, _ := cmd.Flags().GetString("leader-evac-topics") + if let != "" { + c.leaderEvacTopics = topicRegex(let) + } leb, _ := cmd.Flags().GetString("leader-evac-brokers") + if leb != "" { + c.leaderEvacBrokers = brokerStringToSlice(leb) + } + return c +} +func (c rebuildParams) validate() error { switch { - case ms == "" && t == "": - fmt.Println("\n[ERROR] must specify either --topics or --map-string") - defaultsAndExit() - case p != "count" && p != "storage": - fmt.Println("\n[ERROR] --placement must be either 'count' or 'storage'") - defaultsAndExit() - case o != "distribution" && o != "storage": - fmt.Println("\n[ERROR] --optimize must be either 'distribution' or 'storage'") - defaultsAndExit() - case !m && p == "storage": - fmt.Println("\n[ERROR] --placement=storage requires --use-meta=true") - defaultsAndExit() - case fr && sa: - fmt.Println("\n[INFO] --force-rebuild disables --sub-affinity") - case (leb != "" || let != "") && (leb == "" || let == ""): - fmt.Println("\n[ERROR] --leader-evac-topics and --leader-evac-brokers must both be specified for leadership evacuation.") - defaultsAndExit() + case c.mapString == "" && len(c.topics) == 0: + return fmt.Errorf("\n[ERROR] must specify either --topics or --map-string") + case c.placement != "count" && c.placement != "storage": + return fmt.Errorf("\n[ERROR] --placement must be either 'count' or 'storage'") + case c.optimize != "distribution" && c.optimize != "storage": + return fmt.Errorf("\n[ERROR] --optimize must be either 'distribution' or 'storage'") + case !c.useMetadata && c.placement == "storage": + return fmt.Errorf("\n[ERROR] --placement=storage requires --use-meta=true") + case c.forceRebuild && c.subAffinity: + return fmt.Errorf("\n[INFO] --force-rebuild disables --sub-affinity") + case (len(c.leaderEvacBrokers) != 0 || len(c.leaderEvacTopics) != 0) && (len(c.leaderEvacBrokers) == 0 || len(c.leaderEvacTopics) == 0): + return fmt.Errorf("\n[ERROR] --leader-evac-topics and --leader-evac-brokers must both be specified for leadership evacuation.") } + return nil +} +func rebuild(cmd *cobra.Command, _ []string) { bootstrap(cmd) + params := rebuildParamsFromCmd(cmd) + err := params.validate() + if err != nil { + fmt.Println(err) + defaultsAndExit() + } + if params.forceRebuild && params.subAffinity { + fmt.Println("\n[INFO] --force-rebuild disables --sub-affinity") + } // ZooKeeper init. var zk kafkazk.Handler - if m || len(Config.topics) > 0 || p == "storage" { - var err error + if params.useMetadata || len(params.topics) > 0 || params.placement == "storage" { zkAddr := cmd.Parent().Flag("zk-addr").Value.String() kafkaPrefix := cmd.Parent().Flag("zk-prefix").Value.String() metricsPrefix := cmd.Flag("zk-metrics-prefix").Value.String() @@ -103,20 +163,14 @@ func rebuild(cmd *cobra.Command, _ []string) { // In addition to the global topic regex, we have leader-evac topic regex as well. var evacTopics []string - var err error - if let != "" { - evacTopics, err = zk.GetTopics(topicRegex(let)) + if len(params.leaderEvacTopics) != 0 { + evacTopics, err = zk.GetTopics(params.leaderEvacTopics) if err != nil { fmt.Println(err) os.Exit(1) } } - var evacBrokers []int - if leb != "" { - evacBrokers = brokerStringToSlice(leb) - } - // General flow: // 1) A PartitionMap is formed (either unmarshaled from the literal // map input via --rebuild-map or generated from ZooKeeper Metadata @@ -133,9 +187,8 @@ func rebuild(cmd *cobra.Command, _ []string) { // Fetch broker metadata. var withMetrics bool - if cmd.Flag("placement").Value.String() == "storage" { - maxMetadataAge, _ := cmd.Flags().GetInt("metrics-age") - if err := checkMetaAge(zk, maxMetadataAge); err != nil { + if params.placement == "storage" { + if err := checkMetaAge(zk, params.maxMetadataAge); err != nil { fmt.Println(err) os.Exit(1) } @@ -144,7 +197,7 @@ func rebuild(cmd *cobra.Command, _ []string) { var brokerMeta kafkazk.BrokerMetaMap var errs []error - if m, _ := cmd.Flags().GetBool("use-meta"); m { + if params.useMetadata { if brokerMeta, errs = getBrokerMeta(zk, withMetrics); errs != nil && brokerMeta == nil { for _, e := range errs { fmt.Println(e) @@ -155,7 +208,7 @@ func rebuild(cmd *cobra.Command, _ []string) { // Fetch partition metadata. var partitionMeta kafkazk.PartitionMetaMap - if cmd.Flag("placement").Value.String() == "storage" { + if params.placement == "storage" { if partitionMeta, err = getPartitionMeta(zk); err != nil { fmt.Println(err) os.Exit(1) @@ -164,7 +217,7 @@ func rebuild(cmd *cobra.Command, _ []string) { // Build a partition map either from literal map text input or by fetching the // map data from ZooKeeper. Store a copy of the original. - partitionMapIn, pending, excluded := getPartitionMap(cmd, zk) + partitionMapIn, pending, excluded := getPartitionMap(params, zk) originalMap := partitionMapIn.Copy() // Get a list of affected topics. @@ -174,7 +227,7 @@ func rebuild(cmd *cobra.Command, _ []string) { // exclusion. printExcludedTopics(pending, excluded) - brokers, bs := getBrokers(cmd, partitionMapIn, brokerMeta) + brokers, bs := getBrokers(params, partitionMapIn, brokerMeta) brokersOrig := brokers.Copy() if bs.Changes() { @@ -183,7 +236,7 @@ func rebuild(cmd *cobra.Command, _ []string) { // Check if any referenced brokers are marked as having // missing/partial metrics data. - if m, _ := cmd.Flags().GetBool("use-meta"); m { + if params.useMetadata { if errs := ensureBrokerMetrics(brokers, brokerMeta); len(errs) > 0 { for _, e := range errs { fmt.Println(e) @@ -193,24 +246,24 @@ func rebuild(cmd *cobra.Command, _ []string) { } // Create substitution affinities. - affinities := getSubAffinities(cmd, brokers, brokersOrig, partitionMapIn) + affinities := getSubAffinities(params, brokers, brokersOrig, partitionMapIn) if affinities != nil { fmt.Printf("%s-\n", indent) } // Print changes, actions. - printChangesActions(cmd, bs) + printChangesActions(params, bs) // Apply any replication factor settings. - updateReplicationFactor(cmd, partitionMapIn) + updateReplicationFactor(params, partitionMapIn) // Build a new map using the provided list of brokers. This is OK to run even // when a no-op is intended. - partitionMapOut, errs := buildMap(cmd, partitionMapIn, partitionMeta, brokers, affinities) + partitionMapOut, errs := buildMap(params, partitionMapIn, partitionMeta, brokers, affinities) // Optimize leaders. - if t, _ := cmd.Flags().GetBool("optimize-leadership"); t { + if params.optimizeLeadership { partitionMapOut.OptimizeLeaderFollower() } @@ -228,11 +281,11 @@ func rebuild(cmd *cobra.Command, _ []string) { // Generate phased map if enabled. var phasedMap *kafkazk.PartitionMap - if phased, _ := cmd.Flags().GetBool("phased-reassignment"); phased { + if params.phasedReassignment { phasedMap = phasedReassignment(originalMap, partitionMapOut) } - partitionMapOut = evacuateLeadership(*partitionMapOut, evacBrokers, evacTopics) + partitionMapOut = evacuateLeadership(*partitionMapOut, params.leaderEvacBrokers, evacTopics) // Print map change results. printMapChanges(originalMap, partitionMapOut) @@ -244,7 +297,7 @@ func rebuild(cmd *cobra.Command, _ []string) { handleOverridableErrs(cmd, errs) // Skip no-ops if configured. - if sno, _ := cmd.Flags().GetBool("skip-no-ops"); sno { + if params.skipNoOps { originalMap, partitionMapOut = skipReassignmentNoOps(originalMap, partitionMapOut) } diff --git a/cmd/topicmappr/commands/rebuild_steps.go b/cmd/topicmappr/commands/rebuild_steps.go index bbc85ad..66f0099 100644 --- a/cmd/topicmappr/commands/rebuild_steps.go +++ b/cmd/topicmappr/commands/rebuild_steps.go @@ -5,8 +5,6 @@ import ( "os" "github.com/DataDog/kafka-kit/v3/kafkazk" - - "github.com/spf13/cobra" ) // *References to metrics metadata persisted in ZooKeeper, see: @@ -20,14 +18,13 @@ import ( // via the --topics flag. Two []string are returned; topics excluded due to // pending deletion and topics explicitly excluded (via the --topics-exclude // flag), respectively. -func getPartitionMap(cmd *cobra.Command, zk kafkazk.Handler) (*kafkazk.PartitionMap, []string, []string) { - ms := cmd.Flag("map-string").Value.String() +func getPartitionMap(params rebuildParams, zk kafkazk.Handler) (*kafkazk.PartitionMap, []string, []string) { switch { // The map was provided as text. - case ms != "": + case params.mapString != "": // Get a deserialized map. - pm, err := kafkazk.PartitionMapFromString(ms) + pm, err := kafkazk.PartitionMapFromString(params.mapString) if err != nil { fmt.Println(err) os.Exit(1) @@ -60,12 +57,10 @@ func getPartitionMap(cmd *cobra.Command, zk kafkazk.Handler) (*kafkazk.Partition // getSubAffinities, if enabled via --sub-affinity, takes reference broker maps // and a partition map and attempts to return a complete SubstitutionAffinities. -func getSubAffinities(cmd *cobra.Command, bm kafkazk.BrokerMap, bmo kafkazk.BrokerMap, pm *kafkazk.PartitionMap) kafkazk.SubstitutionAffinities { +func getSubAffinities(params rebuildParams, bm kafkazk.BrokerMap, bmo kafkazk.BrokerMap, pm *kafkazk.PartitionMap) kafkazk.SubstitutionAffinities { var affinities kafkazk.SubstitutionAffinities - sa, _ := cmd.Flags().GetBool("sub-affinity") - fr, _ := cmd.Flags().GetBool("force-rebuild") - if sa && !fr { + if params.subAffinity && !params.forceRebuild { var err error affinities, err = bm.SubstitutionAffinities(pm) if err != nil { @@ -97,13 +92,12 @@ func getSubAffinities(cmd *cobra.Command, bm kafkazk.BrokerMap, bmo kafkazk.Brok // brokers were discovered or newly provided (i.e. specified in the --brokers flag but // not previously holding any partitions for any partitions of the referenced topics // being rebuilt by topicmappr) -func getBrokers(cmd *cobra.Command, pm *kafkazk.PartitionMap, bm kafkazk.BrokerMetaMap) (kafkazk.BrokerMap, *kafkazk.BrokerStatus) { +func getBrokers(params rebuildParams, pm *kafkazk.PartitionMap, bm kafkazk.BrokerMetaMap) (kafkazk.BrokerMap, *kafkazk.BrokerStatus) { fmt.Printf("\nBroker change summary:\n") // Get a broker map of the brokers in the current partition map. // If meta data isn't being looked up, brokerMeta will be empty. - fr, _ := cmd.Flags().GetBool("force-rebuild") - brokers := kafkazk.BrokerMapFromPartitionMap(pm, bm, fr) + brokers := kafkazk.BrokerMapFromPartitionMap(pm, bm, params.forceRebuild) // Update the currentBrokers list with the provided broker list. bs, msgs := brokers.Update(Config.brokers, bm) @@ -116,11 +110,8 @@ func getBrokers(cmd *cobra.Command, pm *kafkazk.PartitionMap, bm kafkazk.BrokerM // printChangesActions takes a BrokerStatus and prints out information output // describing changes in broker counts and liveness. -func printChangesActions(cmd *cobra.Command, bs *kafkazk.BrokerStatus) { +func printChangesActions(params rebuildParams, bs *kafkazk.BrokerStatus) { change := bs.New - bs.Replace - r, _ := cmd.Flags().GetInt("replication") - fr, _ := cmd.Flags().GetBool("force-rebuild") - ol, _ := cmd.Flags().GetBool("optimize-leadership") // Print broker change summary. fmt.Printf("%sReplacing %d, added %d, missing %d, total count changed by %d\n", @@ -141,15 +132,15 @@ func printChangesActions(cmd *cobra.Command, bs *kafkazk.BrokerStatus) { actions <- fmt.Sprintf("Shrinking topic by %d broker(s)", -change) } - if fr { + if params.forceRebuild { actions <- fmt.Sprintf("Force rebuilding map") } - if r > 0 { - actions <- fmt.Sprintf("Setting replication factor to %d", r) + if params.replication > 0 { + actions <- fmt.Sprintf("Setting replication factor to %d", params.replication) } - if ol { + if params.optimizeLeadership { actions <- fmt.Sprintf("Optimizing leader/follower ratios") } @@ -170,33 +161,27 @@ func printChangesActions(cmd *cobra.Command, bs *kafkazk.BrokerStatus) { // updateReplicationFactor takes a PartitionMap and normalizes the replica set // length to an optionally provided value. -func updateReplicationFactor(cmd *cobra.Command, pm *kafkazk.PartitionMap) { - r, _ := cmd.Flags().GetInt("replication") +func updateReplicationFactor(params rebuildParams, pm *kafkazk.PartitionMap) { // If the replication factor is changed, the partition map input needs to have // stub brokers appended (r factor increase) or existing brokers removed // (r factor decrease). - if r > 0 { - pm.SetReplication(r) + if params.replication > 0 { + pm.SetReplication(params.replication) } } // buildMap takes an input PartitionMap, rebuild parameters, and all partition/broker // metadata structures required to generate the output PartitionMap. A []string of // warnings / advisories is returned if any are encountered. -func buildMap(cmd *cobra.Command, pm *kafkazk.PartitionMap, pmm kafkazk.PartitionMetaMap, bm kafkazk.BrokerMap, af kafkazk.SubstitutionAffinities) (*kafkazk.PartitionMap, errors) { - placement := cmd.Flag("placement").Value.String() - psf, _ := cmd.Flags().GetFloat64("partition-size-factor") - mrrid, _ := cmd.Flags().GetInt("min-rack-ids") - +func buildMap(params rebuildParams, pm *kafkazk.PartitionMap, pmm kafkazk.PartitionMetaMap, bm kafkazk.BrokerMap, af kafkazk.SubstitutionAffinities) (*kafkazk.PartitionMap, errors) { rebuildParams := kafkazk.RebuildParams{ PMM: pmm, BM: bm, - Strategy: placement, - Optimization: cmd.Flag("optimize").Value.String(), - PartnSzFactor: psf, - MinUniqueRackIDs: mrrid, + Strategy: params.placement, + Optimization: params.optimize, + PartnSzFactor: params.partitionSizeFactor, + MinUniqueRackIDs: params.minRackIds, } - if af != nil { rebuildParams.Affinities = af } @@ -214,12 +199,12 @@ func buildMap(cmd *cobra.Command, pm *kafkazk.PartitionMap, pmm kafkazk.Partitio // can be readded to the broker's StorageFree value. The amount to be readded, // the size of the partition, is referenced from the PartitionMetaMap. - if fr, _ := cmd.Flags().GetBool("force-rebuild"); fr { + if params.forceRebuild { // Get a stripped map that we'll call rebuild on. partitionMapInStripped := pm.Strip() // If the storage placement strategy is being used, // update the broker StorageFree values. - if placement == "storage" { + if params.placement == "storage" { err := rebuildParams.BM.SubStorage(pm, pmm, kafkazk.AllBrokersFn) if err != nil { fmt.Println(err) @@ -232,7 +217,7 @@ func buildMap(cmd *cobra.Command, pm *kafkazk.PartitionMap, pmm kafkazk.Partitio } // Update the StorageFree only on brokers marked for replacement. - if placement == "storage" { + if params.placement == "storage" { err := rebuildParams.BM.SubStorage(pm, pmm, kafkazk.ReplacedBrokersFn) if err != nil { fmt.Println(err) From 5503077403824e6d1d79220528cc36c560883016 Mon Sep 17 00:00:00 2001 From: Michael Borst Date: Tue, 30 Nov 2021 22:06:07 +0100 Subject: [PATCH 13/18] [tm/rebalance] use params instead of cobra.cmd --- cmd/topicmappr/commands/rebalance.go | 89 +++++++++++++++++++--------- 1 file changed, 62 insertions(+), 27 deletions(-) diff --git a/cmd/topicmappr/commands/rebalance.go b/cmd/topicmappr/commands/rebalance.go index 4899cd1..c6c1372 100644 --- a/cmd/topicmappr/commands/rebalance.go +++ b/cmd/topicmappr/commands/rebalance.go @@ -4,6 +4,7 @@ import ( "bytes" "fmt" "os" + "regexp" "sort" "github.com/DataDog/kafka-kit/v3/kafkazk" @@ -41,8 +42,52 @@ func init() { rebalanceCmd.MarkFlagRequired("topics") } +type rebalanceParams struct { + brokers []int + localityScoped bool + maxMetadataAge int + optimizeLeadership bool + partitionLimit int + partitionSizeThreshold int + storageThreshold float64 + storageThresholdGB float64 + tolerance float64 + topics []*regexp.Regexp + topicsExclude []*regexp.Regexp + verbose bool +} + +func rebalanceParamsFromCmd(cmd *cobra.Command) (params rebalanceParams) { + brokers, _ := cmd.Flags().GetString("brokers") + params.brokers = brokerStringToSlice(brokers) + localityScoped, _ := cmd.Flags().GetBool("locality-scoped") + params.localityScoped = localityScoped + maxMetadataAge, _ := cmd.Flags().GetInt("metrics-age") + params.maxMetadataAge = maxMetadataAge + optimizeLeadership, _ := cmd.Flags().GetBool("optimize-leadership") + params.optimizeLeadership = optimizeLeadership + partitionLimit, _ := cmd.Flags().GetInt("partition-limit") + params.partitionLimit = partitionLimit + partitionSizeThreshold, _ := cmd.Flags().GetInt("partition-size-threshold") + params.partitionSizeThreshold = partitionSizeThreshold + storageThreshold, _ := cmd.Flags().GetFloat64("storage-threshold") + params.storageThreshold = storageThreshold + storageThresholdGB, _ := cmd.Flags().GetFloat64("storage-threshold-gb") + params.storageThresholdGB = storageThresholdGB + tolerance, _ := cmd.Flags().GetFloat64("tolerance") + params.tolerance = tolerance + topics, _ := cmd.Flags().GetString("topics") + params.topics = topicRegex(topics) + topicsExclude, _ := cmd.Flags().GetString("topics-exclude") + params.topicsExclude = topicRegex(topicsExclude) + verbose, _ := cmd.Flags().GetBool("verbose") + params.verbose = verbose + return params +} + func rebalance(cmd *cobra.Command, _ []string) { bootstrap(cmd) + params := rebalanceParamsFromCmd(cmd) // ZooKeeper init. zkAddr := cmd.Parent().Flag("zk-addr").Value.String() @@ -57,8 +102,7 @@ func rebalance(cmd *cobra.Command, _ []string) { defer zk.Close() // Get broker and partition metadata. - maxMetadataAge, _ := cmd.Flags().GetInt("metrics-age") - if err := checkMetaAge(zk, maxMetadataAge); err != nil { + if err := checkMetaAge(zk, params.maxMetadataAge); err != nil { fmt.Println(err) os.Exit(1) } @@ -102,31 +146,25 @@ func rebalance(cmd *cobra.Command, _ []string) { // Validate all broker params, get a copy of the broker IDs targeted for // partition offloading. - offloadTargets := validateBrokersForRebalance(cmd, brokersIn, brokerMeta) + offloadTargets := validateBrokersForRebalance(params, brokersIn, brokerMeta) // Sort offloadTargets by storage free ascending. sort.Sort(offloadTargetsBySize{t: offloadTargets, bm: brokersIn}) - partitionLimit, _ := cmd.Flags().GetInt("partition-limit") - partitionSizeThreshold, _ := cmd.Flags().GetInt("partition-size-threshold") - tolerance, _ := cmd.Flags().GetFloat64("tolerance") - localityScoped, _ := cmd.Flags().GetBool("locality-scoped") - verbose, _ := cmd.Flags().GetBool("verbose") - - params := computeReassignmentBundlesParams{ + reassignmentBundlesParams := computeReassignmentBundlesParams{ offloadTargets: offloadTargets, - tolerance: tolerance, + tolerance: params.tolerance, partitionMap: partitionMapIn, partitionMeta: partitionMeta, brokerMap: brokersIn, - partitionLimit: partitionLimit, - partitionSizeThreshold: partitionSizeThreshold, - localityScoped: localityScoped, - verbose: verbose, + partitionLimit: params.partitionLimit, + partitionSizeThreshold: params.partitionSizeThreshold, + localityScoped: params.localityScoped, + verbose: params.verbose, } // Generate reassignmentBundles for a rebalance. - results := computeReassignmentBundles(params) + results := computeReassignmentBundles(reassignmentBundlesParams) // Merge all results into a slice. resultsByRange := []reassignmentBundle{} @@ -154,7 +192,7 @@ func rebalance(cmd *cobra.Command, _ []string) { printReassignmentParams(cmd, resultsByRange, brokersIn, m.tolerance) // Optimize leaders. - if t, _ := cmd.Flags().GetBool("optimize-leadership"); t { + if params.optimizeLeadership { partitionMapOut.OptimizeLeaderFollower() } @@ -180,7 +218,7 @@ func rebalance(cmd *cobra.Command, _ []string) { writeMaps(outPath, outFile, partitionMapOut, nil) } -func validateBrokersForRebalance(cmd *cobra.Command, brokers kafkazk.BrokerMap, bm kafkazk.BrokerMetaMap) []int { +func validateBrokersForRebalance(params rebalanceParams, brokers kafkazk.BrokerMap, bm kafkazk.BrokerMetaMap) []int { // No broker changes are permitted in rebalance other than new broker additions. fmt.Println("\nValidating broker list:") @@ -215,9 +253,6 @@ func validateBrokersForRebalance(cmd *cobra.Command, brokers kafkazk.BrokerMap, fmt.Printf("%sOK\n", indent) } - st, _ := cmd.Flags().GetFloat64("storage-threshold") - stg, _ := cmd.Flags().GetFloat64("storage-threshold-gb") - var selectorMethod bytes.Buffer selectorMethod.WriteString("Brokers targeted for partition offloading ") @@ -226,12 +261,12 @@ func validateBrokersForRebalance(cmd *cobra.Command, brokers kafkazk.BrokerMap, // Switch on the target selection method. If a storage threshold in gigabytes // is specified, prefer this. Otherwise, use the percentage below mean threshold. switch { - case stg > 0.00: - selectorMethod.WriteString(fmt.Sprintf("(< %.2fGB storage free)", stg)) + case params.storageThresholdGB > 0.00: + selectorMethod.WriteString(fmt.Sprintf("(< %.2fGB storage free)", params.storageThresholdGB)) // Get all non-new brokers with a StorageFree below the storage threshold in GB. f := func(b *kafkazk.Broker) bool { - if !b.New && b.StorageFree < stg*div { + if !b.New && b.StorageFree < params.storageThresholdGB*div { return true } return false @@ -244,11 +279,11 @@ func validateBrokersForRebalance(cmd *cobra.Command, brokers kafkazk.BrokerMap, sort.Ints(offloadTargets) default: - selectorMethod.WriteString(fmt.Sprintf("(>= %.2f%% threshold below hmean)", st*100)) + selectorMethod.WriteString(fmt.Sprintf("(>= %.2f%% threshold below hmean)", params.storageThreshold*100)) // Find brokers where the storage free is t % below the harmonic mean. // Specifying 0 targets all non-new brokers. - switch st { + switch params.storageThreshold { case 0.00: f := func(b *kafkazk.Broker) bool { return !b.New } @@ -259,7 +294,7 @@ func validateBrokersForRebalance(cmd *cobra.Command, brokers kafkazk.BrokerMap, sort.Ints(offloadTargets) default: - offloadTargets = brokers.BelowMean(st, brokers.HMean) + offloadTargets = brokers.BelowMean(params.storageThreshold, brokers.HMean) } } From 4a2edc737135603b47ac78d3e7879536a47337b0 Mon Sep 17 00:00:00 2001 From: Michael Borst Date: Tue, 30 Nov 2021 22:16:13 +0100 Subject: [PATCH 14/18] [tm/scale] use params instead of cobra.cmd --- cmd/topicmappr/commands/scale.go | 68 +++++++++++++++++++++++--------- 1 file changed, 50 insertions(+), 18 deletions(-) diff --git a/cmd/topicmappr/commands/scale.go b/cmd/topicmappr/commands/scale.go index 523a1c2..d500d58 100644 --- a/cmd/topicmappr/commands/scale.go +++ b/cmd/topicmappr/commands/scale.go @@ -3,6 +3,7 @@ package commands import ( "fmt" "os" + "regexp" "sort" "github.com/DataDog/kafka-kit/v3/kafkazk" @@ -38,8 +39,46 @@ func init() { scaleCmd.MarkFlagRequired("topics") } +type scaleParams struct { + brokers []int + localityScoped bool + maxMetadataAge int + optimizeLeadership bool + partitionLimit int + partitionSizeThreshold int + tolerance float64 + topics []*regexp.Regexp + topicsExclude []*regexp.Regexp + verbose bool +} + +func scaleParamsFromCmd(cmd *cobra.Command) (params scaleParams) { + brokers, _ := cmd.Flags().GetString("brokers") + params.brokers = brokerStringToSlice(brokers) + localityScoped, _ := cmd.Flags().GetBool("locality-scoped") + params.localityScoped = localityScoped + maxMetadataAge, _ := cmd.Flags().GetInt("metrics-age") + params.maxMetadataAge = maxMetadataAge + optimizeLeadership, _ := cmd.Flags().GetBool("optimize-leadership") + params.optimizeLeadership = optimizeLeadership + partitionLimit, _ := cmd.Flags().GetInt("partition-limit") + params.partitionLimit = partitionLimit + partitionSizeThreshold, _ := cmd.Flags().GetInt("partition-size-threshold") + params.partitionSizeThreshold = partitionSizeThreshold + tolerance, _ := cmd.Flags().GetFloat64("tolerance") + params.tolerance = tolerance + topics, _ := cmd.Flags().GetString("topics") + params.topics = topicRegex(topics) + topicsExclude, _ := cmd.Flags().GetString("topics-exclude") + params.topicsExclude = topicRegex(topicsExclude) + verbose, _ := cmd.Flags().GetBool("verbose") + params.verbose = verbose + return params +} + func scale(cmd *cobra.Command, _ []string) { bootstrap(cmd) + params := scaleParamsFromCmd(cmd) // ZooKeeper init. zkAddr := cmd.Parent().Flag("zk-addr").Value.String() @@ -54,8 +93,7 @@ func scale(cmd *cobra.Command, _ []string) { defer zk.Close() // Get broker and partition metadata. - maxMetadataAge, _ := cmd.Flags().GetInt("metrics-age") - if err := checkMetaAge(zk, maxMetadataAge); err != nil { + if err := checkMetaAge(zk, params.maxMetadataAge); err != nil { fmt.Println(err) os.Exit(1) } @@ -99,31 +137,25 @@ func scale(cmd *cobra.Command, _ []string) { // Validate all broker params, get a copy of the // broker IDs targeted for partition offloading. - offloadTargets := validateBrokersForScale(cmd, brokersIn, brokerMeta) + offloadTargets := validateBrokersForScale(brokersIn, brokerMeta) // Sort offloadTargets by storage free ascending. sort.Sort(offloadTargetsBySize{t: offloadTargets, bm: brokersIn}) - partitionLimit, _ := cmd.Flags().GetInt("partition-limit") - partitionSizeThreshold, _ := cmd.Flags().GetInt("partition-size-threshold") - tolerance, _ := cmd.Flags().GetFloat64("tolerance") - localityScoped, _ := cmd.Flags().GetBool("locality-scoped") - verbose, _ := cmd.Flags().GetBool("verbose") - // Generate reassignmentBundles for a scale up. - params := computeReassignmentBundlesParams{ + reassignmentBundlesParams := computeReassignmentBundlesParams{ offloadTargets: offloadTargets, - tolerance: tolerance, + tolerance: params.tolerance, partitionMap: partitionMapIn, partitionMeta: partitionMeta, brokerMap: brokersIn, - partitionLimit: partitionLimit, - partitionSizeThreshold: partitionSizeThreshold, - localityScoped: localityScoped, - verbose: verbose, + partitionLimit: params.partitionLimit, + partitionSizeThreshold: params.partitionSizeThreshold, + localityScoped: params.localityScoped, + verbose: params.verbose, } - results := computeReassignmentBundles(params) + results := computeReassignmentBundles(reassignmentBundlesParams) // Merge all results into a slice. resultsByRange := []reassignmentBundle{} @@ -151,7 +183,7 @@ func scale(cmd *cobra.Command, _ []string) { printReassignmentParams(cmd, resultsByRange, brokersIn, m.tolerance) // Optimize leaders. - if t, _ := cmd.Flags().GetBool("optimize-leadership"); t { + if params.optimizeLeadership { partitionMapOut.OptimizeLeaderFollower() } @@ -179,7 +211,7 @@ func scale(cmd *cobra.Command, _ []string) { writeMaps(outPath, outFile, partitionMapOut, nil) } -func validateBrokersForScale(cmd *cobra.Command, brokers kafkazk.BrokerMap, bm kafkazk.BrokerMetaMap) []int { +func validateBrokersForScale(brokers kafkazk.BrokerMap, bm kafkazk.BrokerMetaMap) []int { // No broker changes are permitted in rebalance other than new broker additions. fmt.Println("\nValidating broker list:") From b9a2f9bd639d1ef9724d9d5961ab4ce47ca2b089 Mon Sep 17 00:00:00 2001 From: Michael Borst Date: Wed, 1 Dec 2021 10:28:19 +0100 Subject: [PATCH 15/18] [tm] remove global config object --- cmd/topicmappr/commands/config.go | 21 +-------------------- cmd/topicmappr/commands/rebalance.go | 8 ++++---- cmd/topicmappr/commands/rebuild.go | 3 ++- cmd/topicmappr/commands/rebuild_steps.go | 10 +++++----- cmd/topicmappr/commands/scale.go | 12 ++++++------ 5 files changed, 18 insertions(+), 36 deletions(-) diff --git a/cmd/topicmappr/commands/config.go b/cmd/topicmappr/commands/config.go index e61b03b..39d0c0e 100644 --- a/cmd/topicmappr/commands/config.go +++ b/cmd/topicmappr/commands/config.go @@ -23,33 +23,14 @@ const ( var ( // Characters allowed in Kafka topic names topicNormalChar = regexp.MustCompile(`[a-zA-Z0-9_\\-]`) - - // Config holds global configs. - Config struct { - topics []*regexp.Regexp - topicsExclude []*regexp.Regexp - brokers []int - } ) -func bootstrap(cmd *cobra.Command) { - b, _ := cmd.Flags().GetString("brokers") - Config.brokers = brokerStringToSlice(b) - +func sanitizeInput(cmd *cobra.Command) { // Append trailing slash if not included. op := cmd.Flag("out-path").Value.String() if op != "" && !strings.HasSuffix(op, "/") { cmd.Flags().Set("out-path", op+"/") } - - // Populate topic include / exclude regex. - if include, _ := cmd.Flags().GetString("topics"); include != "" { - Config.topics = topicRegex(include) - } - - if exclude, _ := cmd.Flags().GetString("topics-exclude"); exclude != "" { - Config.topicsExclude = topicRegex(exclude) - } } // topicRegex takes a string of csv values and returns a []*regexp.Regexp. diff --git a/cmd/topicmappr/commands/rebalance.go b/cmd/topicmappr/commands/rebalance.go index c6c1372..2607c89 100644 --- a/cmd/topicmappr/commands/rebalance.go +++ b/cmd/topicmappr/commands/rebalance.go @@ -86,7 +86,7 @@ func rebalanceParamsFromCmd(cmd *cobra.Command) (params rebalanceParams) { } func rebalance(cmd *cobra.Command, _ []string) { - bootstrap(cmd) + sanitizeInput(cmd) params := rebalanceParamsFromCmd(cmd) // ZooKeeper init. @@ -120,7 +120,7 @@ func rebalance(cmd *cobra.Command, _ []string) { } // Get the current partition map. - partitionMapIn, err := kafkazk.PartitionMapFromZK(Config.topics, zk) + partitionMapIn, err := kafkazk.PartitionMapFromZK(params.topics, zk) if err != nil { fmt.Println(err) os.Exit(1) @@ -133,7 +133,7 @@ func rebalance(cmd *cobra.Command, _ []string) { } // Exclude any explicit exclusions. - excluded := removeTopics(partitionMapIn, Config.topicsExclude) + excluded := removeTopics(partitionMapIn, params.topicsExclude) // Print topics matched to input params. printTopics(partitionMapIn) @@ -224,7 +224,7 @@ func validateBrokersForRebalance(params rebalanceParams, brokers kafkazk.BrokerM // Update the current BrokerList with // the provided broker list. - c, msgs := brokers.Update(Config.brokers, bm) + c, msgs := brokers.Update(params.brokers, bm) for m := range msgs { fmt.Printf("%s%s\n", indent, m) } diff --git a/cmd/topicmappr/commands/rebuild.go b/cmd/topicmappr/commands/rebuild.go index 3fc8d4b..15c5e27 100644 --- a/cmd/topicmappr/commands/rebuild.go +++ b/cmd/topicmappr/commands/rebuild.go @@ -136,8 +136,9 @@ func (c rebuildParams) validate() error { } func rebuild(cmd *cobra.Command, _ []string) { - bootstrap(cmd) + sanitizeInput(cmd) params := rebuildParamsFromCmd(cmd) + err := params.validate() if err != nil { fmt.Println(err) diff --git a/cmd/topicmappr/commands/rebuild_steps.go b/cmd/topicmappr/commands/rebuild_steps.go index 66f0099..744c9c9 100644 --- a/cmd/topicmappr/commands/rebuild_steps.go +++ b/cmd/topicmappr/commands/rebuild_steps.go @@ -30,11 +30,11 @@ func getPartitionMap(params rebuildParams, zk kafkazk.Handler) (*kafkazk.Partiti os.Exit(1) } // Exclude topics explicitly listed. - et := removeTopics(pm, Config.topicsExclude) + et := removeTopics(pm, params.topicsExclude) return pm, []string{}, et // The map needs to be fetched via ZooKeeper metadata for all specified topics. - case len(Config.topics) > 0: - pm, err := kafkazk.PartitionMapFromZK(Config.topics, zk) + case len(params.topics) > 0: + pm, err := kafkazk.PartitionMapFromZK(params.topics, zk) if err != nil { fmt.Println(err) os.Exit(1) @@ -47,7 +47,7 @@ func getPartitionMap(params rebuildParams, zk kafkazk.Handler) (*kafkazk.Partiti } // Exclude topics explicitly listed. - et := removeTopics(pm, Config.topicsExclude) + et := removeTopics(pm, params.topicsExclude) return pm, pd, et } @@ -100,7 +100,7 @@ func getBrokers(params rebuildParams, pm *kafkazk.PartitionMap, bm kafkazk.Broke brokers := kafkazk.BrokerMapFromPartitionMap(pm, bm, params.forceRebuild) // Update the currentBrokers list with the provided broker list. - bs, msgs := brokers.Update(Config.brokers, bm) + bs, msgs := brokers.Update(params.brokers, bm) for m := range msgs { fmt.Printf("%s%s\n", indent, m) } diff --git a/cmd/topicmappr/commands/scale.go b/cmd/topicmappr/commands/scale.go index d500d58..3d597f5 100644 --- a/cmd/topicmappr/commands/scale.go +++ b/cmd/topicmappr/commands/scale.go @@ -77,7 +77,7 @@ func scaleParamsFromCmd(cmd *cobra.Command) (params scaleParams) { } func scale(cmd *cobra.Command, _ []string) { - bootstrap(cmd) + sanitizeInput(cmd) params := scaleParamsFromCmd(cmd) // ZooKeeper init. @@ -111,7 +111,7 @@ func scale(cmd *cobra.Command, _ []string) { } // Get the current partition map. - partitionMapIn, err := kafkazk.PartitionMapFromZK(Config.topics, zk) + partitionMapIn, err := kafkazk.PartitionMapFromZK(params.topics, zk) if err != nil { fmt.Println(err) os.Exit(1) @@ -124,7 +124,7 @@ func scale(cmd *cobra.Command, _ []string) { } // Exclude any explicit exclusions. - excluded := removeTopics(partitionMapIn, Config.topicsExclude) + excluded := removeTopics(partitionMapIn, params.topicsExclude) // Print topics matched to input params. printTopics(partitionMapIn) @@ -137,7 +137,7 @@ func scale(cmd *cobra.Command, _ []string) { // Validate all broker params, get a copy of the // broker IDs targeted for partition offloading. - offloadTargets := validateBrokersForScale(brokersIn, brokerMeta) + offloadTargets := validateBrokersForScale(params, brokersIn, brokerMeta) // Sort offloadTargets by storage free ascending. sort.Sort(offloadTargetsBySize{t: offloadTargets, bm: brokersIn}) @@ -211,13 +211,13 @@ func scale(cmd *cobra.Command, _ []string) { writeMaps(outPath, outFile, partitionMapOut, nil) } -func validateBrokersForScale(brokers kafkazk.BrokerMap, bm kafkazk.BrokerMetaMap) []int { +func validateBrokersForScale(params scaleParams, brokers kafkazk.BrokerMap, bm kafkazk.BrokerMetaMap) []int { // No broker changes are permitted in rebalance other than new broker additions. fmt.Println("\nValidating broker list:") // Update the current BrokerList with // the provided broker list. - c, msgs := brokers.Update(Config.brokers, bm) + c, msgs := brokers.Update(params.brokers, bm) for m := range msgs { fmt.Printf("%s%s\n", indent, m) } From 7927dbd561b36a76405cd1188e7d6b9374bf6907 Mon Sep 17 00:00:00 2001 From: Michael Borst Date: Wed, 1 Dec 2021 09:00:32 +0100 Subject: [PATCH 16/18] [tm] unify rebalance and scale --- cmd/topicmappr/commands/output.go | 35 +-- cmd/topicmappr/commands/reassignments.go | 276 +++++++++++++++++++++-- cmd/topicmappr/commands/rebalance.go | 255 +-------------------- cmd/topicmappr/commands/rebuild.go | 5 +- cmd/topicmappr/commands/scale.go | 225 +----------------- 5 files changed, 281 insertions(+), 515 deletions(-) diff --git a/cmd/topicmappr/commands/output.go b/cmd/topicmappr/commands/output.go index 5a233c5..a42591d 100644 --- a/cmd/topicmappr/commands/output.go +++ b/cmd/topicmappr/commands/output.go @@ -6,7 +6,6 @@ import ( "math" "os" "sort" - "strings" "github.com/DataDog/kafka-kit/v3/kafkazk" @@ -80,7 +79,7 @@ func printMapChanges(pm1, pm2 *kafkazk.PartitionMap) { // printBrokerAssignmentStats prints before and after broker usage stats, // such as leadership counts, total partitions owned, degree distribution, // and changes in storage usage. -func printBrokerAssignmentStats(cmd *cobra.Command, pm1, pm2 *kafkazk.PartitionMap, bm1, bm2 kafkazk.BrokerMap) errors { +func printBrokerAssignmentStats(pm1, pm2 *kafkazk.PartitionMap, bm1, bm2 kafkazk.BrokerMap, storageBased bool, partitionSizeFactor float64) errors { var errs errors fmt.Println("\nBroker distribution:") @@ -100,17 +99,10 @@ func printBrokerAssignmentStats(cmd *cobra.Command, pm1, pm2 *kafkazk.PartitionM } // If we're using the storage placement strategy, write anticipated storage changes. - psf, _ := cmd.Flags().GetFloat64("partition-size-factor") - - switch { - case - cmd.Name() == "scale", - cmd.Name() == "rebalance", - cmd.Flag("placement").Value.String() == "storage": - + if storageBased { fmt.Println("\nStorage free change estimations:") - if psf != 1.0 && cmd.Name() != "rebalance" { - fmt.Printf("%sPartition size factor of %.2f applied\n", indent, psf) + if partitionSizeFactor != 1.0 { + fmt.Printf("%sPartition size factor of %.2f applied\n", indent, partitionSizeFactor) } // Get filtered BrokerMaps. For the 'before' broker statistics, we want @@ -130,9 +122,7 @@ func printBrokerAssignmentStats(cmd *cobra.Command, pm1, pm2 *kafkazk.PartitionM fmt.Printf("%srange: %.2fGB -> %.2fGB\n", indent, r1/div, r2/div) if r2 > r1 { // Range increases are acceptable and common in scale up operations. - if cmd.Name() != "scale" { - errs = append(errs, fmt.Errorf("broker free storage range increased")) - } + errs = append(errs, fmt.Errorf("broker free storage range increased")) } // Range spread before/after. @@ -265,15 +255,12 @@ func writeMaps(outPath, outFile string, pm *kafkazk.PartitionMap, phasedPM *kafk } } -func printReassignmentParams(cmd *cobra.Command, results []reassignmentBundle, brokers kafkazk.BrokerMap, tol float64) { - subCmd := cmd.Name() - - fmt.Printf("\n%s parameters:\n", strings.Title(subCmd)) +func printReassignmentParams(params reassignParams, results []reassignmentBundle, brokers kafkazk.BrokerMap, tol float64) { + fmt.Printf("\nReassignment parameters:\n") - pst, _ := cmd.Flags().GetInt("partition-size-threshold") mean, hMean := brokers.Mean(), brokers.HMean() - fmt.Printf("%sIgnoring partitions smaller than %dMB\n", indent, pst) + fmt.Printf("%sIgnoring partitions smaller than %dMB\n", indent, params.partitionSizeThreshold) fmt.Printf("%sFree storage mean, harmonic mean: %.2fGB, %.2fGB\n", indent, mean/div, hMean/div) @@ -283,11 +270,9 @@ func printReassignmentParams(cmd *cobra.Command, results []reassignmentBundle, b fmt.Printf("%s%sSources limited to <= %.2fGB\n", indent, indent, mean*(1+tol)/div) fmt.Printf("%s%sDestinations limited to >= %.2fGB\n", indent, indent, mean*(1-tol)/div) - verbose, _ := cmd.Flags().GetBool("verbose") - // Print the top 10 rebalance results in verbose. - if verbose { - fmt.Printf("%s-\nTop 10 %s map results\n", indent, subCmd) + if params.verbose { + fmt.Printf("%s-\nTop 10 reassignment map results\n", indent) for i, r := range results { fmt.Printf("%stolerance: %.2f -> range: %.2fGB, std. deviation: %.2fGB\n", indent, r.tolerance, r.storageRange/div, r.stdDev/div) diff --git a/cmd/topicmappr/commands/reassignments.go b/cmd/topicmappr/commands/reassignments.go index d0d9ef5..fd0af34 100644 --- a/cmd/topicmappr/commands/reassignments.go +++ b/cmd/topicmappr/commands/reassignments.go @@ -1,9 +1,15 @@ package commands import ( + "bytes" + "fmt" + "os" + "regexp" + "sort" "sync" "github.com/DataDog/kafka-kit/v3/kafkazk" + "github.com/spf13/cobra" ) // reassignmentBundle holds a reassignment PartitionMap along with some input @@ -24,26 +30,180 @@ type reassignmentBundle struct { brokers kafkazk.BrokerMap } -type computeReassignmentBundlesParams struct { - offloadTargets []int - tolerance float64 - partitionMap *kafkazk.PartitionMap - partitionMeta kafkazk.PartitionMetaMap - brokerMap kafkazk.BrokerMap +type reassignParams struct { + brokers []int + localityScoped bool + maxMetadataAge int + optimizeLeadership bool partitionLimit int partitionSizeThreshold int - localityScoped bool + storageThreshold float64 + storageThresholdGB float64 + tolerance float64 + topics []*regexp.Regexp + topicsExclude []*regexp.Regexp + requireNewBrokers bool verbose bool } +func (s reassignParams) UseFixedTolerance() bool { return s.tolerance != 0.00 } + +func reassignParamsFromCmd(cmd *cobra.Command) (params reassignParams) { + brokers, _ := cmd.Flags().GetString("brokers") + params.brokers = brokerStringToSlice(brokers) + localityScoped, _ := cmd.Flags().GetBool("locality-scoped") + params.localityScoped = localityScoped + maxMetadataAge, _ := cmd.Flags().GetInt("metrics-age") + params.maxMetadataAge = maxMetadataAge + optimizeLeadership, _ := cmd.Flags().GetBool("optimize-leadership") + params.optimizeLeadership = optimizeLeadership + partitionLimit, _ := cmd.Flags().GetInt("partition-limit") + params.partitionLimit = partitionLimit + partitionSizeThreshold, _ := cmd.Flags().GetInt("partition-size-threshold") + params.partitionSizeThreshold = partitionSizeThreshold + storageThreshold, _ := cmd.Flags().GetFloat64("storage-threshold") + params.storageThreshold = storageThreshold + storageThresholdGB, _ := cmd.Flags().GetFloat64("storage-threshold-gb") + params.storageThresholdGB = storageThresholdGB + tolerance, _ := cmd.Flags().GetFloat64("tolerance") + params.tolerance = tolerance + topics, _ := cmd.Flags().GetString("topics") + params.topics = topicRegex(topics) + topicsExclude, _ := cmd.Flags().GetString("topics-exclude") + params.topicsExclude = topicRegex(topicsExclude) + verbose, _ := cmd.Flags().GetBool("verbose") + params.verbose = verbose + return params +} + +func reassign(params reassignParams, zk kafkazk.Handler) (*kafkazk.PartitionMap, []error) { + // Get broker and partition metadata. + if err := checkMetaAge(zk, params.maxMetadataAge); err != nil { + fmt.Println(err) + os.Exit(1) + } + brokerMeta, errs := getBrokerMeta(zk, true) + if errs != nil && brokerMeta == nil { + for _, e := range errs { + fmt.Println(e) + } + os.Exit(1) + } + partitionMeta, err := getPartitionMeta(zk) + if err != nil { + fmt.Println(err) + os.Exit(1) + } + + // Get the current partition map. + partitionMapIn, err := kafkazk.PartitionMapFromZK(params.topics, zk) + if err != nil { + fmt.Println(err) + os.Exit(1) + } + + // Exclude any topics that are pending deletion. + pending, err := stripPendingDeletes(partitionMapIn, zk) + if err != nil { + fmt.Println("Error fetching topics pending deletion") + } + + // Exclude any explicit exclusions. + excluded := removeTopics(partitionMapIn, params.topicsExclude) + + // Print topics matched to input params. + printTopics(partitionMapIn) + + // Print if any topics were excluded due to pending deletion. + printExcludedTopics(pending, excluded) + + // Get a broker map. + brokersIn := kafkazk.BrokerMapFromPartitionMap(partitionMapIn, brokerMeta, false) + + // Validate all broker params, get a copy of the broker IDs targeted for + // partition offloading. + if errs := validateBrokers(params.brokers, brokersIn, brokerMeta, params.requireNewBrokers); len(errs) > 0 { + for _, e := range errs { + fmt.Println(e) + } + os.Exit(1) + } + + // Get offload targets. + offloadTargets := determineOffloadTargets(params, brokersIn) + + // Sort offloadTargets by storage free ascending. + sort.Sort(offloadTargetsBySize{t: offloadTargets, bm: brokersIn}) + + // Generate reassignmentBundles for a rebalance. + results := computeReassignmentBundles( + params, + partitionMapIn, + partitionMeta, + brokersIn, + offloadTargets, + ) + + // Merge all results into a slice. + resultsByRange := []reassignmentBundle{} + for r := range results { + resultsByRange = append(resultsByRange, r) + } + + // Sort the rebalance results by range ascending. + sort.Slice(resultsByRange, func(i, j int) bool { + switch { + case resultsByRange[i].storageRange < resultsByRange[j].storageRange: + return true + case resultsByRange[i].storageRange > resultsByRange[j].storageRange: + return false + } + + return resultsByRange[i].stdDev < resultsByRange[j].stdDev + }) + + // Chose the results with the lowest range. + m := resultsByRange[0] + partitionMapOut, brokersOut, relos := m.partitionMap, m.brokers, m.relocations + + // Print parameters used for rebalance decisions. + printReassignmentParams(params, resultsByRange, brokersIn, m.tolerance) + + // Optimize leaders. + if params.optimizeLeadership { + partitionMapOut.OptimizeLeaderFollower() + } + + // Print planned relocations. + printPlannedRelocations(offloadTargets, relos, partitionMeta) + + // Print map change results. + printMapChanges(partitionMapIn, partitionMapOut) + + // Print broker assignment statistics. + errs = printBrokerAssignmentStats(partitionMapIn, partitionMapOut, brokersIn, brokersOut, true, 1.0) + + // Ignore no-ops; rebalances will naturally have a high percentage of these. + partitionMapIn, partitionMapOut = skipReassignmentNoOps(partitionMapIn, partitionMapOut) + + return partitionMapOut, errs + +} + // computeReassignmentBundles takes computeReassignmentBundlesParams and returns // a chan reassignmentBundle. The channel will either contain a single reassignmentBundle // if a fixed computeReassignmentBundlesParams.tolerance value (non 0.00) is // specified, otherwise it will contain a series of reassignmentBundle for multiple // interval values. When generating a series, the results are computed in parallel. -func computeReassignmentBundles(params computeReassignmentBundlesParams) chan reassignmentBundle { +func computeReassignmentBundles( + params reassignParams, + partitionMap *kafkazk.PartitionMap, + partitionMeta kafkazk.PartitionMetaMap, + brokerMap kafkazk.BrokerMap, + offloadTargets []int, +) chan reassignmentBundle { otm := map[int]struct{}{} - for _, id := range params.offloadTargets { + for _, id := range offloadTargets { otm[id] = struct{}{} } @@ -55,13 +215,11 @@ func computeReassignmentBundles(params computeReassignmentBundlesParams) chan re // Whether we're using a fixed tolerance (non 0.00 value) set via flag or // interval values. var tol float64 - var fixedTolerance bool - if params.tolerance == 0.00 { - tol = i - } else { + if params.UseFixedTolerance() { tol = params.tolerance - fixedTolerance = true + } else { + tol = i } wg.Add(1) @@ -69,14 +227,14 @@ func computeReassignmentBundles(params computeReassignmentBundlesParams) chan re go func() { defer wg.Done() - partitionMap := params.partitionMap.Copy() + partitionMap := partitionMap.Copy() // Bundle planRelocationsForBrokerParams. relocationParams := planRelocationsForBrokerParams{ relos: map[int][]relocation{}, mappings: partitionMap.Mappings(), - brokers: params.brokerMap.Copy(), - partitionMeta: params.partitionMeta, + brokers: brokerMap.Copy(), + partitionMeta: partitionMeta, plan: relocationPlan{}, topPartitionsLimit: params.partitionLimit, partitionSizeThreshold: params.partitionSizeThreshold, @@ -88,9 +246,9 @@ func computeReassignmentBundles(params computeReassignmentBundlesParams) chan re // Iterate over offload targets, planning at most one relocation per iteration. // Continue this loop until no more relocations can be planned. - for exhaustedCount := 0; exhaustedCount < len(params.offloadTargets); { + for exhaustedCount := 0; exhaustedCount < len(offloadTargets); { relocationParams.pass++ - for _, sourceID := range params.offloadTargets { + for _, sourceID := range offloadTargets { // Update the source broker ID relocationParams.sourceID = sourceID @@ -119,7 +277,7 @@ func computeReassignmentBundles(params computeReassignmentBundlesParams) chan re }() // Break early if we're using a fixed tolerance value. - if fixedTolerance { + if params.UseFixedTolerance() { break } } @@ -129,3 +287,81 @@ func computeReassignmentBundles(params computeReassignmentBundlesParams) chan re return results } + +func validateBrokers( + newBrokers []int, + currentBrokers kafkazk.BrokerMap, + bm kafkazk.BrokerMetaMap, + newBrokersRequired bool, +) []error { + // No broker changes are permitted in rebalance other than new broker additions. + fmt.Println("\nValidating broker list:") + + // Update the current BrokerList with + // the provided broker list. + c, msgs := currentBrokers.Update(newBrokers, bm) + for m := range msgs { + fmt.Printf("%s%s\n", indent, m) + } + + if c.Changes() { + fmt.Printf("%s-\n", indent) + } + + // Check if any referenced brokers are marked as having missing/partial metrics data. + if errs := ensureBrokerMetrics(currentBrokers, bm); len(errs) > 0 { + return errs + } + + switch { + case c.Missing > 0, c.OldMissing > 0, c.Replace > 0: + return []error{fmt.Errorf("reassignment only allows broker additions")} + case c.New > 0: + fmt.Printf("%s%d additional brokers added\n", indent, c.New) + fmt.Printf("%s-\n", indent) + fmt.Printf("%sOK\n", indent) + case newBrokersRequired: + return []error{fmt.Errorf("reassignment requires additional brokers\n")} + + } + return nil +} + +func determineOffloadTargets(params reassignParams, brokers kafkazk.BrokerMap) []int { + var offloadTargets []int + + var selectorMethod bytes.Buffer + selectorMethod.WriteString("Brokers targeted for partition offloading ") + + // Switch on the target selection method. If a storage threshold in gigabytes + // is specified, prefer this. Otherwise, use the percentage below mean threshold. + var f kafkazk.BrokerFilterFn + if params.storageThresholdGB > 0.00 { + selectorMethod.WriteString(fmt.Sprintf("(< %.2fGB storage free)", params.storageThresholdGB)) + + // Get all non-new brokers with a StorageFree below the storage threshold in GB. + f = func(b *kafkazk.Broker) bool { + if !b.New && b.StorageFree < params.storageThresholdGB*div { + return true + } + return false + } + + } else if params.storageThreshold > 0.00 { + // Find brokers where the storage free is t % below the harmonic mean. + selectorMethod.WriteString(fmt.Sprintf("(>= %.2f%% threshold below hmean)", params.storageThreshold*100)) + f = kafkazk.BelowMeanFn(params.storageThreshold, brokers.HMean) + } else { + // Specifying 0 targets all non-new brokers, this is a scale up + f = func(b *kafkazk.Broker) bool { return !b.New } + } + + matches := brokers.Filter(f) + for _, b := range matches { + offloadTargets = append(offloadTargets, b.ID) + } + + fmt.Printf("\n%s:\n", selectorMethod.String()) + + return offloadTargets +} diff --git a/cmd/topicmappr/commands/rebalance.go b/cmd/topicmappr/commands/rebalance.go index 2607c89..0d47980 100644 --- a/cmd/topicmappr/commands/rebalance.go +++ b/cmd/topicmappr/commands/rebalance.go @@ -1,13 +1,8 @@ package commands import ( - "bytes" "fmt" "os" - "regexp" - "sort" - - "github.com/DataDog/kafka-kit/v3/kafkazk" "github.com/spf13/cobra" ) @@ -42,52 +37,10 @@ func init() { rebalanceCmd.MarkFlagRequired("topics") } -type rebalanceParams struct { - brokers []int - localityScoped bool - maxMetadataAge int - optimizeLeadership bool - partitionLimit int - partitionSizeThreshold int - storageThreshold float64 - storageThresholdGB float64 - tolerance float64 - topics []*regexp.Regexp - topicsExclude []*regexp.Regexp - verbose bool -} - -func rebalanceParamsFromCmd(cmd *cobra.Command) (params rebalanceParams) { - brokers, _ := cmd.Flags().GetString("brokers") - params.brokers = brokerStringToSlice(brokers) - localityScoped, _ := cmd.Flags().GetBool("locality-scoped") - params.localityScoped = localityScoped - maxMetadataAge, _ := cmd.Flags().GetInt("metrics-age") - params.maxMetadataAge = maxMetadataAge - optimizeLeadership, _ := cmd.Flags().GetBool("optimize-leadership") - params.optimizeLeadership = optimizeLeadership - partitionLimit, _ := cmd.Flags().GetInt("partition-limit") - params.partitionLimit = partitionLimit - partitionSizeThreshold, _ := cmd.Flags().GetInt("partition-size-threshold") - params.partitionSizeThreshold = partitionSizeThreshold - storageThreshold, _ := cmd.Flags().GetFloat64("storage-threshold") - params.storageThreshold = storageThreshold - storageThresholdGB, _ := cmd.Flags().GetFloat64("storage-threshold-gb") - params.storageThresholdGB = storageThresholdGB - tolerance, _ := cmd.Flags().GetFloat64("tolerance") - params.tolerance = tolerance - topics, _ := cmd.Flags().GetString("topics") - params.topics = topicRegex(topics) - topicsExclude, _ := cmd.Flags().GetString("topics-exclude") - params.topicsExclude = topicRegex(topicsExclude) - verbose, _ := cmd.Flags().GetBool("verbose") - params.verbose = verbose - return params -} - func rebalance(cmd *cobra.Command, _ []string) { sanitizeInput(cmd) - params := rebalanceParamsFromCmd(cmd) + params := reassignParamsFromCmd(cmd) + params.requireNewBrokers = false // ZooKeeper init. zkAddr := cmd.Parent().Flag("zk-addr").Value.String() @@ -101,214 +54,14 @@ func rebalance(cmd *cobra.Command, _ []string) { defer zk.Close() - // Get broker and partition metadata. - if err := checkMetaAge(zk, params.maxMetadataAge); err != nil { - fmt.Println(err) - os.Exit(1) - } - brokerMeta, errs := getBrokerMeta(zk, true) - if errs != nil && brokerMeta == nil { - for _, e := range errs { - fmt.Println(e) - } - os.Exit(1) - } - partitionMeta, err := getPartitionMeta(zk) - if err != nil { - fmt.Println(err) - os.Exit(1) - } - - // Get the current partition map. - partitionMapIn, err := kafkazk.PartitionMapFromZK(params.topics, zk) - if err != nil { - fmt.Println(err) - os.Exit(1) - } - - // Exclude any topics that are pending deletion. - pending, err := stripPendingDeletes(partitionMapIn, zk) - if err != nil { - fmt.Println("Error fetching topics pending deletion") - } - - // Exclude any explicit exclusions. - excluded := removeTopics(partitionMapIn, params.topicsExclude) - - // Print topics matched to input params. - printTopics(partitionMapIn) - - // Print if any topics were excluded due to pending deletion. - printExcludedTopics(pending, excluded) - - // Get a broker map. - brokersIn := kafkazk.BrokerMapFromPartitionMap(partitionMapIn, brokerMeta, false) - - // Validate all broker params, get a copy of the broker IDs targeted for - // partition offloading. - offloadTargets := validateBrokersForRebalance(params, brokersIn, brokerMeta) - - // Sort offloadTargets by storage free ascending. - sort.Sort(offloadTargetsBySize{t: offloadTargets, bm: brokersIn}) - - reassignmentBundlesParams := computeReassignmentBundlesParams{ - offloadTargets: offloadTargets, - tolerance: params.tolerance, - partitionMap: partitionMapIn, - partitionMeta: partitionMeta, - brokerMap: brokersIn, - partitionLimit: params.partitionLimit, - partitionSizeThreshold: params.partitionSizeThreshold, - localityScoped: params.localityScoped, - verbose: params.verbose, - } - - // Generate reassignmentBundles for a rebalance. - results := computeReassignmentBundles(reassignmentBundlesParams) - - // Merge all results into a slice. - resultsByRange := []reassignmentBundle{} - for r := range results { - resultsByRange = append(resultsByRange, r) - } - - // Sort the rebalance results by range ascending. - sort.Slice(resultsByRange, func(i, j int) bool { - switch { - case resultsByRange[i].storageRange < resultsByRange[j].storageRange: - return true - case resultsByRange[i].storageRange > resultsByRange[j].storageRange: - return false - } - - return resultsByRange[i].stdDev < resultsByRange[j].stdDev - }) - - // Chose the results with the lowest range. - m := resultsByRange[0] - partitionMapOut, brokersOut, relos := m.partitionMap, m.brokers, m.relocations - - // Print parameters used for rebalance decisions. - printReassignmentParams(cmd, resultsByRange, brokersIn, m.tolerance) - - // Optimize leaders. - if params.optimizeLeadership { - partitionMapOut.OptimizeLeaderFollower() - } - - // Print planned relocations. - printPlannedRelocations(offloadTargets, relos, partitionMeta) - - // Print map change results. - printMapChanges(partitionMapIn, partitionMapOut) - - // Print broker assignment statistics. - errs = printBrokerAssignmentStats(cmd, partitionMapIn, partitionMapOut, brokersIn, brokersOut) + partitionMap, errs := reassign(params, zk) // Handle errors that are possible to be overridden by the user (aka 'WARN' // in topicmappr console output). handleOverridableErrs(cmd, errs) - // Ignore no-ops; rebalances will naturally have a high percentage of these. - partitionMapIn, partitionMapOut = skipReassignmentNoOps(partitionMapIn, partitionMapOut) - // Write maps. outPath := cmd.Flag("out-path").Value.String() outFile := cmd.Flag("out-file").Value.String() - writeMaps(outPath, outFile, partitionMapOut, nil) -} - -func validateBrokersForRebalance(params rebalanceParams, brokers kafkazk.BrokerMap, bm kafkazk.BrokerMetaMap) []int { - // No broker changes are permitted in rebalance other than new broker additions. - fmt.Println("\nValidating broker list:") - - // Update the current BrokerList with - // the provided broker list. - c, msgs := brokers.Update(params.brokers, bm) - for m := range msgs { - fmt.Printf("%s%s\n", indent, m) - } - - if c.Changes() { - fmt.Printf("%s-\n", indent) - } - - // Check if any referenced brokers are marked as having missing/partial metrics data. - if errs := ensureBrokerMetrics(brokers, bm); len(errs) > 0 { - for _, e := range errs { - fmt.Println(e) - } - os.Exit(1) - } - - switch { - case c.Missing > 0, c.OldMissing > 0, c.Replace > 0: - fmt.Printf("%s[ERROR] rebalance only allows broker additions\n", indent) - os.Exit(1) - case c.New > 0: - fmt.Printf("%s%d additional brokers added\n", indent, c.New) - fmt.Printf("%s-\n", indent) - fallthrough - default: - fmt.Printf("%sOK\n", indent) - } - - var selectorMethod bytes.Buffer - selectorMethod.WriteString("Brokers targeted for partition offloading ") - - var offloadTargets []int - - // Switch on the target selection method. If a storage threshold in gigabytes - // is specified, prefer this. Otherwise, use the percentage below mean threshold. - switch { - case params.storageThresholdGB > 0.00: - selectorMethod.WriteString(fmt.Sprintf("(< %.2fGB storage free)", params.storageThresholdGB)) - - // Get all non-new brokers with a StorageFree below the storage threshold in GB. - f := func(b *kafkazk.Broker) bool { - if !b.New && b.StorageFree < params.storageThresholdGB*div { - return true - } - return false - } - - matches := brokers.Filter(f) - for _, b := range matches { - offloadTargets = append(offloadTargets, b.ID) - } - - sort.Ints(offloadTargets) - default: - selectorMethod.WriteString(fmt.Sprintf("(>= %.2f%% threshold below hmean)", params.storageThreshold*100)) - - // Find brokers where the storage free is t % below the harmonic mean. - // Specifying 0 targets all non-new brokers. - switch params.storageThreshold { - case 0.00: - f := func(b *kafkazk.Broker) bool { return !b.New } - - matches := brokers.Filter(f) - for _, b := range matches { - offloadTargets = append(offloadTargets, b.ID) - } - - sort.Ints(offloadTargets) - default: - offloadTargets = brokers.BelowMean(params.storageThreshold, brokers.HMean) - } - } - - fmt.Printf("\n%s:\n", selectorMethod.String()) - - // Exit if no target brokers were found. - if len(offloadTargets) == 0 { - fmt.Printf("%s[none]\n", indent) - os.Exit(0) - } else { - for _, id := range offloadTargets { - fmt.Printf("%s%d\n", indent, id) - } - } - - return offloadTargets + writeMaps(outPath, outFile, partitionMap, nil) } diff --git a/cmd/topicmappr/commands/rebuild.go b/cmd/topicmappr/commands/rebuild.go index 15c5e27..f8c70e5 100644 --- a/cmd/topicmappr/commands/rebuild.go +++ b/cmd/topicmappr/commands/rebuild.go @@ -292,7 +292,10 @@ func rebuild(cmd *cobra.Command, _ []string) { printMapChanges(originalMap, partitionMapOut) // Print broker assignment statistics. - printBrokerAssignmentStats(cmd, originalMap, partitionMapOut, brokersOrig, brokers) + errs = append( + errs, + printBrokerAssignmentStats(originalMap, partitionMapOut, brokersOrig, brokers, params.placement == "storage", params.partitionSizeFactor)..., + ) // Print error/warnings. handleOverridableErrs(cmd, errs) diff --git a/cmd/topicmappr/commands/scale.go b/cmd/topicmappr/commands/scale.go index 3d597f5..d595f77 100644 --- a/cmd/topicmappr/commands/scale.go +++ b/cmd/topicmappr/commands/scale.go @@ -3,10 +3,6 @@ package commands import ( "fmt" "os" - "regexp" - "sort" - - "github.com/DataDog/kafka-kit/v3/kafkazk" "github.com/spf13/cobra" ) @@ -39,46 +35,10 @@ func init() { scaleCmd.MarkFlagRequired("topics") } -type scaleParams struct { - brokers []int - localityScoped bool - maxMetadataAge int - optimizeLeadership bool - partitionLimit int - partitionSizeThreshold int - tolerance float64 - topics []*regexp.Regexp - topicsExclude []*regexp.Regexp - verbose bool -} - -func scaleParamsFromCmd(cmd *cobra.Command) (params scaleParams) { - brokers, _ := cmd.Flags().GetString("brokers") - params.brokers = brokerStringToSlice(brokers) - localityScoped, _ := cmd.Flags().GetBool("locality-scoped") - params.localityScoped = localityScoped - maxMetadataAge, _ := cmd.Flags().GetInt("metrics-age") - params.maxMetadataAge = maxMetadataAge - optimizeLeadership, _ := cmd.Flags().GetBool("optimize-leadership") - params.optimizeLeadership = optimizeLeadership - partitionLimit, _ := cmd.Flags().GetInt("partition-limit") - params.partitionLimit = partitionLimit - partitionSizeThreshold, _ := cmd.Flags().GetInt("partition-size-threshold") - params.partitionSizeThreshold = partitionSizeThreshold - tolerance, _ := cmd.Flags().GetFloat64("tolerance") - params.tolerance = tolerance - topics, _ := cmd.Flags().GetString("topics") - params.topics = topicRegex(topics) - topicsExclude, _ := cmd.Flags().GetString("topics-exclude") - params.topicsExclude = topicRegex(topicsExclude) - verbose, _ := cmd.Flags().GetBool("verbose") - params.verbose = verbose - return params -} - func scale(cmd *cobra.Command, _ []string) { sanitizeInput(cmd) - params := scaleParamsFromCmd(cmd) + params := reassignParamsFromCmd(cmd) + params.requireNewBrokers = true // ZooKeeper init. zkAddr := cmd.Parent().Flag("zk-addr").Value.String() @@ -92,184 +52,13 @@ func scale(cmd *cobra.Command, _ []string) { defer zk.Close() - // Get broker and partition metadata. - if err := checkMetaAge(zk, params.maxMetadataAge); err != nil { - fmt.Println(err) - os.Exit(1) - } - brokerMeta, errs := getBrokerMeta(zk, true) - if errs != nil && brokerMeta == nil { - for _, e := range errs { - fmt.Println(e) - } - os.Exit(1) - } - partitionMeta, err := getPartitionMeta(zk) - if err != nil { - fmt.Println(err) - os.Exit(1) - } - - // Get the current partition map. - partitionMapIn, err := kafkazk.PartitionMapFromZK(params.topics, zk) - if err != nil { - fmt.Println(err) - os.Exit(1) - } - - // Exclude any topics that are pending deletion. - pending, err := stripPendingDeletes(partitionMapIn, zk) - if err != nil { - fmt.Println("Error fetching topics pending deletion") - } - - // Exclude any explicit exclusions. - excluded := removeTopics(partitionMapIn, params.topicsExclude) - - // Print topics matched to input params. - printTopics(partitionMapIn) - - // Print if any topics were excluded due to pending deletion. - printExcludedTopics(pending, excluded) - - // Get a broker map. - brokersIn := kafkazk.BrokerMapFromPartitionMap(partitionMapIn, brokerMeta, false) - - // Validate all broker params, get a copy of the - // broker IDs targeted for partition offloading. - offloadTargets := validateBrokersForScale(params, brokersIn, brokerMeta) - - // Sort offloadTargets by storage free ascending. - sort.Sort(offloadTargetsBySize{t: offloadTargets, bm: brokersIn}) - - // Generate reassignmentBundles for a scale up. - reassignmentBundlesParams := computeReassignmentBundlesParams{ - offloadTargets: offloadTargets, - tolerance: params.tolerance, - partitionMap: partitionMapIn, - partitionMeta: partitionMeta, - brokerMap: brokersIn, - partitionLimit: params.partitionLimit, - partitionSizeThreshold: params.partitionSizeThreshold, - localityScoped: params.localityScoped, - verbose: params.verbose, - } - - results := computeReassignmentBundles(reassignmentBundlesParams) - - // Merge all results into a slice. - resultsByRange := []reassignmentBundle{} - for r := range results { - resultsByRange = append(resultsByRange, r) - } + partitionMap, _ := reassign(params, zk) - // Sort the scale results by range ascending. - sort.Slice(resultsByRange, func(i, j int) bool { - switch { - case resultsByRange[i].storageRange < resultsByRange[j].storageRange: - return true - case resultsByRange[i].storageRange > resultsByRange[j].storageRange: - return false - } + // TODO intentionally not handling the one error that can be returned here + // right now, but would be better to distinguish errors + // handleOverridableErrs(cmd, errs) - return resultsByRange[i].stdDev < resultsByRange[j].stdDev - }) - - // Chose the results with the lowest range. - m := resultsByRange[0] - partitionMapOut, brokersOut, relos := m.partitionMap, m.brokers, m.relocations - - // Print parameters used for scale decisions. - printReassignmentParams(cmd, resultsByRange, brokersIn, m.tolerance) - - // Optimize leaders. - if params.optimizeLeadership { - partitionMapOut.OptimizeLeaderFollower() - } - - // Print planned relocations. - printPlannedRelocations(offloadTargets, relos, partitionMeta) - - // Print map change results. - printMapChanges(partitionMapIn, partitionMapOut) - - // Print broker assignment statistics. - errs = printBrokerAssignmentStats(cmd, partitionMapIn, partitionMapOut, brokersIn, brokersOut) - - // Handle errors that are possible - // to be overridden by the user (aka - // 'WARN' in topicmappr console output). - handleOverridableErrs(cmd, errs) - - // Ignore no-ops; scales will naturally have - // a high percentage of these. - partitionMapIn, partitionMapOut = skipReassignmentNoOps(partitionMapIn, partitionMapOut) - - // Write maps. outPath := cmd.Flag("out-path").Value.String() outFile := cmd.Flag("out-file").Value.String() - writeMaps(outPath, outFile, partitionMapOut, nil) -} - -func validateBrokersForScale(params scaleParams, brokers kafkazk.BrokerMap, bm kafkazk.BrokerMetaMap) []int { - // No broker changes are permitted in rebalance other than new broker additions. - fmt.Println("\nValidating broker list:") - - // Update the current BrokerList with - // the provided broker list. - c, msgs := brokers.Update(params.brokers, bm) - for m := range msgs { - fmt.Printf("%s%s\n", indent, m) - } - - if c.Changes() { - fmt.Printf("%s-\n", indent) - } - - // Check if any referenced brokers are marked as having missing/partial metrics data. - if errs := ensureBrokerMetrics(brokers, bm); len(errs) > 0 { - for _, e := range errs { - fmt.Println(e) - } - os.Exit(1) - } - - switch { - case c.Missing > 0, c.OldMissing > 0, c.Replace > 0: - fmt.Printf("%s[ERROR] scale only allows broker additions\n", indent) - os.Exit(1) - case c.New > 0: - fmt.Printf("%s%d additional brokers added\n", indent, c.New) - fmt.Printf("%s-\n", indent) - fmt.Printf("%sOK\n", indent) - default: - fmt.Printf("%s[ERROR] scale requires additional brokers\n", indent) - os.Exit(1) - } - - var offloadTargets []int - - // Offload targets are all non-new brokers. - f := func(b *kafkazk.Broker) bool { return !b.New } - - matches := brokers.Filter(f) - for _, b := range matches { - offloadTargets = append(offloadTargets, b.ID) - } - - sort.Ints(offloadTargets) - - fmt.Println("\nBrokers targeted for partition offloading:") - - // Exit if we've hit insufficient broker counts. - if len(offloadTargets) == 0 { - fmt.Printf("%s[none]\n", indent) - os.Exit(0) - } else { - for _, id := range offloadTargets { - fmt.Printf("%s%d\n", indent, id) - } - } - - return offloadTargets + writeMaps(outPath, outFile, partitionMap, nil) } From 3264dfd88a2c1a22c5b14e9160a2903fbbc96bf2 Mon Sep 17 00:00:00 2001 From: Michael Borst Date: Thu, 2 Dec 2021 20:48:56 +0100 Subject: [PATCH 17/18] [tm] make writeMaps handle arrays --- cmd/topicmappr/commands/output.go | 33 +++++++++--------------- cmd/topicmappr/commands/reassignments.go | 4 +-- cmd/topicmappr/commands/rebalance.go | 4 +-- cmd/topicmappr/commands/rebuild.go | 2 +- cmd/topicmappr/commands/scale.go | 4 +-- 5 files changed, 19 insertions(+), 28 deletions(-) diff --git a/cmd/topicmappr/commands/output.go b/cmd/topicmappr/commands/output.go index a42591d..e44ac48 100644 --- a/cmd/topicmappr/commands/output.go +++ b/cmd/topicmappr/commands/output.go @@ -190,34 +190,29 @@ func skipReassignmentNoOps(pm1, pm2 *kafkazk.PartitionMap) (*kafkazk.PartitionMa } // writeMaps takes a PartitionMap and writes out files. -func writeMaps(outPath, outFile string, pm *kafkazk.PartitionMap, phasedPM *kafkazk.PartitionMap) { - if len(pm.Partitions) == 0 { +func writeMaps(outPath, outFile string, pms []*kafkazk.PartitionMap) { + if len(pms) == 0 || len(pms[0].Partitions) == 0 { fmt.Println("\nNo partition reassignments, skipping map generation") return } - // If we've been provided a phased output map. - var phaseSuffix [2]string - if phasedPM != nil { - phaseSuffix[0] = "-phase1" - phaseSuffix[1] = "-phase2" + phasedMaps := len(pms) > 1 + suffix := func(i int) string { + if phasedMaps { + return fmt.Sprintf("-phase%d", i) + } + return "" } // Break map up by topic. tm := map[string]*kafkazk.PartitionMap{} - outputMaps := []*kafkazk.PartitionMap{phasedPM, pm} - // For each map type, create per-topic maps. - for i, m := range outputMaps { - // We may not have a phasedPM. - if m == nil { - continue - } + for i, m := range pms { // Populate each partition in the parent map keyed // by topic name and possible phase suffix. for _, p := range m.Partitions { - mapName := fmt.Sprintf("%s%s", p.Topic, phaseSuffix[i]) + mapName := fmt.Sprintf("%s%s", p.Topic, suffix(i)) if tm[mapName] == nil { tm[mapName] = kafkazk.NewPartitionMap() } @@ -229,12 +224,8 @@ func writeMaps(outPath, outFile string, pm *kafkazk.PartitionMap, phasedPM *kafk // Write global map if set. if outFile != "" { - for i, m := range outputMaps { - if m == nil { - continue - } - - fullPath := fmt.Sprintf("%s%s%s", outPath, outFile, phaseSuffix[i]) + for i, m := range pms { + fullPath := fmt.Sprintf("%s%s%s", outPath, outFile, suffix(i)) err := kafkazk.WriteMap(m, fullPath) if err != nil { fmt.Printf("%s%s", indent, err) diff --git a/cmd/topicmappr/commands/reassignments.go b/cmd/topicmappr/commands/reassignments.go index fd0af34..e517506 100644 --- a/cmd/topicmappr/commands/reassignments.go +++ b/cmd/topicmappr/commands/reassignments.go @@ -76,7 +76,7 @@ func reassignParamsFromCmd(cmd *cobra.Command) (params reassignParams) { return params } -func reassign(params reassignParams, zk kafkazk.Handler) (*kafkazk.PartitionMap, []error) { +func reassign(params reassignParams, zk kafkazk.Handler) ([]*kafkazk.PartitionMap, []error) { // Get broker and partition metadata. if err := checkMetaAge(zk, params.maxMetadataAge); err != nil { fmt.Println(err) @@ -186,7 +186,7 @@ func reassign(params reassignParams, zk kafkazk.Handler) (*kafkazk.PartitionMap, // Ignore no-ops; rebalances will naturally have a high percentage of these. partitionMapIn, partitionMapOut = skipReassignmentNoOps(partitionMapIn, partitionMapOut) - return partitionMapOut, errs + return []*kafkazk.PartitionMap{partitionMapOut}, errs } diff --git a/cmd/topicmappr/commands/rebalance.go b/cmd/topicmappr/commands/rebalance.go index 0d47980..7d5fcad 100644 --- a/cmd/topicmappr/commands/rebalance.go +++ b/cmd/topicmappr/commands/rebalance.go @@ -54,7 +54,7 @@ func rebalance(cmd *cobra.Command, _ []string) { defer zk.Close() - partitionMap, errs := reassign(params, zk) + partitionMaps, errs := reassign(params, zk) // Handle errors that are possible to be overridden by the user (aka 'WARN' // in topicmappr console output). @@ -63,5 +63,5 @@ func rebalance(cmd *cobra.Command, _ []string) { // Write maps. outPath := cmd.Flag("out-path").Value.String() outFile := cmd.Flag("out-file").Value.String() - writeMaps(outPath, outFile, partitionMap, nil) + writeMaps(outPath, outFile, partitionMaps) } diff --git a/cmd/topicmappr/commands/rebuild.go b/cmd/topicmappr/commands/rebuild.go index f8c70e5..3f5ea1c 100644 --- a/cmd/topicmappr/commands/rebuild.go +++ b/cmd/topicmappr/commands/rebuild.go @@ -307,5 +307,5 @@ func rebuild(cmd *cobra.Command, _ []string) { outPath := cmd.Flag("out-path").Value.String() outFile := cmd.Flag("out-file").Value.String() - writeMaps(outPath, outFile, partitionMapOut, phasedMap) + writeMaps(outPath, outFile, []*kafkazk.PartitionMap{phasedMap, partitionMapOut}) } diff --git a/cmd/topicmappr/commands/scale.go b/cmd/topicmappr/commands/scale.go index d595f77..9ea1bb8 100644 --- a/cmd/topicmappr/commands/scale.go +++ b/cmd/topicmappr/commands/scale.go @@ -52,7 +52,7 @@ func scale(cmd *cobra.Command, _ []string) { defer zk.Close() - partitionMap, _ := reassign(params, zk) + partitionMaps, _ := reassign(params, zk) // TODO intentionally not handling the one error that can be returned here // right now, but would be better to distinguish errors @@ -60,5 +60,5 @@ func scale(cmd *cobra.Command, _ []string) { outPath := cmd.Flag("out-path").Value.String() outFile := cmd.Flag("out-file").Value.String() - writeMaps(outPath, outFile, partitionMap, nil) + writeMaps(outPath, outFile, partitionMaps) } From 295fcd66a1222ced30ea52f816ff7132f86d5da6 Mon Sep 17 00:00:00 2001 From: Michael Borst Date: Thu, 2 Dec 2021 20:41:07 +0100 Subject: [PATCH 18/18] [tm/rebuild] split rebuild logic from command --- cmd/topicmappr/commands/rebuild.go | 182 +++-------------------- cmd/topicmappr/commands/rebuild_steps.go | 146 ++++++++++++++++++ 2 files changed, 168 insertions(+), 160 deletions(-) diff --git a/cmd/topicmappr/commands/rebuild.go b/cmd/topicmappr/commands/rebuild.go index 3f5ea1c..8b46630 100644 --- a/cmd/topicmappr/commands/rebuild.go +++ b/cmd/topicmappr/commands/rebuild.go @@ -73,48 +73,48 @@ type rebuildParams struct { leaderEvacBrokers []int } -func rebuildParamsFromCmd(cmd *cobra.Command) (c rebuildParams) { +func rebuildParamsFromCmd(cmd *cobra.Command) (params rebuildParams) { brokers, _ := cmd.Flags().GetString("brokers") - c.brokers = brokerStringToSlice(brokers) + params.brokers = brokerStringToSlice(brokers) forceRebuild, _ := cmd.Flags().GetBool("force-rebuild") - c.forceRebuild = forceRebuild + params.forceRebuild = forceRebuild mapString, _ := cmd.Flags().GetString("map-string") - c.mapString = mapString + params.mapString = mapString maxMetadataAge, _ := cmd.Flags().GetInt("metrics-age") - c.maxMetadataAge = maxMetadataAge + params.maxMetadataAge = maxMetadataAge minRackIds, _ := cmd.Flags().GetInt("min-rack-ids") - c.minRackIds = minRackIds + params.minRackIds = minRackIds optimize, _ := cmd.Flags().GetString("optimize") - c.optimize = optimize + params.optimize = optimize optimizeLeadership, _ := cmd.Flags().GetBool("optimize-leadership") - c.optimizeLeadership = optimizeLeadership + params.optimizeLeadership = optimizeLeadership partitionSizeFactor, _ := cmd.Flags().GetFloat64("partition-size-factor") - c.partitionSizeFactor = partitionSizeFactor + params.partitionSizeFactor = partitionSizeFactor phasedReassignment, _ := cmd.Flags().GetBool("phased-reassignment") - c.phasedReassignment = phasedReassignment + params.phasedReassignment = phasedReassignment placement, _ := cmd.Flags().GetString("placement") - c.placement = placement + params.placement = placement replication, _ := cmd.Flags().GetInt("replication") - c.replication = replication + params.replication = replication skipNoOps, _ := cmd.Flags().GetBool("skip-no-ops") - c.skipNoOps = skipNoOps + params.skipNoOps = skipNoOps subAffinity, _ := cmd.Flags().GetBool("sub-affinity") - c.subAffinity = subAffinity + params.subAffinity = subAffinity topics, _ := cmd.Flags().GetString("topics") - c.topics = topicRegex(topics) + params.topics = topicRegex(topics) topicsExclude, _ := cmd.Flags().GetString("topics-exclude") - c.topicsExclude = topicRegex(topicsExclude) + params.topicsExclude = topicRegex(topicsExclude) useMetadata, _ := cmd.Flags().GetBool("use-meta") - c.useMetadata = useMetadata + params.useMetadata = useMetadata let, _ := cmd.Flags().GetString("leader-evac-topics") if let != "" { - c.leaderEvacTopics = topicRegex(let) + params.leaderEvacTopics = topicRegex(let) } leb, _ := cmd.Flags().GetString("leader-evac-brokers") if leb != "" { - c.leaderEvacBrokers = brokerStringToSlice(leb) + params.leaderEvacBrokers = brokerStringToSlice(leb) } - return c + return params } func (c rebuildParams) validate() error { @@ -162,150 +162,12 @@ func rebuild(cmd *cobra.Command, _ []string) { defer zk.Close() } - // In addition to the global topic regex, we have leader-evac topic regex as well. - var evacTopics []string - if len(params.leaderEvacTopics) != 0 { - evacTopics, err = zk.GetTopics(params.leaderEvacTopics) - if err != nil { - fmt.Println(err) - os.Exit(1) - } - } - - // General flow: - // 1) A PartitionMap is formed (either unmarshaled from the literal - // map input via --rebuild-map or generated from ZooKeeper Metadata - // for topics matching --topics). - // 2) A BrokerMap is formed from brokers found in the PartitionMap - // along with any new brokers provided via the --brokers param. - // 3) The PartitionMap and BrokerMap are fed to a rebuild - // function. Missing brokers, brokers marked for replacement, - // and all other placements are performed, returning a new - // PartitionMap. - // 4) Differences between the original and new PartitionMap - // are detected and reported. - // 5) The new PartitionMap is split by topic. Map(s) are written. - - // Fetch broker metadata. - var withMetrics bool - if params.placement == "storage" { - if err := checkMetaAge(zk, params.maxMetadataAge); err != nil { - fmt.Println(err) - os.Exit(1) - } - withMetrics = true - } - - var brokerMeta kafkazk.BrokerMetaMap - var errs []error - if params.useMetadata { - if brokerMeta, errs = getBrokerMeta(zk, withMetrics); errs != nil && brokerMeta == nil { - for _, e := range errs { - fmt.Println(e) - } - os.Exit(1) - } - } - - // Fetch partition metadata. - var partitionMeta kafkazk.PartitionMetaMap - if params.placement == "storage" { - if partitionMeta, err = getPartitionMeta(zk); err != nil { - fmt.Println(err) - os.Exit(1) - } - } - - // Build a partition map either from literal map text input or by fetching the - // map data from ZooKeeper. Store a copy of the original. - partitionMapIn, pending, excluded := getPartitionMap(params, zk) - originalMap := partitionMapIn.Copy() - - // Get a list of affected topics. - printTopics(partitionMapIn) - - // Print if any topics were excluded due to pending deletion or explicit - // exclusion. - printExcludedTopics(pending, excluded) - - brokers, bs := getBrokers(params, partitionMapIn, brokerMeta) - brokersOrig := brokers.Copy() - - if bs.Changes() { - fmt.Printf("%s-\n", indent) - } - - // Check if any referenced brokers are marked as having - // missing/partial metrics data. - if params.useMetadata { - if errs := ensureBrokerMetrics(brokers, brokerMeta); len(errs) > 0 { - for _, e := range errs { - fmt.Println(e) - } - os.Exit(1) - } - } - - // Create substitution affinities. - affinities := getSubAffinities(params, brokers, brokersOrig, partitionMapIn) - - if affinities != nil { - fmt.Printf("%s-\n", indent) - } - - // Print changes, actions. - printChangesActions(params, bs) - - // Apply any replication factor settings. - updateReplicationFactor(params, partitionMapIn) - - // Build a new map using the provided list of brokers. This is OK to run even - // when a no-op is intended. - partitionMapOut, errs := buildMap(params, partitionMapIn, partitionMeta, brokers, affinities) - - // Optimize leaders. - if params.optimizeLeadership { - partitionMapOut.OptimizeLeaderFollower() - } - - // Count missing brokers as a warning. - if bs.Missing > 0 { - errs = append(errs, fmt.Errorf("%d provided brokers not found in ZooKeeper", bs.Missing)) - } - - // Count missing rack info as warning - if bs.RackMissing > 0 { - errs = append( - errs, fmt.Errorf("%d provided broker(s) do(es) not have a rack.id defined", bs.RackMissing), - ) - } - - // Generate phased map if enabled. - var phasedMap *kafkazk.PartitionMap - if params.phasedReassignment { - phasedMap = phasedReassignment(originalMap, partitionMapOut) - } - - partitionMapOut = evacuateLeadership(*partitionMapOut, params.leaderEvacBrokers, evacTopics) - - // Print map change results. - printMapChanges(originalMap, partitionMapOut) - - // Print broker assignment statistics. - errs = append( - errs, - printBrokerAssignmentStats(originalMap, partitionMapOut, brokersOrig, brokers, params.placement == "storage", params.partitionSizeFactor)..., - ) + maps, errs := runRebuild(params, zk) // Print error/warnings. handleOverridableErrs(cmd, errs) - // Skip no-ops if configured. - if params.skipNoOps { - originalMap, partitionMapOut = skipReassignmentNoOps(originalMap, partitionMapOut) - } - outPath := cmd.Flag("out-path").Value.String() outFile := cmd.Flag("out-file").Value.String() - writeMaps(outPath, outFile, []*kafkazk.PartitionMap{phasedMap, partitionMapOut}) + writeMaps(outPath, outFile, maps) } diff --git a/cmd/topicmappr/commands/rebuild_steps.go b/cmd/topicmappr/commands/rebuild_steps.go index 744c9c9..dc37864 100644 --- a/cmd/topicmappr/commands/rebuild_steps.go +++ b/cmd/topicmappr/commands/rebuild_steps.go @@ -7,6 +7,152 @@ import ( "github.com/DataDog/kafka-kit/v3/kafkazk" ) +func runRebuild(params rebuildParams, zk kafkazk.Handler) ([]*kafkazk.PartitionMap, []error) { + // General flow: + // 1) A PartitionMap is formed (either unmarshaled from the literal + // map input via --rebuild-map or generated from ZooKeeper Metadata + // for topics matching --topics). + // 2) A BrokerMap is formed from brokers found in the PartitionMap + // along with any new brokers provided via the --brokers param. + // 3) The PartitionMap and BrokerMap are fed to a rebuild + // function. Missing brokers, brokers marked for replacement, + // and all other placements are performed, returning a new + // PartitionMap. + // 4) Differences between the original and new PartitionMap + // are detected and reported. + // 5) The new PartitionMap is split by topic. Map(s) are written. + + // In addition to the global topic regex, we have leader-evac topic regex as well. + var evacTopics []string + var err error + if len(params.leaderEvacTopics) != 0 { + evacTopics, err = zk.GetTopics(params.leaderEvacTopics) + if err != nil { + fmt.Println(err) + os.Exit(1) + } + } + + // Fetch broker metadata. + var withMetrics bool + if params.placement == "storage" { + if err := checkMetaAge(zk, params.maxMetadataAge); err != nil { + fmt.Println(err) + os.Exit(1) + } + withMetrics = true + } + + var brokerMeta kafkazk.BrokerMetaMap + var errs []error + if params.useMetadata { + if brokerMeta, errs = getBrokerMeta(zk, withMetrics); errs != nil && brokerMeta == nil { + for _, e := range errs { + fmt.Println(e) + } + os.Exit(1) + } + } + + // Fetch partition metadata. + var partitionMeta kafkazk.PartitionMetaMap + if params.placement == "storage" { + if partitionMeta, err = getPartitionMeta(zk); err != nil { + fmt.Println(err) + os.Exit(1) + } + } + + // Build a partition map either from literal map text input or by fetching the + // map data from ZooKeeper. Store a copy of the original. + partitionMapIn, pending, excluded := getPartitionMap(params, zk) + originalMap := partitionMapIn.Copy() + + // Get a list of affected topics. + printTopics(partitionMapIn) + + // Print if any topics were excluded due to pending deletion or explicit + // exclusion. + printExcludedTopics(pending, excluded) + + brokers, bs := getBrokers(params, partitionMapIn, brokerMeta) + brokersOrig := brokers.Copy() + + if bs.Changes() { + fmt.Printf("%s-\n", indent) + } + + // Check if any referenced brokers are marked as having + // missing/partial metrics data. + if params.useMetadata { + if errs := ensureBrokerMetrics(brokers, brokerMeta); len(errs) > 0 { + for _, e := range errs { + fmt.Println(e) + } + os.Exit(1) + } + } + + // Create substitution affinities. + affinities := getSubAffinities(params, brokers, brokersOrig, partitionMapIn) + + if affinities != nil { + fmt.Printf("%s-\n", indent) + } + + // Print changes, actions. + printChangesActions(params, bs) + + // Apply any replication factor settings. + updateReplicationFactor(params, partitionMapIn) + + // Build a new map using the provided list of brokers. This is OK to run even + // when a no-op is intended. + partitionMapOut, errs := buildMap(params, partitionMapIn, partitionMeta, brokers, affinities) + + // Optimize leaders. + if params.optimizeLeadership { + partitionMapOut.OptimizeLeaderFollower() + } + + // Count missing brokers as a warning. + if bs.Missing > 0 { + errs = append(errs, fmt.Errorf("%d provided brokers not found in ZooKeeper", bs.Missing)) + } + + // Count missing rack info as warning + if bs.RackMissing > 0 { + errs = append( + errs, fmt.Errorf("%d provided broker(s) do(es) not have a rack.id defined", bs.RackMissing), + ) + } + + outputMaps := []*kafkazk.PartitionMap{} + // Generate phased map if enabled. + if params.phasedReassignment { + outputMaps = append(outputMaps, phasedReassignment(originalMap, partitionMapOut)) + } + + partitionMapOut = evacuateLeadership(*partitionMapOut, params.leaderEvacBrokers, evacTopics) + + // Print map change results. + printMapChanges(originalMap, partitionMapOut) + + // Print broker assignment statistics. + errs = append( + errs, + printBrokerAssignmentStats(originalMap, partitionMapOut, brokersOrig, brokers, params.placement == "storage", params.partitionSizeFactor)..., + ) + + // Skip no-ops if configured. + if params.skipNoOps { + originalMap, partitionMapOut = skipReassignmentNoOps(originalMap, partitionMapOut) + } + outputMaps = append(outputMaps, partitionMapOut) + + return outputMaps, errs +} + // *References to metrics metadata persisted in ZooKeeper, see: // https://github.com/DataDog/kafka-kit/tree/master/cmd/metricsfetcher#data-structures)