diff --git a/pkg/cmd/roachtest/tests/rebalance_load.go b/pkg/cmd/roachtest/tests/rebalance_load.go index 505f5cce0f66..0595f1252d1d 100644 --- a/pkg/cmd/roachtest/tests/rebalance_load.go +++ b/pkg/cmd/roachtest/tests/rebalance_load.go @@ -12,11 +12,9 @@ package tests import ( "context" - gosql "database/sql" "fmt" "math/rand" "runtime" - "sort" "strings" "time" @@ -27,7 +25,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/spec" "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/test" "github.com/cockroachdb/cockroach/pkg/roachprod/install" - "github.com/cockroachdb/cockroach/pkg/roachprod/logger" "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/cockroach/pkg/util/version" "github.com/cockroachdb/errors" @@ -35,14 +32,30 @@ import ( "golang.org/x/sync/errgroup" ) +const ( + // storeToRangeFactor is the number of ranges to create per store in the + // cluster. + storeToRangeFactor = 5 + // meanCPUTolerance is the tolerance applied when checking normalized (0-100) + // CPU percent utilization of stores against the mean. In multi-store tests, + // the same CPU utilization will be reported for stores on the same node. The + // acceptable range for CPU w.r.t the mean is: + // + // mean_tolerance = mean * meanCPUTolerance + // [mean - mean_tolerance, mean + mean_tolerance]. + meanCPUTolerance = 0.1 + // statSamplePeriod is the period at which timeseries stats are sampled. + statSamplePeriod = 10 * time.Second +) + func registerRebalanceLoad(r registry.Registry) { // This test creates a single table for kv to use and splits the table to - // have one range for every node in the cluster. Because even brand new - // clusters start with 20+ ranges in them, the number of new ranges in kv's - // table is small enough that it typically won't trigger rebalancing of - // leases in the cluster based on lease count alone. We let kv generate a lot - // of load against the ranges such that we'd expect load-based rebalancing to - // distribute the load evenly across the nodes in the cluster. + // have 5 ranges for every node in the cluster. Because even brand new + // clusters start with 40+ ranges in them, the number of new ranges in kv's + // table is small enough that it typically won't trigger significant + // rebalancing of leases in the cluster based on lease count alone. We let kv + // generate a lot of load against the ranges such that we'd expect load-based + // rebalancing to distribute the load evenly across the nodes in the cluster. rebalanceLoadRun := func( ctx context.Context, t test.Test, @@ -52,15 +65,24 @@ func registerRebalanceLoad(r registry.Registry) { concurrency int, mixedVersion bool, ) { - startOpts := option.DefaultStartOpts() + // This test asserts on the distribution of CPU utilization between nodes + // in the cluster, having backups also running could lead to unrelated + // flakes - disable backup schedule. + startOpts := option.DefaultStartOptsNoBackups() roachNodes := c.Range(1, c.Spec().NodeCount-1) appNode := c.Node(c.Spec().NodeCount) - numStores := len(roachNodes) + numNodes := len(roachNodes) + numStores := numNodes if c.Spec().SSDs > 1 && !c.Spec().RAID0 { numStores *= c.Spec().SSDs startOpts.RoachprodOpts.StoreCount = c.Spec().SSDs } - splits := numStores - 1 // n-1 splits => n ranges => 1 lease per store + // We want each store to end up with approximately storeToRangeFactor + // (factor) leases such that the CPU load is evenly spread, e.g. + // (n * factor) -1 splits = factor * n ranges = factor leases per store + // Note that we only assert on the CPU of each store w.r.t the mean, not + // the lease count. + splits := (numStores * storeToRangeFactor) - 1 startOpts.RoachprodOpts.ExtraArgs = append(startOpts.RoachprodOpts.ExtraArgs, "--vmodule=store_rebalancer=5,allocator=5,allocator_scorer=5,replicate_queue=5") settings := install.MakeClusterSettings() @@ -87,8 +109,8 @@ func registerRebalanceLoad(r registry.Registry) { var m *errgroup.Group // see comment in version.go m, ctx = errgroup.WithContext(ctx) - // Enable us to exit out of workload early when we achieve the desired - // lease balance. This drastically shortens the duration of the test in the + // Enable us to exit out of workload early when we achieve the desired CPU + // balance. This drastically shortens the duration of the test in the // common case. ctx, cancel := context.WithCancel(ctx) @@ -100,7 +122,7 @@ func registerRebalanceLoad(r registry.Registry) { "--duration=%v {pgurl:1-%d}", concurrency, maxDuration, len(roachNodes))) if errors.Is(ctx.Err(), context.Canceled) { - // We got canceled either because lease balance was achieved or the + // We got canceled either because CPU balance was achieved or the // other worker hit an error. In either case, it's not this worker's // fault. return nil @@ -109,7 +131,7 @@ func registerRebalanceLoad(r registry.Registry) { }) m.Go(func() error { - t.Status("checking for lease balance") + t.Status("checking for CPU balance") db := c.Conn(ctx, t.L(), 1) defer db.Close() @@ -125,23 +147,37 @@ func registerRebalanceLoad(r registry.Registry) { return err } - for tBegin := timeutil.Now(); timeutil.Since(tBegin) <= maxDuration; { - if done, err := isLoadEvenlyDistributed(t.L(), db, numStores); err != nil { - return err - } else if done { - t.Status("successfully achieved lease balance; waiting for kv to finish running") - cancel() - return nil - } + storeCPUFn, err := makeStoreCPUFn(ctx, c, t, numNodes, numStores) + if err != nil { + return err + } + var reason string + var done bool + for tBegin := timeutil.Now(); timeutil.Since(tBegin) <= maxDuration; { + // Wait out the sample period initially to allow the timeseries to + // populate meaningful information for the test to query. select { case <-ctx.Done(): return ctx.Err() - case <-time.After(5 * time.Second): + case <-time.After(statSamplePeriod): + } + + clusterStoresCPU, err := storeCPUFn(ctx) + if err != nil { + t.L().Printf("unable to get the cluster stores CPU %s\n", err.Error()) + } + + done, reason = isLoadEvenlyDistributed(clusterStoresCPU, meanCPUTolerance) + t.L().Printf("cpu %s", reason) + if done { + t.Status("successfully achieved CPU balance; waiting for kv to finish running") + cancel() + return nil } } - return fmt.Errorf("timed out before leases were evenly spread") + return errors.Errorf("CPU not evenly balanced after timeout: %s", reason) }) if err := m.Wait(); err != nil { t.Fatal(err) @@ -236,79 +272,110 @@ func registerRebalanceLoad(r registry.Registry) { ) } -func isLoadEvenlyDistributed(l *logger.Logger, db *gosql.DB, numStores int) (bool, error) { - rows, err := db.Query( - `SELECT lease_holder, count(*) ` + - `FROM [SHOW RANGES FROM TABLE kv.kv WITH DETAILS] ` + - `GROUP BY lease_holder;`) - if err != nil { - // TODO(rafi): Remove experimental_ranges query once we stop testing 19.1 or - // earlier. - if strings.Contains(err.Error(), "syntax error at or near \"ranges\"") { - rows, err = db.Query( - `SELECT lease_holder, count(*) ` + - `FROM [SHOW RANGES FROM TABLE kv.kv WITH DETAILS] ` + - `GROUP BY lease_holder;`) - } - } +// makeStoreCPUFn returns a function which can be called to gather the CPU of +// the cluster stores. When there are multiple stores per node, stores on the +// same node will report identical CPU. +func makeStoreCPUFn( + octx context.Context, c cluster.Cluster, t test.Test, numNodes, numStores int, +) (func(ctx context.Context) ([]float64, error), error) { + adminURLs, err := c.ExternalAdminUIAddr(octx, t.L(), c.Node(1)) if err != nil { - return false, err + return nil, err } - defer rows.Close() - leaseCounts := make(map[int]int) - var rangeCount int - for rows.Next() { - var storeID, leaseCount int - if err := rows.Scan(&storeID, &leaseCount); err != nil { - return false, err + url := adminURLs[0] + startTime := timeutil.Now() + tsQueries := make([]tsQuery, numNodes) + for i := range tsQueries { + tsQueries[i] = tsQuery{ + name: "cr.node.sys.cpu.combined.percent-normalized", + queryType: total, + sources: []string{fmt.Sprintf("%d", i+1)}, } - leaseCounts[storeID] = leaseCount - rangeCount += leaseCount } - if len(leaseCounts) < numStores { - l.Printf("not all stores have a lease yet: %v\n", formatLeaseCounts(leaseCounts)) - return false, nil - } + return func(ctx context.Context) ([]float64, error) { + now := timeutil.Now() + resp, err := getMetricsWithSamplePeriod( + url, startTime, now, statSamplePeriod, tsQueries) + if err != nil { + return nil, err + } - // The simple case is when ranges haven't split. We can require that every - // store has one lease. - if rangeCount == numStores { - for _, leaseCount := range leaseCounts { - if leaseCount != 1 { - l.Printf("uneven lease distribution: %s\n", formatLeaseCounts(leaseCounts)) - return false, nil + // Assume that stores on the same node will have sequential store IDs e.g. + // when the stores per node is 2: + // node 1 = store 1, store 2 ... node N = store 2N-1, store 2N + storesPerNode := numStores / numNodes + storeCPUs := make([]float64, numStores) + for node, result := range resp.Results { + // Take the latest CPU data point only. + cpu := result.Datapoints[len(result.Datapoints)-1].Value + nodeIdx := node * storesPerNode + for storeOffset := 0; storeOffset < storesPerNode; storeOffset++ { + // The values will be a normalized float in [0,1.0], scale to a + // percentage [0,100]. + storeCPUs[nodeIdx+storeOffset] = cpu * 100 } } - l.Printf("leases successfully distributed: %s\n", formatLeaseCounts(leaseCounts)) - return true, nil - } + return storeCPUs, nil + }, nil +} - // For completeness, if leases have split, verify the leases per store don't - // differ by any more than 1. - leases := make([]int, 0, numStores) - for _, leaseCount := range leaseCounts { - leases = append(leases, leaseCount) +// isLoadEvenlyDistributed checks whether the load for the stores given are +// within tolerance of the mean. If the store loads are, true is returned as +// well as reason, otherwise false. The function expects the loads to be +// indexed to store IDs, see makeStoreCPUFn for example format. +func isLoadEvenlyDistributed(loads []float64, tolerance float64) (ok bool, reason string) { + mean := arithmeticMean(loads) + // If the mean is zero, there's nothing meaningful to assert on. Return early + // that the load isn't evenly distributed. + if mean == 0 { + return false, "no load: mean=0" } - sort.Ints(leases) - if leases[0]+1 < leases[len(leases)-1] { - l.Printf("leases per store differ by more than one: %s\n", formatLeaseCounts(leaseCounts)) - return false, nil + + meanTolerance := mean * tolerance + lb := mean - meanTolerance + ub := mean + meanTolerance + + // Partiton the loads into above, below and within the tolerance bounds of + // the load mean. + above, below, within := []int{}, []int{}, []int{} + for i, load := range loads { + storeID := i + 1 + if load > ub { + above = append(above, storeID) + } else if load < lb { + below = append(below, storeID) + } else { + within = append(within, storeID) + } } - l.Printf("leases successfully distributed: %s\n", formatLeaseCounts(leaseCounts)) - return true, nil + boundsStr := fmt.Sprintf("mean=%.1f tolerance=%.1f%% (±%.1f) bounds=[%.1f, %.1f]", + mean, 100*tolerance, meanTolerance, lb, ub) + if len(below) > 0 || len(above) > 0 { + ok = false + reason = fmt.Sprintf( + "outside bounds %s\n\tbelow = %s\n\twithin = %s\n\tabove = %s\n", + boundsStr, + formatLoads(below, loads, mean), + formatLoads(within, loads, mean), + formatLoads(above, loads, mean), + ) + } else { + ok = true + reason = fmt.Sprintf("within bounds %s\n\tstores=%s\n", + boundsStr, formatLoads(within, loads, mean)) + } + return } -func formatLeaseCounts(counts map[int]int) string { - storeIDs := make([]int, 0, len(counts)) - for storeID := range counts { - storeIDs = append(storeIDs, storeID) - } - sort.Ints(storeIDs) - strs := make([]string, 0, len(counts)) - for _, storeID := range storeIDs { - strs = append(strs, fmt.Sprintf("s%d: %d", storeID, counts[storeID])) +func formatLoads(storeIDs []int, loads []float64, mean float64) string { + fmtLoads := make([]string, len(storeIDs)) + for i, storeID := range storeIDs { + load := loads[storeID-1] + fmtLoads[i] = fmt.Sprintf("s%d: %d (%+3.1f%%)", + storeID, int(load), (load-mean)/mean*100, + ) } - return fmt.Sprintf("[%s]", strings.Join(strs, ", ")) + return fmt.Sprintf("[%s]", strings.Join(fmtLoads, ", ")) } diff --git a/pkg/cmd/roachtest/tests/ts_util.go b/pkg/cmd/roachtest/tests/ts_util.go index db3c5957cfc0..9031315c6f26 100644 --- a/pkg/cmd/roachtest/tests/ts_util.go +++ b/pkg/cmd/roachtest/tests/ts_util.go @@ -38,9 +38,13 @@ const ( rate ) +// defaultSamplePeriod is the default sampling period for getMetrics. +const defaultSamplePeriod = time.Minute + type tsQuery struct { name string queryType tsQueryType + sources []string } func mustGetMetrics( @@ -55,6 +59,12 @@ func mustGetMetrics( func getMetrics( adminURL string, start, end time.Time, tsQueries []tsQuery, +) (tspb.TimeSeriesQueryResponse, error) { + return getMetricsWithSamplePeriod(adminURL, start, end, defaultSamplePeriod, tsQueries) +} + +func getMetricsWithSamplePeriod( + adminURL string, start, end time.Time, samplePeriod time.Duration, tsQueries []tsQuery, ) (tspb.TimeSeriesQueryResponse, error) { url := "http://" + adminURL + "/ts/query" queries := make([]tspb.Query, len(tsQueries)) @@ -65,6 +75,7 @@ func getMetrics( Name: tsQueries[i].name, Downsampler: tspb.TimeSeriesQueryAggregator_AVG.Enum(), SourceAggregator: tspb.TimeSeriesQueryAggregator_SUM.Enum(), + Sources: tsQueries[i].sources, } case rate: queries[i] = tspb.Query{ @@ -72,6 +83,7 @@ func getMetrics( Downsampler: tspb.TimeSeriesQueryAggregator_AVG.Enum(), SourceAggregator: tspb.TimeSeriesQueryAggregator_SUM.Enum(), Derivative: tspb.TimeSeriesQueryDerivative_NON_NEGATIVE_DERIVATIVE.Enum(), + Sources: tsQueries[i].sources, } default: panic("unexpected") @@ -83,7 +95,7 @@ func getMetrics( // Ask for one minute intervals. We can't just ask for the whole hour // because the time series query system does not support downsampling // offsets. - SampleNanos: (1 * time.Minute).Nanoseconds(), + SampleNanos: samplePeriod.Nanoseconds(), Queries: queries, } var response tspb.TimeSeriesQueryResponse