diff --git a/cmd/topicmappr/commands/config.go b/cmd/topicmappr/commands/config.go index 6d2aa67..39d0c0e 100644 --- a/cmd/topicmappr/commands/config.go +++ b/cmd/topicmappr/commands/config.go @@ -23,39 +23,20 @@ const ( var ( // Characters allowed in Kafka topic names topicNormalChar = regexp.MustCompile(`[a-zA-Z0-9_\\-]`) - - // Config holds global configs. - Config struct { - topics []*regexp.Regexp - topicsExclude []*regexp.Regexp - brokers []int - } ) -func bootstrap(cmd *cobra.Command) { - b, _ := cmd.Flags().GetString("brokers") - Config.brokers = brokerStringToSlice(b) - +func sanitizeInput(cmd *cobra.Command) { // Append trailing slash if not included. op := cmd.Flag("out-path").Value.String() if op != "" && !strings.HasSuffix(op, "/") { cmd.Flags().Set("out-path", op+"/") } - - // Populate topic include / exclude regex. - if include, _ := cmd.Flags().GetString("topics"); include != "" { - Config.topics = TopicRegex(include) - } - - if exclude, _ := cmd.Flags().GetString("topics-exclude"); exclude != "" { - Config.topicsExclude = TopicRegex(exclude) - } } -// TopicRegex takes a string of csv values and returns a []*regexp.Regexp. +// topicRegex takes a string of csv values and returns a []*regexp.Regexp. // The values are either a string literal and become ^value$ or are regex and // compiled then added. -func TopicRegex(s string) []*regexp.Regexp { +func topicRegex(s string) []*regexp.Regexp { var out []*regexp.Regexp // Update string literals to ^value$ regex. @@ -88,23 +69,21 @@ func TopicRegex(s string) []*regexp.Regexp { // topic discovery` via ZooKeeper. // - that the --placement flag was set to 'storage', which expects // metrics metadata to be stored in ZooKeeper. -func initZooKeeper(cmd *cobra.Command) (kafkazk.Handler, error) { +func initZooKeeper(zkAddr, kafkaPrefix, metricsPrefix string) (kafkazk.Handler, error) { // Suppress underlying ZK client noise. 
log.SetOutput(ioutil.Discard) - zkAddr := cmd.Parent().Flag("zk-addr").Value.String() - timeout := 250 * time.Millisecond - zk, err := kafkazk.NewHandler(&kafkazk.Config{ Connect: zkAddr, - Prefix: cmd.Parent().Flag("zk-prefix").Value.String(), - MetricsPrefix: cmd.Flag("zk-metrics-prefix").Value.String(), + Prefix: kafkaPrefix, + MetricsPrefix: metricsPrefix, }) if err != nil { return nil, fmt.Errorf("Error connecting to ZooKeeper: %s", err) } + timeout := 250 * time.Millisecond time.Sleep(timeout) if !zk.Ready() { diff --git a/cmd/topicmappr/commands/evac_leadership_test.go b/cmd/topicmappr/commands/evac_leadership_test.go index c524ff7..a1f344d 100644 --- a/cmd/topicmappr/commands/evac_leadership_test.go +++ b/cmd/topicmappr/commands/evac_leadership_test.go @@ -41,7 +41,7 @@ var pMapIn = kafkazk.PartitionMap{ func TestRemoveProblemBroker(t *testing.T) { problemBrokerId := 10001 - pMapOut := EvacLeadership(pMapIn, []int{problemBrokerId}, []string{topic}) + pMapOut := evacuateLeadership(pMapIn, []int{problemBrokerId}, []string{topic}) for _, partition := range pMapOut.Partitions { if partition.Replicas[0] == problemBrokerId { @@ -53,7 +53,7 @@ func TestRemoveProblemBroker(t *testing.T) { func TestEvacTwoProblemBrokers(t *testing.T) { problemBrokers := []int{10001, 10002} - pMapOut := EvacLeadership(pMapIn, problemBrokers, []string{topic}) + pMapOut := evacuateLeadership(pMapIn, problemBrokers, []string{topic}) for _, partition := range pMapOut.Partitions { if partition.Replicas[0] == problemBrokers[0] || partition.Replicas[0] == problemBrokers[1] { @@ -63,7 +63,7 @@ func TestEvacTwoProblemBrokers(t *testing.T) { } func TestNoMatchingTopicToEvac(t *testing.T) { - pMapOut := EvacLeadership(pMapIn, []int{10001}, []string{"some other topic"}) + pMapOut := evacuateLeadership(pMapIn, []int{10001}, []string{"some other topic"}) for i, partition := range pMapOut.Partitions { for j, broker := range partition.Replicas { @@ -81,7 +81,7 @@ func TestNoMatchingTopicToEvac(t *testing.T) { // problemBrokers[10002] = &kafkazk.Broker{} // problemBrokers[10003] = &kafkazk.Broker{} // -// EvacLeadership(pMapIn, problemBrokers) +// evacuateLeadership(pMapIn, problemBrokers) // -// t.Errorf("EvacLeadership should have errored out at this point.") +// t.Errorf("evacuateLeadership should have errored out at this point.") //} diff --git a/cmd/topicmappr/commands/metadata.go b/cmd/topicmappr/commands/metadata.go index d029e4e..bd28a72 100644 --- a/cmd/topicmappr/commands/metadata.go +++ b/cmd/topicmappr/commands/metadata.go @@ -2,93 +2,63 @@ package commands import ( "fmt" - "os" "regexp" "time" "github.com/DataDog/kafka-kit/v3/kafkazk" - - "github.com/spf13/cobra" ) // checkMetaAge checks the age of the stored partition and broker storage // metrics data against the tolerated metrics age parameter. -func checkMetaAge(cmd *cobra.Command, zk kafkazk.Handler) { +func checkMetaAge(zk kafkazk.Handler, maxAge int) error { age, err := zk.MaxMetaAge() if err != nil { - fmt.Printf("Error fetching metrics metadata: %s\n", err) - os.Exit(1) + return fmt.Errorf("Error fetching metrics metadata: %s\n", err) } - tol, _ := cmd.Flags().GetInt("metrics-age") - - if age > time.Duration(tol)*time.Minute { - fmt.Printf("Metrics metadata is older than allowed: %s\n", age) - os.Exit(1) + if age > time.Duration(maxAge)*time.Minute { + return fmt.Errorf("Metrics metadata is older than allowed: %s\n", age) } + return nil } // getBrokerMeta returns a map of brokers and broker metadata for those // registered in ZooKeeper. 
Optionally, metrics metadata persisted in ZooKeeper // (via an external mechanism*) can be merged into the metadata. -func getBrokerMeta(cmd *cobra.Command, zk kafkazk.Handler, m bool) kafkazk.BrokerMetaMap { - brokerMeta, errs := zk.GetAllBrokerMeta(m) - // If no data is returned, report and exit. Otherwise, it's possible that - // complete data for a few brokers wasn't returned. We check in subsequent - // steps as to whether any brokers that matter are missing metrics. - if errs != nil && brokerMeta == nil { - for _, e := range errs { - fmt.Println(e) - } - os.Exit(1) - } - - return brokerMeta +func getBrokerMeta(zk kafkazk.Handler, m bool) (kafkazk.BrokerMetaMap, []error) { + return zk.GetAllBrokerMeta(m) } // ensureBrokerMetrics takes a map of reference brokers and a map of discovered // broker metadata. Any non-missing brokers in the broker map must be present // in the broker metadata map and have a non-true MetricsIncomplete value. -func ensureBrokerMetrics(cmd *cobra.Command, bm kafkazk.BrokerMap, bmm kafkazk.BrokerMetaMap) { - var e bool +func ensureBrokerMetrics(bm kafkazk.BrokerMap, bmm kafkazk.BrokerMetaMap) []error { + errs := []error{} for id, b := range bm { // Missing brokers won't be found in the brokerMeta. if !b.Missing && id != kafkazk.StubBrokerID && bmm[id].MetricsIncomplete { - e = true - fmt.Printf("Metrics not found for broker %d\n", id) + errs = append(errs, fmt.Errorf("Metrics not found for broker %d\n", id)) } } - - if e { - os.Exit(1) - } + return errs } // getPartitionMeta returns a map of topic, partition metadata persisted in // ZooKeeper (via an external mechanism*). This is primarily partition size // metrics data used for the storage placement strategy. -func getPartitionMeta(cmd *cobra.Command, zk kafkazk.Handler) kafkazk.PartitionMetaMap { - partitionMeta, err := zk.GetAllPartitionMeta() - if err != nil { - fmt.Println(err) - os.Exit(1) - } - - return partitionMeta +func getPartitionMeta(zk kafkazk.Handler) (kafkazk.PartitionMetaMap, error) { + return zk.GetAllPartitionMeta() } // stripPendingDeletes takes a partition map and zk handler. It looks up any // topics in a pending delete state and removes them from the provided partition // map, returning a list of topics removed. -func stripPendingDeletes(pm *kafkazk.PartitionMap, zk kafkazk.Handler) []string { +func stripPendingDeletes(pm *kafkazk.PartitionMap, zk kafkazk.Handler) ([]string, error) { // Get pending deletions. pd, err := zk.GetPendingDeletion() - if err != nil { - fmt.Println("Error fetching topics pending deletion") - } if len(pd) == 0 { - return []string{} + return []string{}, err } // Convert to a series of literal regex. @@ -99,7 +69,7 @@ func stripPendingDeletes(pm *kafkazk.PartitionMap, zk kafkazk.Handler) []string } // Update the PartitionMap and return a list of removed topic names. - return removeTopics(pm, re) + return removeTopics(pm, re), err } // removeTopics takes a PartitionMap and []*regexp.Regexp of topic name patters. diff --git a/cmd/topicmappr/commands/output.go b/cmd/topicmappr/commands/output.go index 325f7c0..e44ac48 100644 --- a/cmd/topicmappr/commands/output.go +++ b/cmd/topicmappr/commands/output.go @@ -6,7 +6,6 @@ import ( "math" "os" "sort" - "strings" "github.com/DataDog/kafka-kit/v3/kafkazk" @@ -23,7 +22,6 @@ func (e errors) Swap(i, j int) { e[i], e[j] = e[j], e[i] } // referenced in the map. 
func printTopics(pm *kafkazk.PartitionMap) { topics := pm.Topics() - sort.Strings(topics) fmt.Printf("\nTopics:\n") for _, t := range topics { @@ -81,7 +79,7 @@ func printMapChanges(pm1, pm2 *kafkazk.PartitionMap) { // printBrokerAssignmentStats prints before and after broker usage stats, // such as leadership counts, total partitions owned, degree distribution, // and changes in storage usage. -func printBrokerAssignmentStats(cmd *cobra.Command, pm1, pm2 *kafkazk.PartitionMap, bm1, bm2 kafkazk.BrokerMap) errors { +func printBrokerAssignmentStats(pm1, pm2 *kafkazk.PartitionMap, bm1, bm2 kafkazk.BrokerMap, storageBased bool, partitionSizeFactor float64) errors { var errs errors fmt.Println("\nBroker distribution:") @@ -101,17 +99,10 @@ func printBrokerAssignmentStats(cmd *cobra.Command, pm1, pm2 *kafkazk.PartitionM } // If we're using the storage placement strategy, write anticipated storage changes. - psf, _ := cmd.Flags().GetFloat64("partition-size-factor") - - switch { - case - cmd.Name() == "scale", - cmd.Name() == "rebalance", - cmd.Flag("placement").Value.String() == "storage": - + if storageBased { fmt.Println("\nStorage free change estimations:") - if psf != 1.0 && cmd.Name() != "rebalance" { - fmt.Printf("%sPartition size factor of %.2f applied\n", indent, psf) + if partitionSizeFactor != 1.0 { + fmt.Printf("%sPartition size factor of %.2f applied\n", indent, partitionSizeFactor) } // Get filtered BrokerMaps. For the 'before' broker statistics, we want @@ -124,40 +115,14 @@ func printBrokerAssignmentStats(cmd *cobra.Command, pm1, pm2 *kafkazk.PartitionM // input and wasn't marked for replacement (generally, users are doing storage placements // particularly to balance out the storage of the input broker list). - // Filter function for brokers where the Replaced field is false. - nonReplaced := func(b *kafkazk.Broker) bool { - if b.Replace { - return false - } - return true - } - - // Get all IDs in PartitionMap. - mappedIDs := map[int]struct{}{} - for _, partn := range pm1.Partitions { - for _, id := range partn.Replicas { - mappedIDs[id] = struct{}{} - } - } - - // Filter function for brokers that are in the partition map. - mapped := func(b *kafkazk.Broker) bool { - if _, exist := mappedIDs[b.ID]; exist { - return true - } - return false - } - - mb1, mb2 := bm1.Filter(mapped), bm2.Filter(nonReplaced) + mb1, mb2 := bm1.Filter(pm1.BrokersIn()), bm2.Filter(kafkazk.NotReplacedBrokersFn) // Range before/after. r1, r2 := mb1.StorageRange(), mb2.StorageRange() fmt.Printf("%srange: %.2fGB -> %.2fGB\n", indent, r1/div, r2/div) if r2 > r1 { // Range increases are acceptable and common in scale up operations. - if cmd.Name() != "scale" { - errs = append(errs, fmt.Errorf("broker free storage range increased")) - } + errs = append(errs, fmt.Errorf("broker free storage range increased")) } // Range spread before/after. @@ -225,37 +190,29 @@ func skipReassignmentNoOps(pm1, pm2 *kafkazk.PartitionMap) (*kafkazk.PartitionMa } // writeMaps takes a PartitionMap and writes out files. -func writeMaps(cmd *cobra.Command, pm *kafkazk.PartitionMap, phasedPM *kafkazk.PartitionMap) { - if len(pm.Partitions) == 0 { +func writeMaps(outPath, outFile string, pms []*kafkazk.PartitionMap) { + if len(pms) == 0 || len(pms[0].Partitions) == 0 { fmt.Println("\nNo partition reassignments, skipping map generation") return } - // If we've been provided a phased output map. 
- var phaseSuffix [2]string - if phasedPM != nil { - phaseSuffix[0] = "-phase1" - phaseSuffix[1] = "-phase2" + phasedMaps := len(pms) > 1 + suffix := func(i int) string { + if phasedMaps { + return fmt.Sprintf("-phase%d", i) + } + return "" } - outPath := cmd.Flag("out-path").Value.String() - outFile := cmd.Flag("out-file").Value.String() - // Break map up by topic. tm := map[string]*kafkazk.PartitionMap{} - outputMaps := []*kafkazk.PartitionMap{phasedPM, pm} - // For each map type, create per-topic maps. - for i, m := range outputMaps { - // We may not have a phasedPM. - if m == nil { - continue - } + for i, m := range pms { // Populate each partition in the parent map keyed // by topic name and possible phase suffix. for _, p := range m.Partitions { - mapName := fmt.Sprintf("%s%s", p.Topic, phaseSuffix[i]) + mapName := fmt.Sprintf("%s%s", p.Topic, suffix(i)) if tm[mapName] == nil { tm[mapName] = kafkazk.NewPartitionMap() } @@ -267,12 +224,8 @@ func writeMaps(cmd *cobra.Command, pm *kafkazk.PartitionMap, phasedPM *kafkazk.P // Write global map if set. if outFile != "" { - for i, m := range outputMaps { - if m == nil { - continue - } - - fullPath := fmt.Sprintf("%s%s%s", outPath, outFile, phaseSuffix[i]) + for i, m := range pms { + fullPath := fmt.Sprintf("%s%s%s", outPath, outFile, suffix(i)) err := kafkazk.WriteMap(m, fullPath) if err != nil { fmt.Printf("%s%s", indent, err) @@ -293,15 +246,12 @@ func writeMaps(cmd *cobra.Command, pm *kafkazk.PartitionMap, phasedPM *kafkazk.P } } -func printReassignmentParams(cmd *cobra.Command, results []reassignmentBundle, brokers kafkazk.BrokerMap, tol float64) { - subCmd := cmd.Name() +func printReassignmentParams(params reassignParams, results []reassignmentBundle, brokers kafkazk.BrokerMap, tol float64) { + fmt.Printf("\nReassignment parameters:\n") - fmt.Printf("\n%s parameters:\n", strings.Title(subCmd)) - - pst, _ := cmd.Flags().GetInt("partition-size-threshold") mean, hMean := brokers.Mean(), brokers.HMean() - fmt.Printf("%sIgnoring partitions smaller than %dMB\n", indent, pst) + fmt.Printf("%sIgnoring partitions smaller than %dMB\n", indent, params.partitionSizeThreshold) fmt.Printf("%sFree storage mean, harmonic mean: %.2fGB, %.2fGB\n", indent, mean/div, hMean/div) @@ -311,11 +261,9 @@ func printReassignmentParams(cmd *cobra.Command, results []reassignmentBundle, b fmt.Printf("%s%sSources limited to <= %.2fGB\n", indent, indent, mean*(1+tol)/div) fmt.Printf("%s%sDestinations limited to >= %.2fGB\n", indent, indent, mean*(1-tol)/div) - verbose, _ := cmd.Flags().GetBool("verbose") - // Print the top 10 rebalance results in verbose. - if verbose { - fmt.Printf("%s-\nTop 10 %s map results\n", indent, subCmd) + if params.verbose { + fmt.Printf("%s-\nTop 10 reassignment map results\n", indent) for i, r := range results { fmt.Printf("%stolerance: %.2f -> range: %.2fGB, std. 
deviation: %.2fGB\n", indent, r.tolerance, r.storageRange/div, r.stdDev/div) diff --git a/cmd/topicmappr/commands/reassignments.go b/cmd/topicmappr/commands/reassignments.go index 627bada..e517506 100644 --- a/cmd/topicmappr/commands/reassignments.go +++ b/cmd/topicmappr/commands/reassignments.go @@ -1,11 +1,15 @@ package commands import ( + "bytes" "fmt" "os" + "regexp" + "sort" "sync" "github.com/DataDog/kafka-kit/v3/kafkazk" + "github.com/spf13/cobra" ) // reassignmentBundle holds a reassignment PartitionMap along with some input @@ -26,26 +30,180 @@ type reassignmentBundle struct { brokers kafkazk.BrokerMap } -type computeReassignmentBundlesParams struct { - offloadTargets []int - tolerance float64 - partitionMap *kafkazk.PartitionMap - partitionMeta kafkazk.PartitionMetaMap - brokerMap kafkazk.BrokerMap +type reassignParams struct { + brokers []int + localityScoped bool + maxMetadataAge int + optimizeLeadership bool partitionLimit int partitionSizeThreshold int - localityScoped bool + storageThreshold float64 + storageThresholdGB float64 + tolerance float64 + topics []*regexp.Regexp + topicsExclude []*regexp.Regexp + requireNewBrokers bool verbose bool } +func (s reassignParams) UseFixedTolerance() bool { return s.tolerance != 0.00 } + +func reassignParamsFromCmd(cmd *cobra.Command) (params reassignParams) { + brokers, _ := cmd.Flags().GetString("brokers") + params.brokers = brokerStringToSlice(brokers) + localityScoped, _ := cmd.Flags().GetBool("locality-scoped") + params.localityScoped = localityScoped + maxMetadataAge, _ := cmd.Flags().GetInt("metrics-age") + params.maxMetadataAge = maxMetadataAge + optimizeLeadership, _ := cmd.Flags().GetBool("optimize-leadership") + params.optimizeLeadership = optimizeLeadership + partitionLimit, _ := cmd.Flags().GetInt("partition-limit") + params.partitionLimit = partitionLimit + partitionSizeThreshold, _ := cmd.Flags().GetInt("partition-size-threshold") + params.partitionSizeThreshold = partitionSizeThreshold + storageThreshold, _ := cmd.Flags().GetFloat64("storage-threshold") + params.storageThreshold = storageThreshold + storageThresholdGB, _ := cmd.Flags().GetFloat64("storage-threshold-gb") + params.storageThresholdGB = storageThresholdGB + tolerance, _ := cmd.Flags().GetFloat64("tolerance") + params.tolerance = tolerance + topics, _ := cmd.Flags().GetString("topics") + params.topics = topicRegex(topics) + topicsExclude, _ := cmd.Flags().GetString("topics-exclude") + params.topicsExclude = topicRegex(topicsExclude) + verbose, _ := cmd.Flags().GetBool("verbose") + params.verbose = verbose + return params +} + +func reassign(params reassignParams, zk kafkazk.Handler) ([]*kafkazk.PartitionMap, []error) { + // Get broker and partition metadata. + if err := checkMetaAge(zk, params.maxMetadataAge); err != nil { + fmt.Println(err) + os.Exit(1) + } + brokerMeta, errs := getBrokerMeta(zk, true) + if errs != nil && brokerMeta == nil { + for _, e := range errs { + fmt.Println(e) + } + os.Exit(1) + } + partitionMeta, err := getPartitionMeta(zk) + if err != nil { + fmt.Println(err) + os.Exit(1) + } + + // Get the current partition map. + partitionMapIn, err := kafkazk.PartitionMapFromZK(params.topics, zk) + if err != nil { + fmt.Println(err) + os.Exit(1) + } + + // Exclude any topics that are pending deletion. + pending, err := stripPendingDeletes(partitionMapIn, zk) + if err != nil { + fmt.Println("Error fetching topics pending deletion") + } + + // Exclude any explicit exclusions. 
+ excluded := removeTopics(partitionMapIn, params.topicsExclude) + + // Print topics matched to input params. + printTopics(partitionMapIn) + + // Print if any topics were excluded due to pending deletion. + printExcludedTopics(pending, excluded) + + // Get a broker map. + brokersIn := kafkazk.BrokerMapFromPartitionMap(partitionMapIn, brokerMeta, false) + + // Validate all broker params, get a copy of the broker IDs targeted for + // partition offloading. + if errs := validateBrokers(params.brokers, brokersIn, brokerMeta, params.requireNewBrokers); len(errs) > 0 { + for _, e := range errs { + fmt.Println(e) + } + os.Exit(1) + } + + // Get offload targets. + offloadTargets := determineOffloadTargets(params, brokersIn) + + // Sort offloadTargets by storage free ascending. + sort.Sort(offloadTargetsBySize{t: offloadTargets, bm: brokersIn}) + + // Generate reassignmentBundles for a rebalance. + results := computeReassignmentBundles( + params, + partitionMapIn, + partitionMeta, + brokersIn, + offloadTargets, + ) + + // Merge all results into a slice. + resultsByRange := []reassignmentBundle{} + for r := range results { + resultsByRange = append(resultsByRange, r) + } + + // Sort the rebalance results by range ascending. + sort.Slice(resultsByRange, func(i, j int) bool { + switch { + case resultsByRange[i].storageRange < resultsByRange[j].storageRange: + return true + case resultsByRange[i].storageRange > resultsByRange[j].storageRange: + return false + } + + return resultsByRange[i].stdDev < resultsByRange[j].stdDev + }) + + // Chose the results with the lowest range. + m := resultsByRange[0] + partitionMapOut, brokersOut, relos := m.partitionMap, m.brokers, m.relocations + + // Print parameters used for rebalance decisions. + printReassignmentParams(params, resultsByRange, brokersIn, m.tolerance) + + // Optimize leaders. + if params.optimizeLeadership { + partitionMapOut.OptimizeLeaderFollower() + } + + // Print planned relocations. + printPlannedRelocations(offloadTargets, relos, partitionMeta) + + // Print map change results. + printMapChanges(partitionMapIn, partitionMapOut) + + // Print broker assignment statistics. + errs = printBrokerAssignmentStats(partitionMapIn, partitionMapOut, brokersIn, brokersOut, true, 1.0) + + // Ignore no-ops; rebalances will naturally have a high percentage of these. + partitionMapIn, partitionMapOut = skipReassignmentNoOps(partitionMapIn, partitionMapOut) + + return []*kafkazk.PartitionMap{partitionMapOut}, errs + +} + // computeReassignmentBundles takes computeReassignmentBundlesParams and returns // a chan reassignmentBundle. The channel will either contain a single reassignmentBundle // if a fixed computeReassignmentBundlesParams.tolerance value (non 0.00) is // specified, otherwise it will contain a series of reassignmentBundle for multiple // interval values. When generating a series, the results are computed in parallel. 
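+//
+// A rough sketch of how a caller might consume the returned channel (this
+// mirrors what the new reassign() function above does; the variable names
+// here are illustrative only):
+//
+//	bundles := []reassignmentBundle{}
+//	for b := range computeReassignmentBundles(params, pm, pmm, brokers, targets) {
+//		bundles = append(bundles, b)
+//	}
+//	// Sort bundles by storageRange (then stdDev) ascending and use bundles[0].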
-func computeReassignmentBundles(params computeReassignmentBundlesParams) chan reassignmentBundle { +func computeReassignmentBundles( + params reassignParams, + partitionMap *kafkazk.PartitionMap, + partitionMeta kafkazk.PartitionMetaMap, + brokerMap kafkazk.BrokerMap, + offloadTargets []int, +) chan reassignmentBundle { otm := map[int]struct{}{} - for _, id := range params.offloadTargets { + for _, id := range offloadTargets { otm[id] = struct{}{} } @@ -57,13 +215,11 @@ func computeReassignmentBundles(params computeReassignmentBundlesParams) chan re // Whether we're using a fixed tolerance (non 0.00 value) set via flag or // interval values. var tol float64 - var fixedTolerance bool - if params.tolerance == 0.00 { - tol = i - } else { + if params.UseFixedTolerance() { tol = params.tolerance - fixedTolerance = true + } else { + tol = i } wg.Add(1) @@ -71,14 +227,14 @@ func computeReassignmentBundles(params computeReassignmentBundlesParams) chan re go func() { defer wg.Done() - partitionMap := params.partitionMap.Copy() + partitionMap := partitionMap.Copy() // Bundle planRelocationsForBrokerParams. relocationParams := planRelocationsForBrokerParams{ relos: map[int][]relocation{}, mappings: partitionMap.Mappings(), - brokers: params.brokerMap.Copy(), - partitionMeta: params.partitionMeta, + brokers: brokerMap.Copy(), + partitionMeta: partitionMeta, plan: relocationPlan{}, topPartitionsLimit: params.partitionLimit, partitionSizeThreshold: params.partitionSizeThreshold, @@ -90,9 +246,9 @@ func computeReassignmentBundles(params computeReassignmentBundlesParams) chan re // Iterate over offload targets, planning at most one relocation per iteration. // Continue this loop until no more relocations can be planned. - for exhaustedCount := 0; exhaustedCount < len(params.offloadTargets); { + for exhaustedCount := 0; exhaustedCount < len(offloadTargets); { relocationParams.pass++ - for _, sourceID := range params.offloadTargets { + for _, sourceID := range offloadTargets { // Update the source broker ID relocationParams.sourceID = sourceID @@ -121,7 +277,7 @@ func computeReassignmentBundles(params computeReassignmentBundlesParams) chan re }() // Break early if we're using a fixed tolerance value. - if fixedTolerance { + if params.UseFixedTolerance() { break } } @@ -132,56 +288,80 @@ func computeReassignmentBundles(params computeReassignmentBundlesParams) chan re return results } -// EvacLeadership For the given set of topics, makes sure that the given brokers are not -// leaders of any partitions. If we have any partitions that only have replicas from the -// evac broker list, we will fail. -func EvacLeadership(partitionMapIn kafkazk.PartitionMap, evacBrokers []int, evacTopics []string) *kafkazk.PartitionMap { - // evacuation algo starts here - partitionMapOut := partitionMapIn.Copy() +func validateBrokers( + newBrokers []int, + currentBrokers kafkazk.BrokerMap, + bm kafkazk.BrokerMetaMap, + newBrokersRequired bool, +) []error { + // No broker changes are permitted in rebalance other than new broker additions. + fmt.Println("\nValidating broker list:") - // make a lookup map of topics - topicsMap := map[string]struct{}{} - for _, topic := range evacTopics { - topicsMap[topic] = struct{}{} + // Update the current BrokerList with + // the provided broker list. 
+ c, msgs := currentBrokers.Update(newBrokers, bm) + for m := range msgs { + fmt.Printf("%s%s\n", indent, m) } - // make a lookup map of topics - brokersMap := map[int]struct{}{} - for _, b := range evacBrokers { - brokersMap[b] = struct{}{} + if c.Changes() { + fmt.Printf("%s-\n", indent) } - // TODO What if we only want to evacuate a subset of partitions? - // For now, problem brokers is the bigger use case. + // Check if any referenced brokers are marked as having missing/partial metrics data. + if errs := ensureBrokerMetrics(currentBrokers, bm); len(errs) > 0 { + return errs + } - // swap leadership for all broker/partition/topic combos - for i, p := range partitionMapIn.Partitions { - // check the topic is one of the target topics - if _, correctTopic := topicsMap[p.Topic]; !correctTopic { - continue - } + switch { + case c.Missing > 0, c.OldMissing > 0, c.Replace > 0: + return []error{fmt.Errorf("reassignment only allows broker additions")} + case c.New > 0: + fmt.Printf("%s%d additional brokers added\n", indent, c.New) + fmt.Printf("%s-\n", indent) + fmt.Printf("%sOK\n", indent) + case newBrokersRequired: + return []error{fmt.Errorf("reassignment requires additional brokers\n")} - // check the leader to see if its one of the evac brokers - if _, contains := brokersMap[p.Replicas[0]]; !contains { - continue - } + } + return nil +} - for j, replica := range p.Replicas { - // If one of the replicas is not being leadership evacuated, use that one and swap. - if _, contains := brokersMap[replica]; !contains { - newLeader := p.Replicas[j] - partitionMapOut.Partitions[i].Replicas[j] = p.Replicas[0] - partitionMapOut.Partitions[i].Replicas[0] = newLeader - break - } +func determineOffloadTargets(params reassignParams, brokers kafkazk.BrokerMap) []int { + var offloadTargets []int + + var selectorMethod bytes.Buffer + selectorMethod.WriteString("Brokers targeted for partition offloading ") - // If we've tried every replica, but they are all being leader evac'd. - if replica == p.Replicas[len(p.Replicas)-1] { - fmt.Println("No replicas available to evacuate leadership to. All replicas present in EvacLeadership broker list.") - os.Exit(1) + // Switch on the target selection method. If a storage threshold in gigabytes + // is specified, prefer this. Otherwise, use the percentage below mean threshold. + var f kafkazk.BrokerFilterFn + if params.storageThresholdGB > 0.00 { + selectorMethod.WriteString(fmt.Sprintf("(< %.2fGB storage free)", params.storageThresholdGB)) + + // Get all non-new brokers with a StorageFree below the storage threshold in GB. + f = func(b *kafkazk.Broker) bool { + if !b.New && b.StorageFree < params.storageThresholdGB*div { + return true } + return false } + + } else if params.storageThreshold > 0.00 { + // Find brokers where the storage free is t % below the harmonic mean. 
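+		// Illustrative example (numbers assumed, not part of this change): with a
+		// harmonic mean of 1000GB free and storageThreshold=0.20, brokers whose
+		// free storage sits at or below roughly 800GB become offload targets.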
+ selectorMethod.WriteString(fmt.Sprintf("(>= %.2f%% threshold below hmean)", params.storageThreshold*100)) + f = kafkazk.BelowMeanFn(params.storageThreshold, brokers.HMean) + } else { + // Specifying 0 targets all non-new brokers, this is a scale up + f = func(b *kafkazk.Broker) bool { return !b.New } } - return partitionMapOut + matches := brokers.Filter(f) + for _, b := range matches { + offloadTargets = append(offloadTargets, b.ID) + } + + fmt.Printf("\n%s:\n", selectorMethod.String()) + + return offloadTargets } diff --git a/cmd/topicmappr/commands/rebalance.go b/cmd/topicmappr/commands/rebalance.go index d0f304c..7d5fcad 100644 --- a/cmd/topicmappr/commands/rebalance.go +++ b/cmd/topicmappr/commands/rebalance.go @@ -1,12 +1,8 @@ package commands import ( - "bytes" "fmt" "os" - "sort" - - "github.com/DataDog/kafka-kit/v3/kafkazk" "github.com/spf13/cobra" ) @@ -33,7 +29,6 @@ func init() { rebalanceCmd.Flags().Int("partition-size-threshold", 512, "Size in megabytes where partitions below this value will not be moved in a rebalance") rebalanceCmd.Flags().Bool("locality-scoped", false, "Ensure that all partition movements are scoped by rack.id") rebalanceCmd.Flags().Bool("verbose", false, "Verbose output") - rebalanceCmd.Flags().String("zk-metrics-prefix", "topicmappr", "ZooKeeper namespace prefix for Kafka metrics") rebalanceCmd.Flags().Int("metrics-age", 60, "Kafka metrics age tolerance (in minutes)") rebalanceCmd.Flags().Bool("optimize-leadership", false, "Rebalance all broker leader/follower ratios") @@ -43,10 +38,15 @@ func init() { } func rebalance(cmd *cobra.Command, _ []string) { - bootstrap(cmd) + sanitizeInput(cmd) + params := reassignParamsFromCmd(cmd) + params.requireNewBrokers = false // ZooKeeper init. - zk, err := initZooKeeper(cmd) + zkAddr := cmd.Parent().Flag("zk-addr").Value.String() + kafkaPrefix := cmd.Parent().Flag("zk-prefix").Value.String() + metricsPrefix := cmd.Flag("zk-metrics-prefix").Value.String() + zk, err := initZooKeeper(zkAddr, kafkaPrefix, metricsPrefix) if err != nil { fmt.Println(err) os.Exit(1) @@ -54,200 +54,14 @@ func rebalance(cmd *cobra.Command, _ []string) { defer zk.Close() - // Get broker and partition metadata. - checkMetaAge(cmd, zk) - brokerMeta := getBrokerMeta(cmd, zk, true) - partitionMeta := getPartitionMeta(cmd, zk) - - // Get the current partition map. - partitionMapIn, err := kafkazk.PartitionMapFromZK(Config.topics, zk) - if err != nil { - fmt.Println(err) - os.Exit(1) - } - - // Exclude any topics that are pending deletion. - pending := stripPendingDeletes(partitionMapIn, zk) - - // Exclude any explicit exclusions. - excluded := removeTopics(partitionMapIn, Config.topicsExclude) - - // Print topics matched to input params. - printTopics(partitionMapIn) - - // Print if any topics were excluded due to pending deletion. - printExcludedTopics(pending, excluded) - - // Get a broker map. - brokersIn := kafkazk.BrokerMapFromPartitionMap(partitionMapIn, brokerMeta, false) - - // Validate all broker params, get a copy of the broker IDs targeted for - // partition offloading. - offloadTargets := validateBrokersForRebalance(cmd, brokersIn, brokerMeta) - - // Sort offloadTargets by storage free ascending. 
- sort.Sort(offloadTargetsBySize{t: offloadTargets, bm: brokersIn}) - - partitionLimit, _ := cmd.Flags().GetInt("partition-limit") - partitionSizeThreshold, _ := cmd.Flags().GetInt("partition-size-threshold") - tolerance, _ := cmd.Flags().GetFloat64("tolerance") - localityScoped, _ := cmd.Flags().GetBool("locality-scoped") - verbose, _ := cmd.Flags().GetBool("verbose") - - params := computeReassignmentBundlesParams{ - offloadTargets: offloadTargets, - tolerance: tolerance, - partitionMap: partitionMapIn, - partitionMeta: partitionMeta, - brokerMap: brokersIn, - partitionLimit: partitionLimit, - partitionSizeThreshold: partitionSizeThreshold, - localityScoped: localityScoped, - verbose: verbose, - } - - // Generate reassignmentBundles for a rebalance. - results := computeReassignmentBundles(params) - - // Merge all results into a slice. - resultsByRange := []reassignmentBundle{} - for r := range results { - resultsByRange = append(resultsByRange, r) - } - - // Sort the rebalance results by range ascending. - sort.Slice(resultsByRange, func(i, j int) bool { - switch { - case resultsByRange[i].storageRange < resultsByRange[j].storageRange: - return true - case resultsByRange[i].storageRange > resultsByRange[j].storageRange: - return false - } - - return resultsByRange[i].stdDev < resultsByRange[j].stdDev - }) - - // Chose the results with the lowest range. - m := resultsByRange[0] - partitionMapOut, brokersOut, relos := m.partitionMap, m.brokers, m.relocations - - // Print parameters used for rebalance decisions. - printReassignmentParams(cmd, resultsByRange, brokersIn, m.tolerance) - - // Optimize leaders. - if t, _ := cmd.Flags().GetBool("optimize-leadership"); t { - partitionMapOut.OptimizeLeaderFollower() - } - - // Print planned relocations. - printPlannedRelocations(offloadTargets, relos, partitionMeta) - - // Print map change results. - printMapChanges(partitionMapIn, partitionMapOut) - - // Print broker assignment statistics. - errs := printBrokerAssignmentStats(cmd, partitionMapIn, partitionMapOut, brokersIn, brokersOut) + partitionMaps, errs := reassign(params, zk) // Handle errors that are possible to be overridden by the user (aka 'WARN' // in topicmappr console output). handleOverridableErrs(cmd, errs) - // Ignore no-ops; rebalances will naturally have a high percentage of these. - partitionMapIn, partitionMapOut = skipReassignmentNoOps(partitionMapIn, partitionMapOut) - // Write maps. - writeMaps(cmd, partitionMapOut, nil) -} - -func validateBrokersForRebalance(cmd *cobra.Command, brokers kafkazk.BrokerMap, bm kafkazk.BrokerMetaMap) []int { - // No broker changes are permitted in rebalance other than new broker additions. - fmt.Println("\nValidating broker list:") - - // Update the current BrokerList with - // the provided broker list. - c, msgs := brokers.Update(Config.brokers, bm) - for m := range msgs { - fmt.Printf("%s%s\n", indent, m) - } - - if c.Changes() { - fmt.Printf("%s-\n", indent) - } - - // Check if any referenced brokers are marked as having missing/partial metrics data. 
- ensureBrokerMetrics(cmd, brokers, bm) - - switch { - case c.Missing > 0, c.OldMissing > 0, c.Replace > 0: - fmt.Printf("%s[ERROR] rebalance only allows broker additions\n", indent) - os.Exit(1) - case c.New > 0: - fmt.Printf("%s%d additional brokers added\n", indent, c.New) - fmt.Printf("%s-\n", indent) - fallthrough - default: - fmt.Printf("%sOK\n", indent) - } - - st, _ := cmd.Flags().GetFloat64("storage-threshold") - stg, _ := cmd.Flags().GetFloat64("storage-threshold-gb") - - var selectorMethod bytes.Buffer - selectorMethod.WriteString("Brokers targeted for partition offloading ") - - var offloadTargets []int - - // Switch on the target selection method. If a storage threshold in gigabytes - // is specified, prefer this. Otherwise, use the percentage below mean threshold. - switch { - case stg > 0.00: - selectorMethod.WriteString(fmt.Sprintf("(< %.2fGB storage free)", stg)) - - // Get all non-new brokers with a StorageFree below the storage threshold in GB. - f := func(b *kafkazk.Broker) bool { - if !b.New && b.StorageFree < stg*div { - return true - } - return false - } - - matches := brokers.Filter(f) - for _, b := range matches { - offloadTargets = append(offloadTargets, b.ID) - } - - sort.Ints(offloadTargets) - default: - selectorMethod.WriteString(fmt.Sprintf("(>= %.2f%% threshold below hmean)", st*100)) - - // Find brokers where the storage free is t % below the harmonic mean. - // Specifying 0 targets all non-new brokers. - switch st { - case 0.00: - f := func(b *kafkazk.Broker) bool { return !b.New } - - matches := brokers.Filter(f) - for _, b := range matches { - offloadTargets = append(offloadTargets, b.ID) - } - - sort.Ints(offloadTargets) - default: - offloadTargets = brokers.BelowMean(st, brokers.HMean) - } - } - - fmt.Printf("\n%s:\n", selectorMethod.String()) - - // Exit if no target brokers were found. 
- if len(offloadTargets) == 0 { - fmt.Printf("%s[none]\n", indent) - os.Exit(0) - } else { - for _, id := range offloadTargets { - fmt.Printf("%s%d\n", indent, id) - } - } - - return offloadTargets + outPath := cmd.Flag("out-path").Value.String() + outFile := cmd.Flag("out-file").Value.String() + writeMaps(outPath, outFile, partitionMaps) } diff --git a/cmd/topicmappr/commands/rebuild.go b/cmd/topicmappr/commands/rebuild.go index 6cd925c..8b46630 100644 --- a/cmd/topicmappr/commands/rebuild.go +++ b/cmd/topicmappr/commands/rebuild.go @@ -3,6 +3,7 @@ package commands import ( "fmt" "os" + "regexp" "github.com/DataDog/kafka-kit/v3/kafkazk" @@ -38,7 +39,6 @@ func init() { rebuildCmd.Flags().String("optimize", "distribution", "Optimization priority for the storage placement strategy: [distribution, storage]") rebuildCmd.Flags().Float64("partition-size-factor", 1.0, "Factor by which to multiply partition sizes when using storage placement") rebuildCmd.Flags().String("brokers", "", "Broker list to scope all partition placements to ('-1' for all currently mapped brokers, '-2' for all brokers in cluster)") - rebuildCmd.Flags().String("zk-metrics-prefix", "topicmappr", "ZooKeeper namespace prefix for Kafka metrics (when using storage placement)") rebuildCmd.Flags().Int("metrics-age", 60, "Kafka metrics age tolerance (in minutes) (when using storage placement)") rebuildCmd.Flags().Bool("skip-no-ops", false, "Skip no-op partition assigments") rebuildCmd.Flags().Bool("optimize-leadership", false, "Rebalance all broker leader/follower ratios") @@ -52,46 +52,109 @@ func init() { rebuildCmd.MarkFlagRequired("brokers") } -func rebuild(cmd *cobra.Command, _ []string) { - // Sanity check params. - t, _ := cmd.Flags().GetString("topics") - ms, _ := cmd.Flags().GetString("map-string") - p := cmd.Flag("placement").Value.String() - o := cmd.Flag("optimize").Value.String() - fr, _ := cmd.Flags().GetBool("force-rebuild") - sa, _ := cmd.Flags().GetBool("sub-affinity") - m, _ := cmd.Flags().GetBool("use-meta") +type rebuildParams struct { + brokers []int + forceRebuild bool + mapString string + maxMetadataAge int + minRackIds int + optimize string + optimizeLeadership bool + partitionSizeFactor float64 + phasedReassignment bool + placement string + replication int + skipNoOps bool + subAffinity bool + topics []*regexp.Regexp + topicsExclude []*regexp.Regexp + useMetadata bool + leaderEvacTopics []*regexp.Regexp + leaderEvacBrokers []int +} +func rebuildParamsFromCmd(cmd *cobra.Command) (params rebuildParams) { + brokers, _ := cmd.Flags().GetString("brokers") + params.brokers = brokerStringToSlice(brokers) + forceRebuild, _ := cmd.Flags().GetBool("force-rebuild") + params.forceRebuild = forceRebuild + mapString, _ := cmd.Flags().GetString("map-string") + params.mapString = mapString + maxMetadataAge, _ := cmd.Flags().GetInt("metrics-age") + params.maxMetadataAge = maxMetadataAge + minRackIds, _ := cmd.Flags().GetInt("min-rack-ids") + params.minRackIds = minRackIds + optimize, _ := cmd.Flags().GetString("optimize") + params.optimize = optimize + optimizeLeadership, _ := cmd.Flags().GetBool("optimize-leadership") + params.optimizeLeadership = optimizeLeadership + partitionSizeFactor, _ := cmd.Flags().GetFloat64("partition-size-factor") + params.partitionSizeFactor = partitionSizeFactor + phasedReassignment, _ := cmd.Flags().GetBool("phased-reassignment") + params.phasedReassignment = phasedReassignment + placement, _ := cmd.Flags().GetString("placement") + params.placement = placement + replication, _ := 
cmd.Flags().GetInt("replication") + params.replication = replication + skipNoOps, _ := cmd.Flags().GetBool("skip-no-ops") + params.skipNoOps = skipNoOps + subAffinity, _ := cmd.Flags().GetBool("sub-affinity") + params.subAffinity = subAffinity + topics, _ := cmd.Flags().GetString("topics") + params.topics = topicRegex(topics) + topicsExclude, _ := cmd.Flags().GetString("topics-exclude") + params.topicsExclude = topicRegex(topicsExclude) + useMetadata, _ := cmd.Flags().GetBool("use-meta") + params.useMetadata = useMetadata let, _ := cmd.Flags().GetString("leader-evac-topics") + if let != "" { + params.leaderEvacTopics = topicRegex(let) + } leb, _ := cmd.Flags().GetString("leader-evac-brokers") + if leb != "" { + params.leaderEvacBrokers = brokerStringToSlice(leb) + } + return params +} +func (c rebuildParams) validate() error { switch { - case ms == "" && t == "": - fmt.Println("\n[ERROR] must specify either --topics or --map-string") - defaultsAndExit() - case p != "count" && p != "storage": - fmt.Println("\n[ERROR] --placement must be either 'count' or 'storage'") - defaultsAndExit() - case o != "distribution" && o != "storage": - fmt.Println("\n[ERROR] --optimize must be either 'distribution' or 'storage'") - defaultsAndExit() - case !m && p == "storage": - fmt.Println("\n[ERROR] --placement=storage requires --use-meta=true") + case c.mapString == "" && len(c.topics) == 0: + return fmt.Errorf("\n[ERROR] must specify either --topics or --map-string") + case c.placement != "count" && c.placement != "storage": + return fmt.Errorf("\n[ERROR] --placement must be either 'count' or 'storage'") + case c.optimize != "distribution" && c.optimize != "storage": + return fmt.Errorf("\n[ERROR] --optimize must be either 'distribution' or 'storage'") + case !c.useMetadata && c.placement == "storage": + return fmt.Errorf("\n[ERROR] --placement=storage requires --use-meta=true") + case c.forceRebuild && c.subAffinity: + return fmt.Errorf("\n[INFO] --force-rebuild disables --sub-affinity") + case (len(c.leaderEvacBrokers) != 0 || len(c.leaderEvacTopics) != 0) && (len(c.leaderEvacBrokers) == 0 || len(c.leaderEvacTopics) == 0): + return fmt.Errorf("\n[ERROR] --leader-evac-topics and --leader-evac-brokers must both be specified for leadership evacuation.") + } + return nil +} + +func rebuild(cmd *cobra.Command, _ []string) { + sanitizeInput(cmd) + params := rebuildParamsFromCmd(cmd) + + err := params.validate() + if err != nil { + fmt.Println(err) defaultsAndExit() - case fr && sa: + } + if params.forceRebuild && params.subAffinity { fmt.Println("\n[INFO] --force-rebuild disables --sub-affinity") - case (leb != "" || let != "") && (leb == "" || let == ""): - fmt.Println("\n[ERROR] --leader-evac-topics and --leader-evac-brokers must both be specified for leadership evacuation.") - defaultsAndExit() } - bootstrap(cmd) - // ZooKeeper init. var zk kafkazk.Handler - if m || len(Config.topics) > 0 || p == "storage" { - var err error - zk, err = initZooKeeper(cmd) + if params.useMetadata || len(params.topics) > 0 || params.placement == "storage" { + zkAddr := cmd.Parent().Flag("zk-addr").Value.String() + kafkaPrefix := cmd.Parent().Flag("zk-prefix").Value.String() + metricsPrefix := cmd.Flag("zk-metrics-prefix").Value.String() + zk, err = initZooKeeper(zkAddr, kafkaPrefix, metricsPrefix) if err != nil { fmt.Println(err) os.Exit(1) @@ -99,134 +162,12 @@ func rebuild(cmd *cobra.Command, _ []string) { defer zk.Close() } - // In addition to the global topic regex, we have leader-evac topic regex as well. 
- var evacTopics []string - var err error - if let != "" { - evacTopics, err = zk.GetTopics(TopicRegex(let)) - if err != nil { - fmt.Println(err) - os.Exit(1) - } - } - - var evacBrokers []int - if leb != "" { - evacBrokers = brokerStringToSlice(leb) - } - - // General flow: - // 1) A PartitionMap is formed (either unmarshaled from the literal - // map input via --rebuild-map or generated from ZooKeeper Metadata - // for topics matching --topics). - // 2) A BrokerMap is formed from brokers found in the PartitionMap - // along with any new brokers provided via the --brokers param. - // 3) The PartitionMap and BrokerMap are fed to a rebuild - // function. Missing brokers, brokers marked for replacement, - // and all other placements are performed, returning a new - // PartitionMap. - // 4) Differences between the original and new PartitionMap - // are detected and reported. - // 5) The new PartitionMap is split by topic. Map(s) are written. - - // Fetch broker metadata. - var withMetrics bool - if cmd.Flag("placement").Value.String() == "storage" { - checkMetaAge(cmd, zk) - withMetrics = true - } - - var brokerMeta kafkazk.BrokerMetaMap - if m, _ := cmd.Flags().GetBool("use-meta"); m { - brokerMeta = getBrokerMeta(cmd, zk, withMetrics) - } - - // Fetch partition metadata. - var partitionMeta kafkazk.PartitionMetaMap - if cmd.Flag("placement").Value.String() == "storage" { - partitionMeta = getPartitionMeta(cmd, zk) - } - - // Build a partition map either from literal map text input or by fetching the - // map data from ZooKeeper. Store a copy of the original. - partitionMapIn, pending, excluded := getPartitionMap(cmd, zk) - originalMap := partitionMapIn.Copy() - - // Get a list of affected topics. - printTopics(partitionMapIn) - - // Print if any topics were excluded due to pending deletion or explicit - // exclusion. - printExcludedTopics(pending, excluded) - - brokers, bs := getBrokers(cmd, partitionMapIn, brokerMeta) - brokersOrig := brokers.Copy() - - if bs.Changes() { - fmt.Printf("%s-\n", indent) - } - - // Check if any referenced brokers are marked as having - // missing/partial metrics data. - if m, _ := cmd.Flags().GetBool("use-meta"); m { - ensureBrokerMetrics(cmd, brokers, brokerMeta) - } - - // Create substitution affinities. - affinities := getSubAffinities(cmd, brokers, brokersOrig, partitionMapIn) - - if affinities != nil { - fmt.Printf("%s-\n", indent) - } - - // Print changes, actions. - printChangesActions(cmd, bs) - - // Apply any replication factor settings. - updateReplicationFactor(cmd, partitionMapIn) - - // Build a new map using the provided list of brokers. This is OK to run even - // when a no-op is intended. - partitionMapOut, errs := buildMap(cmd, partitionMapIn, partitionMeta, brokers, affinities) - - // Optimize leaders. - if t, _ := cmd.Flags().GetBool("optimize-leadership"); t { - partitionMapOut.OptimizeLeaderFollower() - } - - // Count missing brokers as a warning. - if bs.Missing > 0 { - errs = append(errs, fmt.Errorf("%d provided brokers not found in ZooKeeper", bs.Missing)) - } - - // Count missing rack info as warning - if bs.RackMissing > 0 { - errs = append( - errs, fmt.Errorf("%d provided broker(s) do(es) not have a rack.id defined", bs.RackMissing), - ) - } - - // Generate phased map if enabled. 
- var phasedMap *kafkazk.PartitionMap - if phased, _ := cmd.Flags().GetBool("phased-reassignment"); phased { - phasedMap = phasedReassignment(originalMap, partitionMapOut) - } - - partitionMapOut = EvacLeadership(*partitionMapOut, evacBrokers, evacTopics) - - // Print map change results. - printMapChanges(originalMap, partitionMapOut) - - // Print broker assignment statistics. - printBrokerAssignmentStats(cmd, originalMap, partitionMapOut, brokersOrig, brokers) + maps, errs := runRebuild(params, zk) // Print error/warnings. handleOverridableErrs(cmd, errs) - // Skip no-ops if configured. - if sno, _ := cmd.Flags().GetBool("skip-no-ops"); sno { - originalMap, partitionMapOut = skipReassignmentNoOps(originalMap, partitionMapOut) - } - - writeMaps(cmd, partitionMapOut, phasedMap) + outPath := cmd.Flag("out-path").Value.String() + outFile := cmd.Flag("out-file").Value.String() + writeMaps(outPath, outFile, maps) } diff --git a/cmd/topicmappr/commands/rebuild_steps.go b/cmd/topicmappr/commands/rebuild_steps.go index c0ab672..dc37864 100644 --- a/cmd/topicmappr/commands/rebuild_steps.go +++ b/cmd/topicmappr/commands/rebuild_steps.go @@ -5,10 +5,154 @@ import ( "os" "github.com/DataDog/kafka-kit/v3/kafkazk" - - "github.com/spf13/cobra" ) +func runRebuild(params rebuildParams, zk kafkazk.Handler) ([]*kafkazk.PartitionMap, []error) { + // General flow: + // 1) A PartitionMap is formed (either unmarshaled from the literal + // map input via --rebuild-map or generated from ZooKeeper Metadata + // for topics matching --topics). + // 2) A BrokerMap is formed from brokers found in the PartitionMap + // along with any new brokers provided via the --brokers param. + // 3) The PartitionMap and BrokerMap are fed to a rebuild + // function. Missing brokers, brokers marked for replacement, + // and all other placements are performed, returning a new + // PartitionMap. + // 4) Differences between the original and new PartitionMap + // are detected and reported. + // 5) The new PartitionMap is split by topic. Map(s) are written. + + // In addition to the global topic regex, we have leader-evac topic regex as well. + var evacTopics []string + var err error + if len(params.leaderEvacTopics) != 0 { + evacTopics, err = zk.GetTopics(params.leaderEvacTopics) + if err != nil { + fmt.Println(err) + os.Exit(1) + } + } + + // Fetch broker metadata. + var withMetrics bool + if params.placement == "storage" { + if err := checkMetaAge(zk, params.maxMetadataAge); err != nil { + fmt.Println(err) + os.Exit(1) + } + withMetrics = true + } + + var brokerMeta kafkazk.BrokerMetaMap + var errs []error + if params.useMetadata { + if brokerMeta, errs = getBrokerMeta(zk, withMetrics); errs != nil && brokerMeta == nil { + for _, e := range errs { + fmt.Println(e) + } + os.Exit(1) + } + } + + // Fetch partition metadata. + var partitionMeta kafkazk.PartitionMetaMap + if params.placement == "storage" { + if partitionMeta, err = getPartitionMeta(zk); err != nil { + fmt.Println(err) + os.Exit(1) + } + } + + // Build a partition map either from literal map text input or by fetching the + // map data from ZooKeeper. Store a copy of the original. + partitionMapIn, pending, excluded := getPartitionMap(params, zk) + originalMap := partitionMapIn.Copy() + + // Get a list of affected topics. + printTopics(partitionMapIn) + + // Print if any topics were excluded due to pending deletion or explicit + // exclusion. 
+ printExcludedTopics(pending, excluded) + + brokers, bs := getBrokers(params, partitionMapIn, brokerMeta) + brokersOrig := brokers.Copy() + + if bs.Changes() { + fmt.Printf("%s-\n", indent) + } + + // Check if any referenced brokers are marked as having + // missing/partial metrics data. + if params.useMetadata { + if errs := ensureBrokerMetrics(brokers, brokerMeta); len(errs) > 0 { + for _, e := range errs { + fmt.Println(e) + } + os.Exit(1) + } + } + + // Create substitution affinities. + affinities := getSubAffinities(params, brokers, brokersOrig, partitionMapIn) + + if affinities != nil { + fmt.Printf("%s-\n", indent) + } + + // Print changes, actions. + printChangesActions(params, bs) + + // Apply any replication factor settings. + updateReplicationFactor(params, partitionMapIn) + + // Build a new map using the provided list of brokers. This is OK to run even + // when a no-op is intended. + partitionMapOut, errs := buildMap(params, partitionMapIn, partitionMeta, brokers, affinities) + + // Optimize leaders. + if params.optimizeLeadership { + partitionMapOut.OptimizeLeaderFollower() + } + + // Count missing brokers as a warning. + if bs.Missing > 0 { + errs = append(errs, fmt.Errorf("%d provided brokers not found in ZooKeeper", bs.Missing)) + } + + // Count missing rack info as warning + if bs.RackMissing > 0 { + errs = append( + errs, fmt.Errorf("%d provided broker(s) do(es) not have a rack.id defined", bs.RackMissing), + ) + } + + outputMaps := []*kafkazk.PartitionMap{} + // Generate phased map if enabled. + if params.phasedReassignment { + outputMaps = append(outputMaps, phasedReassignment(originalMap, partitionMapOut)) + } + + partitionMapOut = evacuateLeadership(*partitionMapOut, params.leaderEvacBrokers, evacTopics) + + // Print map change results. + printMapChanges(originalMap, partitionMapOut) + + // Print broker assignment statistics. + errs = append( + errs, + printBrokerAssignmentStats(originalMap, partitionMapOut, brokersOrig, brokers, params.placement == "storage", params.partitionSizeFactor)..., + ) + + // Skip no-ops if configured. + if params.skipNoOps { + originalMap, partitionMapOut = skipReassignmentNoOps(originalMap, partitionMapOut) + } + outputMaps = append(outputMaps, partitionMapOut) + + return outputMaps, errs +} + // *References to metrics metadata persisted in ZooKeeper, see: // https://github.com/DataDog/kafka-kit/tree/master/cmd/metricsfetcher#data-structures) @@ -20,34 +164,36 @@ import ( // via the --topics flag. Two []string are returned; topics excluded due to // pending deletion and topics explicitly excluded (via the --topics-exclude // flag), respectively. -func getPartitionMap(cmd *cobra.Command, zk kafkazk.Handler) (*kafkazk.PartitionMap, []string, []string) { - ms := cmd.Flag("map-string").Value.String() +func getPartitionMap(params rebuildParams, zk kafkazk.Handler) (*kafkazk.PartitionMap, []string, []string) { switch { // The map was provided as text. - case ms != "": + case params.mapString != "": // Get a deserialized map. - pm, err := kafkazk.PartitionMapFromString(ms) + pm, err := kafkazk.PartitionMapFromString(params.mapString) if err != nil { fmt.Println(err) os.Exit(1) } // Exclude topics explicitly listed. - et := removeTopics(pm, Config.topicsExclude) + et := removeTopics(pm, params.topicsExclude) return pm, []string{}, et // The map needs to be fetched via ZooKeeper metadata for all specified topics. 
- case len(Config.topics) > 0: - pm, err := kafkazk.PartitionMapFromZK(Config.topics, zk) + case len(params.topics) > 0: + pm, err := kafkazk.PartitionMapFromZK(params.topics, zk) if err != nil { fmt.Println(err) os.Exit(1) } // Exclude any topics that are pending deletion. - pd := stripPendingDeletes(pm, zk) + pd, err := stripPendingDeletes(pm, zk) + if err != nil { + fmt.Println("Error fetching topics pending deletion") + } // Exclude topics explicitly listed. - et := removeTopics(pm, Config.topicsExclude) + et := removeTopics(pm, params.topicsExclude) return pm, pd, et } @@ -57,12 +203,10 @@ func getPartitionMap(cmd *cobra.Command, zk kafkazk.Handler) (*kafkazk.Partition // getSubAffinities, if enabled via --sub-affinity, takes reference broker maps // and a partition map and attempts to return a complete SubstitutionAffinities. -func getSubAffinities(cmd *cobra.Command, bm kafkazk.BrokerMap, bmo kafkazk.BrokerMap, pm *kafkazk.PartitionMap) kafkazk.SubstitutionAffinities { +func getSubAffinities(params rebuildParams, bm kafkazk.BrokerMap, bmo kafkazk.BrokerMap, pm *kafkazk.PartitionMap) kafkazk.SubstitutionAffinities { var affinities kafkazk.SubstitutionAffinities - sa, _ := cmd.Flags().GetBool("sub-affinity") - fr, _ := cmd.Flags().GetBool("force-rebuild") - if sa && !fr { + if params.subAffinity && !params.forceRebuild { var err error affinities, err = bm.SubstitutionAffinities(pm) if err != nil { @@ -94,16 +238,15 @@ func getSubAffinities(cmd *cobra.Command, bm kafkazk.BrokerMap, bmo kafkazk.Brok // brokers were discovered or newly provided (i.e. specified in the --brokers flag but // not previously holding any partitions for any partitions of the referenced topics // being rebuilt by topicmappr) -func getBrokers(cmd *cobra.Command, pm *kafkazk.PartitionMap, bm kafkazk.BrokerMetaMap) (kafkazk.BrokerMap, *kafkazk.BrokerStatus) { +func getBrokers(params rebuildParams, pm *kafkazk.PartitionMap, bm kafkazk.BrokerMetaMap) (kafkazk.BrokerMap, *kafkazk.BrokerStatus) { fmt.Printf("\nBroker change summary:\n") // Get a broker map of the brokers in the current partition map. // If meta data isn't being looked up, brokerMeta will be empty. - fr, _ := cmd.Flags().GetBool("force-rebuild") - brokers := kafkazk.BrokerMapFromPartitionMap(pm, bm, fr) + brokers := kafkazk.BrokerMapFromPartitionMap(pm, bm, params.forceRebuild) // Update the currentBrokers list with the provided broker list. - bs, msgs := brokers.Update(Config.brokers, bm) + bs, msgs := brokers.Update(params.brokers, bm) for m := range msgs { fmt.Printf("%s%s\n", indent, m) } @@ -113,11 +256,8 @@ func getBrokers(cmd *cobra.Command, pm *kafkazk.PartitionMap, bm kafkazk.BrokerM // printChangesActions takes a BrokerStatus and prints out information output // describing changes in broker counts and liveness. -func printChangesActions(cmd *cobra.Command, bs *kafkazk.BrokerStatus) { +func printChangesActions(params rebuildParams, bs *kafkazk.BrokerStatus) { change := bs.New - bs.Replace - r, _ := cmd.Flags().GetInt("replication") - fr, _ := cmd.Flags().GetBool("force-rebuild") - ol, _ := cmd.Flags().GetBool("optimize-leadership") // Print broker change summary. 
fmt.Printf("%sReplacing %d, added %d, missing %d, total count changed by %d\n", @@ -138,15 +278,15 @@ func printChangesActions(cmd *cobra.Command, bs *kafkazk.BrokerStatus) { actions <- fmt.Sprintf("Shrinking topic by %d broker(s)", -change) } - if fr { + if params.forceRebuild { actions <- fmt.Sprintf("Force rebuilding map") } - if r > 0 { - actions <- fmt.Sprintf("Setting replication factor to %d", r) + if params.replication > 0 { + actions <- fmt.Sprintf("Setting replication factor to %d", params.replication) } - if ol { + if params.optimizeLeadership { actions <- fmt.Sprintf("Optimizing leader/follower ratios") } @@ -167,33 +307,27 @@ func printChangesActions(cmd *cobra.Command, bs *kafkazk.BrokerStatus) { // updateReplicationFactor takes a PartitionMap and normalizes the replica set // length to an optionally provided value. -func updateReplicationFactor(cmd *cobra.Command, pm *kafkazk.PartitionMap) { - r, _ := cmd.Flags().GetInt("replication") +func updateReplicationFactor(params rebuildParams, pm *kafkazk.PartitionMap) { // If the replication factor is changed, the partition map input needs to have // stub brokers appended (r factor increase) or existing brokers removed // (r factor decrease). - if r > 0 { - pm.SetReplication(r) + if params.replication > 0 { + pm.SetReplication(params.replication) } } // buildMap takes an input PartitionMap, rebuild parameters, and all partition/broker // metadata structures required to generate the output PartitionMap. A []string of // warnings / advisories is returned if any are encountered. -func buildMap(cmd *cobra.Command, pm *kafkazk.PartitionMap, pmm kafkazk.PartitionMetaMap, bm kafkazk.BrokerMap, af kafkazk.SubstitutionAffinities) (*kafkazk.PartitionMap, errors) { - placement := cmd.Flag("placement").Value.String() - psf, _ := cmd.Flags().GetFloat64("partition-size-factor") - mrrid, _ := cmd.Flags().GetInt("min-rack-ids") - +func buildMap(params rebuildParams, pm *kafkazk.PartitionMap, pmm kafkazk.PartitionMetaMap, bm kafkazk.BrokerMap, af kafkazk.SubstitutionAffinities) (*kafkazk.PartitionMap, errors) { rebuildParams := kafkazk.RebuildParams{ PMM: pmm, BM: bm, - Strategy: placement, - Optimization: cmd.Flag("optimize").Value.String(), - PartnSzFactor: psf, - MinUniqueRackIDs: mrrid, + Strategy: params.placement, + Optimization: params.optimize, + PartnSzFactor: params.partitionSizeFactor, + MinUniqueRackIDs: params.minRackIds, } - if af != nil { rebuildParams.Affinities = af } @@ -211,14 +345,13 @@ func buildMap(cmd *cobra.Command, pm *kafkazk.PartitionMap, pmm kafkazk.Partitio // can be readded to the broker's StorageFree value. The amount to be readded, // the size of the partition, is referenced from the PartitionMetaMap. - if fr, _ := cmd.Flags().GetBool("force-rebuild"); fr { + if params.forceRebuild { // Get a stripped map that we'll call rebuild on. partitionMapInStripped := pm.Strip() // If the storage placement strategy is being used, // update the broker StorageFree values. - if placement == "storage" { - allBrokers := func(b *kafkazk.Broker) bool { return true } - err := rebuildParams.BM.SubStorage(pm, pmm, allBrokers) + if params.placement == "storage" { + err := rebuildParams.BM.SubStorage(pm, pmm, kafkazk.AllBrokersFn) if err != nil { fmt.Println(err) os.Exit(1) @@ -230,9 +363,8 @@ func buildMap(cmd *cobra.Command, pm *kafkazk.PartitionMap, pmm kafkazk.Partitio } // Update the StorageFree only on brokers marked for replacement. 
-   if placement == "storage" {
-     replacedBrokers := func(b *kafkazk.Broker) bool { return b.Replace }
-     err := rebuildParams.BM.SubStorage(pm, pmm, replacedBrokers)
+   if params.placement == "storage" {
+     err := rebuildParams.BM.SubStorage(pm, pmm, kafkazk.ReplacedBrokersFn)
      if err != nil {
        fmt.Println(err)
        os.Exit(1)
@@ -286,3 +418,57 @@ func notInReplicaSet(id int, rs []int) bool {
  return true
}
+
+// evacuateLeadership makes sure that, for the given set of topics, the given brokers
+// are not left as leaders of any partitions. If a partition's replicas all come from
+// the evac broker list, the run fails.
+func evacuateLeadership(partitionMapIn kafkazk.PartitionMap, evacBrokers []int, evacTopics []string) *kafkazk.PartitionMap {
+ // evacuation algo starts here
+ partitionMapOut := partitionMapIn.Copy()
+
+ // make a lookup map of topics
+ topicsMap := map[string]struct{}{}
+ for _, topic := range evacTopics {
+   topicsMap[topic] = struct{}{}
+ }
+
+ // make a lookup map of brokers
+ brokersMap := map[int]struct{}{}
+ for _, b := range evacBrokers {
+   brokersMap[b] = struct{}{}
+ }
+
+ // TODO What if we only want to evacuate a subset of partitions?
+ // For now, problem brokers is the bigger use case.
+
+ // swap leadership for all broker/partition/topic combos
+ for i, p := range partitionMapIn.Partitions {
+   // check that the topic is one of the target topics
+   if _, correctTopic := topicsMap[p.Topic]; !correctTopic {
+     continue
+   }
+
+   // check the leader to see if it's one of the evac brokers
+   if _, contains := brokersMap[p.Replicas[0]]; !contains {
+     continue
+   }
+
+   for j, replica := range p.Replicas {
+     // If one of the replicas is not being leadership evacuated, use that one and swap.
+     if _, contains := brokersMap[replica]; !contains {
+       newLeader := p.Replicas[j]
+       partitionMapOut.Partitions[i].Replicas[j] = p.Replicas[0]
+       partitionMapOut.Partitions[i].Replicas[0] = newLeader
+       break
+     }
+
+     // If we've tried every replica, but they are all being leader evac'd.
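+     // In that case there is no remaining replica that can take over leadership,
+     // so fail rather than emit a map that leaves an evac broker as leader.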
+     if replica == p.Replicas[len(p.Replicas)-1] {
+       fmt.Println("[ERROR] trying to evict all replicas at once")
+       os.Exit(1)
+     }
+   }
+ }
+
+ return partitionMapOut
+}
diff --git a/cmd/topicmappr/commands/root.go b/cmd/topicmappr/commands/root.go
index 6ddc5da..ed17c02 100644
--- a/cmd/topicmappr/commands/root.go
+++ b/cmd/topicmappr/commands/root.go
@@ -25,5 +25,6 @@ func Execute() {
func init() {
  rootCmd.PersistentFlags().String("zk-addr", "localhost:2181", "ZooKeeper connect string")
  rootCmd.PersistentFlags().String("zk-prefix", "", "ZooKeeper prefix (if Kafka is configured with a chroot path prefix)")
+ rootCmd.PersistentFlags().String("zk-metrics-prefix", "topicmappr", "ZooKeeper namespace prefix for Kafka metrics")
  rootCmd.PersistentFlags().Bool("ignore-warns", false, "Produce a map even if warnings are encountered")
}
diff --git a/cmd/topicmappr/commands/scale.go b/cmd/topicmappr/commands/scale.go
index befc87d..9ea1bb8 100644
--- a/cmd/topicmappr/commands/scale.go
+++ b/cmd/topicmappr/commands/scale.go
@@ -3,9 +3,6 @@ package commands
import (
  "fmt"
  "os"
- "sort"
-
- "github.com/DataDog/kafka-kit/v3/kafkazk"
  "github.com/spf13/cobra"
)
@@ -30,7 +27,6 @@ func init() {
  scaleCmd.Flags().Int("partition-size-threshold", 512, "Size in megabytes where partitions below this value will not be moved in a scale")
  scaleCmd.Flags().Bool("locality-scoped", false, "Ensure that all partition movements are scoped by rack.id")
  scaleCmd.Flags().Bool("verbose", false, "Verbose output")
- scaleCmd.Flags().String("zk-metrics-prefix", "topicmappr", "ZooKeeper namespace prefix for Kafka metrics")
  scaleCmd.Flags().Int("metrics-age", 60, "Kafka metrics age tolerance (in minutes)")
  scaleCmd.Flags().Bool("optimize-leadership", false, "Scale all broker leader/follower ratios")
@@ -40,10 +36,15 @@ func init() {
}

func scale(cmd *cobra.Command, _ []string) {
- bootstrap(cmd)
+ sanitizeInput(cmd)
+ params := reassignParamsFromCmd(cmd)
+ params.requireNewBrokers = true

  // ZooKeeper init.
- zk, err := initZooKeeper(cmd)
+ zkAddr := cmd.Parent().Flag("zk-addr").Value.String()
+ kafkaPrefix := cmd.Parent().Flag("zk-prefix").Value.String()
+ metricsPrefix := cmd.Flag("zk-metrics-prefix").Value.String()
+ zk, err := initZooKeeper(zkAddr, kafkaPrefix, metricsPrefix)
  if err != nil {
    fmt.Println(err)
    os.Exit(1)
@@ -51,167 +52,13 @@ func scale(cmd *cobra.Command, _ []string) {
  defer zk.Close()

- // Get broker and partition metadata.
- checkMetaAge(cmd, zk)
- brokerMeta := getBrokerMeta(cmd, zk, true)
- partitionMeta := getPartitionMeta(cmd, zk)
-
- // Get the current partition map.
- partitionMapIn, err := kafkazk.PartitionMapFromZK(Config.topics, zk)
- if err != nil {
-   fmt.Println(err)
-   os.Exit(1)
- }
-
- // Exclude any topics that are pending deletion.
- pending := stripPendingDeletes(partitionMapIn, zk)
-
- // Exclude any explicit exclusions.
- excluded := removeTopics(partitionMapIn, Config.topicsExclude)
-
- // Print topics matched to input params.
- printTopics(partitionMapIn)
-
- // Print if any topics were excluded due to pending deletion.
- printExcludedTopics(pending, excluded)
-
- // Get a broker map.
- brokersIn := kafkazk.BrokerMapFromPartitionMap(partitionMapIn, brokerMeta, false)
-
- // Validate all broker params, get a copy of the
- // broker IDs targeted for partition offloading.
- offloadTargets := validateBrokersForScale(cmd, brokersIn, brokerMeta)
-
- // Sort offloadTargets by storage free ascending.
- sort.Sort(offloadTargetsBySize{t: offloadTargets, bm: brokersIn})
-
- partitionLimit, _ := cmd.Flags().GetInt("partition-limit")
- partitionSizeThreshold, _ := cmd.Flags().GetInt("partition-size-threshold")
- tolerance, _ := cmd.Flags().GetFloat64("tolerance")
- localityScoped, _ := cmd.Flags().GetBool("locality-scoped")
- verbose, _ := cmd.Flags().GetBool("verbose")
-
- // Generate reassignmentBundles for a scale up.
- params := computeReassignmentBundlesParams{
-   offloadTargets:         offloadTargets,
-   tolerance:              tolerance,
-   partitionMap:           partitionMapIn,
-   partitionMeta:          partitionMeta,
-   brokerMap:              brokersIn,
-   partitionLimit:         partitionLimit,
-   partitionSizeThreshold: partitionSizeThreshold,
-   localityScoped:         localityScoped,
-   verbose:                verbose,
- }
-
- results := computeReassignmentBundles(params)
-
- // Merge all results into a slice.
- resultsByRange := []reassignmentBundle{}
- for r := range results {
-   resultsByRange = append(resultsByRange, r)
- }
-
- // Sort the scale results by range ascending.
- sort.Slice(resultsByRange, func(i, j int) bool {
-   switch {
-   case resultsByRange[i].storageRange < resultsByRange[j].storageRange:
-     return true
-   case resultsByRange[i].storageRange > resultsByRange[j].storageRange:
-     return false
-   }
-
-   return resultsByRange[i].stdDev < resultsByRange[j].stdDev
- })
-
- // Chose the results with the lowest range.
- m := resultsByRange[0]
- partitionMapOut, brokersOut, relos := m.partitionMap, m.brokers, m.relocations
-
- // Print parameters used for scale decisions.
- printReassignmentParams(cmd, resultsByRange, brokersIn, m.tolerance)
-
- // Optimize leaders.
- if t, _ := cmd.Flags().GetBool("optimize-leadership"); t {
-   partitionMapOut.OptimizeLeaderFollower()
- }
+ partitionMaps, _ := reassign(params, zk)
- // Print planned relocations.
- printPlannedRelocations(offloadTargets, relos, partitionMeta)
-
- // Print map change results.
- printMapChanges(partitionMapIn, partitionMapOut)
-
- // Print broker assignment statistics.
- errs := printBrokerAssignmentStats(cmd, partitionMapIn, partitionMapOut, brokersIn, brokersOut)
-
- // Handle errors that are possible
- // to be overridden by the user (aka
- // 'WARN' in topicmappr console output).
- handleOverridableErrs(cmd, errs)
-
- // Ignore no-ops; scales will naturally have
- // a high percentage of these.
- partitionMapIn, partitionMapOut = skipReassignmentNoOps(partitionMapIn, partitionMapOut)
-
- // Write maps.
- writeMaps(cmd, partitionMapOut, nil)
-}
-
-func validateBrokersForScale(cmd *cobra.Command, brokers kafkazk.BrokerMap, bm kafkazk.BrokerMetaMap) []int {
- // No broker changes are permitted in rebalance other than new broker additions.
- fmt.Println("\nValidating broker list:")
-
- // Update the current BrokerList with
- // the provided broker list.
- c, msgs := brokers.Update(Config.brokers, bm)
- for m := range msgs {
-   fmt.Printf("%s%s\n", indent, m)
- }
-
- if c.Changes() {
-   fmt.Printf("%s-\n", indent)
- }
-
- // Check if any referenced brokers are marked as having missing/partial metrics data.
- ensureBrokerMetrics(cmd, brokers, bm)
-
- switch {
- case c.Missing > 0, c.OldMissing > 0, c.Replace > 0:
-   fmt.Printf("%s[ERROR] scale only allows broker additions\n", indent)
-   os.Exit(1)
- case c.New > 0:
-   fmt.Printf("%s%d additional brokers added\n", indent, c.New)
-   fmt.Printf("%s-\n", indent)
-   fmt.Printf("%sOK\n", indent)
- default:
-   fmt.Printf("%s[ERROR] scale requires additional brokers\n", indent)
-   os.Exit(1)
- }
-
- var offloadTargets []int
-
- // Offload targets are all non-new brokers.
- f := func(b *kafkazk.Broker) bool { return !b.New }
-
- matches := brokers.Filter(f)
- for _, b := range matches {
-   offloadTargets = append(offloadTargets, b.ID)
- }
-
- sort.Ints(offloadTargets)
-
- fmt.Println("\nBrokers targeted for partition offloading:")
-
- // Exit if we've hit insufficient broker counts.
- if len(offloadTargets) == 0 {
-   fmt.Printf("%s[none]\n", indent)
-   os.Exit(0)
- } else {
-   for _, id := range offloadTargets {
-     fmt.Printf("%s%d\n", indent, id)
-   }
- }
+ // TODO intentionally not handling the one error that can be returned here
+ // right now, but would be better to distinguish errors
+ // handleOverridableErrs(cmd, errs)
- return offloadTargets
+ outPath := cmd.Flag("out-path").Value.String()
+ outFile := cmd.Flag("out-file").Value.String()
+ writeMaps(outPath, outFile, partitionMaps)
}
diff --git a/kafkazk/brokers.go b/kafkazk/brokers.go
index e181273..2f3a671 100644
--- a/kafkazk/brokers.go
+++ b/kafkazk/brokers.go
@@ -160,6 +160,8 @@ type BrokerFilterFn func(*Broker) bool
// AllBrokersFn returns all brokers.
var AllBrokersFn BrokerFilterFn = func(b *Broker) bool { return true }
+var NotReplacedBrokersFn = func(b *Broker) bool { return !b.Replace }
+var ReplacedBrokersFn = func(b *Broker) bool { return b.Replace }

// SortPseudoShuffle takes a BrokerList and performs a sort by count.
// For each sequence of brokers with equal counts, the sub-slice is
@@ -343,7 +345,7 @@ func (b BrokerMap) Update(bl []int, bm BrokerMetaMap) (*BrokerStatus, <-chan str
// SubStorageAll takes a PartitionMap, PartitionMetaMap, and a function. For all
// brokers that return true as an input to function f, the size of all partitions
// held is added back to the broker StorageFree value.
-func (b BrokerMap) SubStorage(pm *PartitionMap, pmm PartitionMetaMap, f func(*Broker) bool) error {
+func (b BrokerMap) SubStorage(pm *PartitionMap, pmm PartitionMetaMap, f BrokerFilterFn) error {
  // Get the size of each partition.
  for _, partn := range pm.Partitions {
    size, err := pmm.Size(partn)
diff --git a/kafkazk/brokers_test.go b/kafkazk/brokers_test.go
index 95110f2..1e96512 100644
--- a/kafkazk/brokers_test.go
+++ b/kafkazk/brokers_test.go
@@ -270,8 +270,7 @@ func TestSubStorageReplacements(t *testing.T) {
  bm[1003].Replace = true

- replacedBrokers := func(b *Broker) bool { return b.Replace }
- err := bm.SubStorage(pm, pmm, replacedBrokers)
+ err := bm.SubStorage(pm, pmm, ReplacedBrokersFn)
  if err != nil {
    t.Fatal(err)
  }
diff --git a/kafkazk/constraints.go b/kafkazk/constraints.go
index 4a1851c..e4bef8f 100644
--- a/kafkazk/constraints.go
+++ b/kafkazk/constraints.go
@@ -125,14 +125,7 @@ func (c *Constraints) Add(b *Broker) {
func (c *Constraints) MergeConstraints(bl BrokerList) {
  // Don't merge in attributes
  // from nodes that will be removed.
- var f BrokerFilterFn = func(b *Broker) bool {
-   if b.Replace {
-     return false
-   }
-   return true
- }
-
- for _, b := range bl.Filter(f) {
+ for _, b := range bl.Filter(NotReplacedBrokersFn) {
    if b.Locality != "" {
      c.locality[b.Locality] = true
    }
diff --git a/kafkazk/partitions.go b/kafkazk/partitions.go
index 57d530c..fc52cde 100644
--- a/kafkazk/partitions.go
+++ b/kafkazk/partitions.go
@@ -267,6 +267,24 @@ func (pm *PartitionMap) Rebuild(params RebuildParams) (*PartitionMap, []error) {
  return newMap, errs
}

+// BrokersIn returns a BrokerFilterFn that filters for brokers in the
+// PartitionMap
+func (pm PartitionMap) BrokersIn() BrokerFilterFn {
+ mappedIDs := map[int]struct{}{}
+ for _, partn := range pm.Partitions {
+   for _, id := range partn.Replicas {
+     mappedIDs[id] = struct{}{}
+   }
+ }
+
+ return func(b *Broker) bool {
+   if _, exist := mappedIDs[b.ID]; exist {
+     return true
+   }
+   return false
+ }
+}
+
// placeByPosition builds a PartitionMap by doing placements for all
// partitions, one broker index at a time. For instance, if all partitions
// required a broker set length of 3 (aka a replication factor of 3), we'd
@@ -279,14 +297,7 @@ func placeByPosition(params RebuildParams) (*PartitionMap, []error) {
  // We need a filtered list for usage sorting and exclusion
  // of nodes marked for removal.
- f := func(b *Broker) bool {
-   if b.Replace {
-     return false
-   }
-   return true
- }
-
- bl := params.BM.Filter(f).List()
+ bl := params.BM.Filter(NotReplacedBrokersFn).List()

  var errs []error
  var pass int
@@ -421,14 +432,7 @@ func placeByPartition(params RebuildParams) (*PartitionMap, []error) {
  // We need a filtered list for usage sorting and exclusion
  // of nodes marked for removal.
- f := func(b *Broker) bool {
-   if b.Replace {
-     return false
-   }
-   return true
- }
-
- bl := params.BM.Filter(f).List()
+ bl := params.BM.Filter(NotReplacedBrokersFn).List()

  var errs []error
diff --git a/kafkazk/stats.go b/kafkazk/stats.go
index e89ea1f..59ce199 100644
--- a/kafkazk/stats.go
+++ b/kafkazk/stats.go
@@ -216,24 +216,26 @@ func (b BrokerMap) Mean() float64 {
  return t / c
}

+// AboveMeanFn returns a BrokerFilterFn that filters brokers that are above the
+// mean by d percent (0.00 < d). The mean type is provided as a function f.
+func AboveMeanFn(d float64, f func() float64) BrokerFilterFn {
+ m := f()
+ return func(b *Broker) bool { return (b.StorageFree-m)/m > d }
+}
+
// AboveMean returns a sorted []int of broker IDs that are above the mean
// by d percent (0.00 < d). The mean type is provided as a function f.
func (b BrokerMap) AboveMean(d float64, f func() float64) []int {
- m := f()
  var ids []int

  if d <= 0.00 {
    return ids
  }

- for _, br := range b {
-   if br.ID == StubBrokerID {
-     continue
-   }
+ filtered := b.Filter(AboveMeanFn(d, f))

-   if (br.StorageFree-m)/m > d {
-     ids = append(ids, br.ID)
-   }
+ for _, br := range filtered {
+   ids = append(ids, br.ID)
  }

  sort.Ints(ids)
@@ -241,24 +243,26 @@ func (b BrokerMap) AboveMean(d float64, f func() float64) []int {
  return ids
}

+// BelowMeanFn returns a BrokerFilterFn that filters brokers that are below the
+// mean by d percent (0.00 < d). The mean type is provided as a function f.
+func BelowMeanFn(d float64, f func() float64) BrokerFilterFn {
+ m := f()
+ return func(b *Broker) bool { return (m-b.StorageFree)/m > d }
+}
+
// BelowMean returns a sorted []int of broker IDs that are below the mean
// by d percent (0.00 < d). The mean type is provided as a function f.
func (b BrokerMap) BelowMean(d float64, f func() float64) []int {
- m := f()
  var ids []int

  if d <= 0.00 {
    return ids
  }

- for _, br := range b {
-   if br.ID == StubBrokerID {
-     continue
-   }
+ filtered := b.Filter(BelowMeanFn(d, f))

-   if (m-br.StorageFree)/m > d {
-     ids = append(ids, br.ID)
-   }
+ for _, br := range filtered {
+   ids = append(ids, br.ID)
  }

  sort.Ints(ids)
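  // A minimal usage sketch of the new filter helpers (editorial, not from this patch;
  // assumes a populated BrokerMap named bm):
  //
  //   overloaded := bm.Filter(AboveMeanFn(0.20, bm.Mean)) // brokers >20% above mean StorageFree
  //   active := bm.Filter(NotReplacedBrokersFn)           // brokers not marked for replacement
  //
  // Each call returns only the brokers for which the BrokerFilterFn reports true.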