Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Topicmappr refactor #387

Merged
merged 18 commits into from
Jan 19, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 10 additions & 25 deletions cmd/topicmappr/commands/output.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"math"
"os"
"sort"
"strings"

"github.com/DataDog/kafka-kit/v3/kafkazk"

Expand Down Expand Up @@ -80,7 +79,7 @@ func printMapChanges(pm1, pm2 *kafkazk.PartitionMap) {
// printBrokerAssignmentStats prints before and after broker usage stats,
// such as leadership counts, total partitions owned, degree distribution,
// and changes in storage usage.
func printBrokerAssignmentStats(cmd *cobra.Command, pm1, pm2 *kafkazk.PartitionMap, bm1, bm2 kafkazk.BrokerMap) errors {
func printBrokerAssignmentStats(pm1, pm2 *kafkazk.PartitionMap, bm1, bm2 kafkazk.BrokerMap, storageBased bool, partitionSizeFactor float64) errors {
var errs errors

fmt.Println("\nBroker distribution:")
Expand All @@ -100,17 +99,10 @@ func printBrokerAssignmentStats(cmd *cobra.Command, pm1, pm2 *kafkazk.PartitionM
}

// If we're using the storage placement strategy, write anticipated storage changes.
psf, _ := cmd.Flags().GetFloat64("partition-size-factor")

switch {
case
cmd.Name() == "scale",
cmd.Name() == "rebalance",
cmd.Flag("placement").Value.String() == "storage":

if storageBased {
fmt.Println("\nStorage free change estimations:")
if psf != 1.0 && cmd.Name() != "rebalance" {
fmt.Printf("%sPartition size factor of %.2f applied\n", indent, psf)
if partitionSizeFactor != 1.0 {
fmt.Printf("%sPartition size factor of %.2f applied\n", indent, partitionSizeFactor)
}

// Get filtered BrokerMaps. For the 'before' broker statistics, we want
Expand All @@ -130,9 +122,7 @@ func printBrokerAssignmentStats(cmd *cobra.Command, pm1, pm2 *kafkazk.PartitionM
fmt.Printf("%srange: %.2fGB -> %.2fGB\n", indent, r1/div, r2/div)
if r2 > r1 {
// Range increases are acceptable and common in scale up operations.
if cmd.Name() != "scale" {
errs = append(errs, fmt.Errorf("broker free storage range increased"))
}
errs = append(errs, fmt.Errorf("broker free storage range increased"))
}

// Range spread before/after.
Expand Down Expand Up @@ -265,15 +255,12 @@ func writeMaps(outPath, outFile string, pm *kafkazk.PartitionMap, phasedPM *kafk
}
}

func printReassignmentParams(cmd *cobra.Command, results []reassignmentBundle, brokers kafkazk.BrokerMap, tol float64) {
subCmd := cmd.Name()

fmt.Printf("\n%s parameters:\n", strings.Title(subCmd))
func printReassignmentParams(params reassignParams, results []reassignmentBundle, brokers kafkazk.BrokerMap, tol float64) {
fmt.Printf("\nReassignment parameters:\n")

pst, _ := cmd.Flags().GetInt("partition-size-threshold")
mean, hMean := brokers.Mean(), brokers.HMean()

fmt.Printf("%sIgnoring partitions smaller than %dMB\n", indent, pst)
fmt.Printf("%sIgnoring partitions smaller than %dMB\n", indent, params.partitionSizeThreshold)
fmt.Printf("%sFree storage mean, harmonic mean: %.2fGB, %.2fGB\n",
indent, mean/div, hMean/div)

Expand All @@ -283,11 +270,9 @@ func printReassignmentParams(cmd *cobra.Command, results []reassignmentBundle, b
fmt.Printf("%s%sSources limited to <= %.2fGB\n", indent, indent, mean*(1+tol)/div)
fmt.Printf("%s%sDestinations limited to >= %.2fGB\n", indent, indent, mean*(1-tol)/div)

verbose, _ := cmd.Flags().GetBool("verbose")

// Print the top 10 rebalance results in verbose.
if verbose {
fmt.Printf("%s-\nTop 10 %s map results\n", indent, subCmd)
if params.verbose {
fmt.Printf("%s-\nTop 10 reassignment map results\n", indent)
for i, r := range results {
fmt.Printf("%stolerance: %.2f -> range: %.2fGB, std. deviation: %.2fGB\n",
indent, r.tolerance, r.storageRange/div, r.stdDev/div)
Expand Down
Loading