From 8661eed504f7e22706c7351868ff9af9bda22ffb Mon Sep 17 00:00:00 2001 From: Austen McClernon Date: Fri, 5 May 2023 14:01:13 +0000 Subject: [PATCH] roachtest: assert on CPU in rebalance/by-load MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The `rebalance/by-load/replicas` roachtests periodically flake due to a known limitation, where a cluster with a small number of ranges may not be properly balanced due to heterogeneous localities (including multi-store) #88829. This commit updates the total number of ranges from 1 per-store, to 5 per-store for `rebalance/by-load/*` roachtests. The roachtest asserted on the lease count, as a proxy for load, assuming that the load evenly hits each lease for the created ranges. However, the principal indicator of load in a read heavy workload is CPU. This commit updates the test assertion to require that every store's CPU is within 10% of the cluster mean. The test assertion previously required that the max-min lease count delta was 0, when no outside splits occurred; or 1 when the number of ranges was greater than the number of stores. The logging format is updated for easier debugging: ``` cpu outside bounds mean=23.9 tolerance=10.0% (±2.4) bounds=[21.5, 26.3] below = [] within = [s1: 22 (-6.7%), s2: 22 (-6.3%)] above = [s3: 26 (+13.0%)] ... cpu within bounds mean=25.7 tolerance=10.0% (±2.6) bounds=[23.2, 28.3] stores=[s1: 24 (-3.0%), s2: 24 (-2.9%), s3: 27 (+5.9%)] ``` resolves: #102801 resolves: #102823 Release note: None --- pkg/cmd/roachtest/tests/rebalance_load.go | 234 +++++++++++++-------- pkg/cmd/roachtest/tests/rebalance_stats.go | 10 + 2 files changed, 159 insertions(+), 85 deletions(-) diff --git a/pkg/cmd/roachtest/tests/rebalance_load.go b/pkg/cmd/roachtest/tests/rebalance_load.go index 81485336de6e..49065005c95a 100644 --- a/pkg/cmd/roachtest/tests/rebalance_load.go +++ b/pkg/cmd/roachtest/tests/rebalance_load.go @@ -12,10 +12,8 @@ package tests import ( "context" - gosql "database/sql" "fmt" "math/rand" - "sort" "strings" "time" @@ -25,21 +23,36 @@ import ( "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/spec" "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/test" "github.com/cockroachdb/cockroach/pkg/roachprod/install" - "github.com/cockroachdb/cockroach/pkg/roachprod/logger" "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/errors" "github.com/stretchr/testify/require" "golang.org/x/sync/errgroup" ) +const ( + // storeToRangeFactor is the number of ranges to create per store in the + // cluster. + storeToRangeFactor = 5 + // meanCPUTolerance is the tolerance applied when checking normalized (0-100) + // CPU percent utilization of stores against the mean. In multi-store tests, + // the same CPU utilization will be reported for stores on the same node. The + // acceptable range for CPU w.r.t the mean is: + // + // mean_tolerance = mean * meanCPUTolerance + // [mean - mean_tolerance, mean + mean_tolerance]. + meanCPUTolerance = 0.1 + // statSamplePeriod is the period at which timeseries stats are sampled. + statSamplePeriod = 10 * time.Second +) + func registerRebalanceLoad(r registry.Registry) { // This test creates a single table for kv to use and splits the table to - // have one range for every node in the cluster. Because even brand new - // clusters start with 20+ ranges in them, the number of new ranges in kv's - // table is small enough that it typically won't trigger rebalancing of - // leases in the cluster based on lease count alone. We let kv generate a lot - // of load against the ranges such that we'd expect load-based rebalancing to - // distribute the load evenly across the nodes in the cluster. + // have 5 ranges for every node in the cluster. Because even brand new + // clusters start with 40+ ranges in them, the number of new ranges in kv's + // table is small enough that it typically won't trigger significant + // rebalancing of leases in the cluster based on lease count alone. We let kv + // generate a lot of load against the ranges such that we'd expect load-based + // rebalancing to distribute the load evenly across the nodes in the cluster. rebalanceLoadRun := func( ctx context.Context, t test.Test, @@ -52,12 +65,18 @@ func registerRebalanceLoad(r registry.Registry) { startOpts := option.DefaultStartOpts() roachNodes := c.Range(1, c.Spec().NodeCount-1) appNode := c.Node(c.Spec().NodeCount) - numStores := len(roachNodes) + numNodes := len(roachNodes) + numStores := numNodes if c.Spec().SSDs > 1 && !c.Spec().RAID0 { numStores *= c.Spec().SSDs startOpts.RoachprodOpts.StoreCount = c.Spec().SSDs } - splits := numStores - 1 // n-1 splits => n ranges => 1 lease per store + // We want each store to end up with approximately storeToRangeFactor + // (factor) leases such that the CPU load is evenly spread, e.g. + // (n * factor) -1 splits = factor * n ranges = factor leases per store + // Note that we only assert on the CPU of each store w.r.t the mean, not + // the lease count. + splits := (numStores * storeToRangeFactor) - 1 startOpts.RoachprodOpts.ExtraArgs = append(startOpts.RoachprodOpts.ExtraArgs, "--vmodule=store_rebalancer=5,allocator=5,allocator_scorer=5,replicate_queue=5") settings := install.MakeClusterSettings() @@ -87,8 +106,8 @@ func registerRebalanceLoad(r registry.Registry) { var m *errgroup.Group // see comment in version.go m, ctx = errgroup.WithContext(ctx) - // Enable us to exit out of workload early when we achieve the desired - // lease balance. This drastically shortens the duration of the test in the + // Enable us to exit out of workload early when we achieve the desired CPU + // balance. This drastically shortens the duration of the test in the // common case. ctx, cancel := context.WithCancel(ctx) @@ -100,7 +119,7 @@ func registerRebalanceLoad(r registry.Registry) { "--duration=%v {pgurl:1-%d}", concurrency, maxDuration, len(roachNodes))) if errors.Is(ctx.Err(), context.Canceled) { - // We got canceled either because lease balance was achieved or the + // We got canceled either because CPU balance was achieved or the // other worker hit an error. In either case, it's not this worker's // fault. return nil @@ -109,7 +128,7 @@ func registerRebalanceLoad(r registry.Registry) { }) m.Go(func() error { - t.Status("checking for lease balance") + t.Status("checking for CPU balance") db := c.Conn(ctx, t.L(), 1) defer db.Close() @@ -125,23 +144,37 @@ func registerRebalanceLoad(r registry.Registry) { return err } - for tBegin := timeutil.Now(); timeutil.Since(tBegin) <= maxDuration; { - if done, err := isLoadEvenlyDistributed(t.L(), db, numStores); err != nil { - return err - } else if done { - t.Status("successfully achieved lease balance; waiting for kv to finish running") - cancel() - return nil - } + storeCPUFn, err := makeStoreCPUFn(ctx, c, t, numNodes, numStores) + if err != nil { + return err + } + var reason string + var done bool + for tBegin := timeutil.Now(); timeutil.Since(tBegin) <= maxDuration; { + // Wait out the sample period initially to allow the timeseries to + // populate meaningful information for the test to query. select { case <-ctx.Done(): return ctx.Err() - case <-time.After(5 * time.Second): + case <-time.After(statSamplePeriod): + } + + clusterStoresCPU, err := storeCPUFn(ctx) + if err != nil { + t.L().Printf("unable to get the cluster stores CPU %s\n", err.Error()) + } + + done, reason = isLoadEvenlyDistributed(clusterStoresCPU, meanCPUTolerance) + t.L().Printf("cpu %s", reason) + if done { + t.Status("successfully achieved CPU balance; waiting for kv to finish running") + cancel() + return nil } } - return fmt.Errorf("timed out before leases were evenly spread") + return errors.Errorf("CPU not evenly balanced after timeout: %s", reason) }) if err := m.Wait(); err != nil { t.Fatal(err) @@ -233,79 +266,110 @@ func registerRebalanceLoad(r registry.Registry) { ) } -func isLoadEvenlyDistributed(l *logger.Logger, db *gosql.DB, numStores int) (bool, error) { - rows, err := db.Query( - `select lease_holder, count(*) ` + - `from [show ranges from table kv.kv] ` + - `group by lease_holder;`) - if err != nil { - // TODO(rafi): Remove experimental_ranges query once we stop testing 19.1 or - // earlier. - if strings.Contains(err.Error(), "syntax error at or near \"ranges\"") { - rows, err = db.Query( - `select lease_holder, count(*) ` + - `from [show experimental_ranges from table kv.kv] ` + - `group by lease_holder;`) - } - } +// makeStoreCPUFn returns a function which can be called to gather the CPU of +// the cluster stores. When there are multiple stores per node, stores on the +// same node will report identical CPU. +func makeStoreCPUFn( + octx context.Context, c cluster.Cluster, t test.Test, numNodes, numStores int, +) (func(ctx context.Context) ([]float64, error), error) { + adminURLs, err := c.ExternalAdminUIAddr(octx, t.L(), c.Node(1)) if err != nil { - return false, err + return nil, err } - defer rows.Close() - leaseCounts := make(map[int]int) - var rangeCount int - for rows.Next() { - var storeID, leaseCount int - if err := rows.Scan(&storeID, &leaseCount); err != nil { - return false, err + url := adminURLs[0] + startTime := timeutil.Now() + tsQueries := make([]tsQuery, numNodes) + for i := range tsQueries { + tsQueries[i] = tsQuery{ + name: "cr.node.sys.cpu.combined.percent-normalized", + queryType: total, + sources: []string{fmt.Sprintf("%d", i+1)}, } - leaseCounts[storeID] = leaseCount - rangeCount += leaseCount } - if len(leaseCounts) < numStores { - l.Printf("not all stores have a lease yet: %v\n", formatLeaseCounts(leaseCounts)) - return false, nil - } + return func(ctx context.Context) ([]float64, error) { + now := timeutil.Now() + resp, err := getMetricsWithSamplePeriod( + url, startTime, now, statSamplePeriod, tsQueries) + if err != nil { + return nil, err + } - // The simple case is when ranges haven't split. We can require that every - // store has one lease. - if rangeCount == numStores { - for _, leaseCount := range leaseCounts { - if leaseCount != 1 { - l.Printf("uneven lease distribution: %s\n", formatLeaseCounts(leaseCounts)) - return false, nil + // Assume that stores on the same node will have sequential store IDs e.g. + // when the stores per node is 2: + // node 1 = store 1, store 2 ... node N = store 2N-1, store 2N + storesPerNode := numStores / numNodes + storeCPUs := make([]float64, numStores) + for node, result := range resp.Results { + // Take the latest CPU data point only. + cpu := result.Datapoints[len(result.Datapoints)-1].Value + nodeIdx := node * storesPerNode + for storeOffset := 0; storeOffset < storesPerNode; storeOffset++ { + // The values will be a normalized float in [0,1.0], scale to a + // percentage [0,100]. + storeCPUs[nodeIdx+storeOffset] = cpu * 100 } } - l.Printf("leases successfully distributed: %s\n", formatLeaseCounts(leaseCounts)) - return true, nil - } + return storeCPUs, nil + }, nil +} - // For completeness, if leases have split, verify the leases per store don't - // differ by any more than 1. - leases := make([]int, 0, numStores) - for _, leaseCount := range leaseCounts { - leases = append(leases, leaseCount) +// isLoadEvenlyDistributed checks whether the load for the stores given are +// within tolerance of the mean. If the store loads are, true is returned as +// well as reason, otherwise false. The function expects the loads to be +// indexed to store IDs, see makeStoreCPUFn for example format. +func isLoadEvenlyDistributed(loads []float64, tolerance float64) (ok bool, reason string) { + mean := arithmeticMean(loads) + // If the mean is zero, there's nothing meaningful to assert on. Return early + // that the load isn't evenly distributed. + if mean == 0 { + return false, "no load: mean=0" } - sort.Ints(leases) - if leases[0]+1 < leases[len(leases)-1] { - l.Printf("leases per store differ by more than one: %s\n", formatLeaseCounts(leaseCounts)) - return false, nil + + meanTolerance := mean * tolerance + lb := mean - meanTolerance + ub := mean + meanTolerance + + // Partiton the loads into above, below and within the tolerance bounds of + // the load mean. + above, below, within := []int{}, []int{}, []int{} + for i, load := range loads { + storeID := i + 1 + if load > ub { + above = append(above, storeID) + } else if load < lb { + below = append(below, storeID) + } else { + within = append(within, storeID) + } } - l.Printf("leases successfully distributed: %s\n", formatLeaseCounts(leaseCounts)) - return true, nil + boundsStr := fmt.Sprintf("mean=%.1f tolerance=%.1f%% (±%.1f) bounds=[%.1f, %.1f]", + mean, 100*tolerance, meanTolerance, lb, ub) + if len(below) > 0 || len(above) > 0 { + ok = false + reason = fmt.Sprintf( + "outside bounds %s\n\tbelow = %s\n\twithin = %s\n\tabove = %s\n", + boundsStr, + formatLoads(below, loads, mean), + formatLoads(within, loads, mean), + formatLoads(above, loads, mean), + ) + } else { + ok = true + reason = fmt.Sprintf("within bounds %s\n\tstores=%s\n", + boundsStr, formatLoads(within, loads, mean)) + } + return } -func formatLeaseCounts(counts map[int]int) string { - storeIDs := make([]int, 0, len(counts)) - for storeID := range counts { - storeIDs = append(storeIDs, storeID) - } - sort.Ints(storeIDs) - strs := make([]string, 0, len(counts)) - for _, storeID := range storeIDs { - strs = append(strs, fmt.Sprintf("s%d: %d", storeID, counts[storeID])) +func formatLoads(storeIDs []int, loads []float64, mean float64) string { + fmtLoads := make([]string, len(storeIDs)) + for i, storeID := range storeIDs { + load := loads[storeID-1] + fmtLoads[i] = fmt.Sprintf("s%d: %d (%+3.1f%%)", + storeID, int(load), (load-mean)/mean*100, + ) } - return fmt.Sprintf("[%s]", strings.Join(strs, ", ")) + return fmt.Sprintf("[%s]", strings.Join(fmtLoads, ", ")) } diff --git a/pkg/cmd/roachtest/tests/rebalance_stats.go b/pkg/cmd/roachtest/tests/rebalance_stats.go index aa3248e7c7a1..d95b141e34e5 100644 --- a/pkg/cmd/roachtest/tests/rebalance_stats.go +++ b/pkg/cmd/roachtest/tests/rebalance_stats.go @@ -99,6 +99,16 @@ func coefficientOfVariation(vals []float64) float64 { return stdev / mean } +// arithmeticMean is an average measure where avg = sum / count. This fn is +// included to return 0 instead of NaN like the stats pkg used underneath. +func arithmeticMean(vals []float64) float64 { + if len(vals) == 0 { + return 0 + } + mean, _ := stats.Mean(vals) + return mean +} + func scale(vals []float64, scale float64) []float64 { scaled := make([]float64, len(vals)) for i := range vals {