From 20e23328def49a2776a3ccbe8de6c2195f5999ff Mon Sep 17 00:00:00 2001
From: Austen McClernon
Date: Fri, 5 May 2023 21:09:02 +0000
Subject: [PATCH 1/3] roachtest: add sample period and source to ts_util

Previously, the utility functions for retrieving stats from the timeseries
database in roachtest didn't allow specifying a sample period or sources.

This commit enhances ts_util.go to enable specifying a sample period via
`getMetricsWithSamplePeriod` and adds a sources field to `tsQuery`, which is
applied to queries.

Release note: None
---
 pkg/cmd/roachtest/tests/ts_util.go | 14 +++++++++++++-
 1 file changed, 13 insertions(+), 1 deletion(-)

diff --git a/pkg/cmd/roachtest/tests/ts_util.go b/pkg/cmd/roachtest/tests/ts_util.go
index db3c5957cfc0..9031315c6f26 100644
--- a/pkg/cmd/roachtest/tests/ts_util.go
+++ b/pkg/cmd/roachtest/tests/ts_util.go
@@ -38,9 +38,13 @@ const (
 	rate
 )
 
+// defaultSamplePeriod is the default sampling period for getMetrics.
+const defaultSamplePeriod = time.Minute
+
 type tsQuery struct {
 	name      string
 	queryType tsQueryType
+	sources   []string
 }
 
 func mustGetMetrics(
@@ -55,6 +59,12 @@ func mustGetMetrics(
 
 func getMetrics(
 	adminURL string, start, end time.Time, tsQueries []tsQuery,
+) (tspb.TimeSeriesQueryResponse, error) {
+	return getMetricsWithSamplePeriod(adminURL, start, end, defaultSamplePeriod, tsQueries)
+}
+
+func getMetricsWithSamplePeriod(
+	adminURL string, start, end time.Time, samplePeriod time.Duration, tsQueries []tsQuery,
 ) (tspb.TimeSeriesQueryResponse, error) {
 	url := "http://" + adminURL + "/ts/query"
 	queries := make([]tspb.Query, len(tsQueries))
@@ -65,6 +75,7 @@ func getMetrics(
 			Name:             tsQueries[i].name,
 			Downsampler:      tspb.TimeSeriesQueryAggregator_AVG.Enum(),
 			SourceAggregator: tspb.TimeSeriesQueryAggregator_SUM.Enum(),
+			Sources:          tsQueries[i].sources,
 		}
 	case rate:
 		queries[i] = tspb.Query{
@@ -72,6 +83,7 @@ func getMetrics(
 			Downsampler:      tspb.TimeSeriesQueryAggregator_AVG.Enum(),
 			SourceAggregator: tspb.TimeSeriesQueryAggregator_SUM.Enum(),
 			Derivative:       tspb.TimeSeriesQueryDerivative_NON_NEGATIVE_DERIVATIVE.Enum(),
+			Sources:          tsQueries[i].sources,
 		}
 	default:
 		panic("unexpected")
@@ -83,7 +95,7 @@ func getMetrics(
 		// Ask for one minute intervals. We can't just ask for the whole hour
 		// because the time series query system does not support downsampling
 		// offsets.
-		SampleNanos: (1 * time.Minute).Nanoseconds(),
+		SampleNanos: samplePeriod.Nanoseconds(),
 		Queries:     queries,
 	}
 	var response tspb.TimeSeriesQueryResponse

From 8661eed504f7e22706c7351868ff9af9bda22ffb Mon Sep 17 00:00:00 2001
From: Austen McClernon
Date: Fri, 5 May 2023 14:01:13 +0000
Subject: [PATCH 2/3] roachtest: assert on CPU in rebalance/by-load
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

The `rebalance/by-load/replicas` roachtests periodically flake due to a known
limitation, where a cluster with a small number of ranges may not be properly
balanced due to heterogeneous localities (including multi-store); see #88829.
This commit updates the total number of ranges from 1 per store to 5 per
store for the `rebalance/by-load/*` roachtests.

The roachtest previously asserted on the lease count, as a proxy for load,
assuming that the load evenly hits each lease for the created ranges.
However, the principal indicator of load in a read-heavy workload is CPU.
This commit updates the test assertion to require that every store's CPU is
within 10% of the cluster mean.
The test assertion previously required that the max-min lease count delta was
0 when no outside splits occurred, or 1 when the number of ranges was greater
than the number of stores.

The logging format is updated for easier debugging:

```
cpu outside bounds mean=23.9 tolerance=10.0% (±2.4) bounds=[21.5, 26.3]
	below = []
	within = [s1: 22 (-6.7%), s2: 22 (-6.3%)]
	above = [s3: 26 (+13.0%)]
...
cpu within bounds mean=25.7 tolerance=10.0% (±2.6) bounds=[23.2, 28.3]
	stores=[s1: 24 (-3.0%), s2: 24 (-2.9%), s3: 27 (+5.9%)]
```

resolves: #102801
resolves: #102823

Release note: None
---
 pkg/cmd/roachtest/tests/rebalance_load.go  | 234 +++++++++++++--------
 pkg/cmd/roachtest/tests/rebalance_stats.go |  10 +
 2 files changed, 159 insertions(+), 85 deletions(-)

diff --git a/pkg/cmd/roachtest/tests/rebalance_load.go b/pkg/cmd/roachtest/tests/rebalance_load.go
index 81485336de6e..49065005c95a 100644
--- a/pkg/cmd/roachtest/tests/rebalance_load.go
+++ b/pkg/cmd/roachtest/tests/rebalance_load.go
@@ -12,10 +12,8 @@
 package tests
 
 import (
 	"context"
-	gosql "database/sql"
 	"fmt"
 	"math/rand"
-	"sort"
 	"strings"
 	"time"
@@ -25,21 +23,36 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/spec"
 	"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/test"
 	"github.com/cockroachdb/cockroach/pkg/roachprod/install"
-	"github.com/cockroachdb/cockroach/pkg/roachprod/logger"
 	"github.com/cockroachdb/cockroach/pkg/util/timeutil"
 	"github.com/cockroachdb/errors"
 	"github.com/stretchr/testify/require"
 	"golang.org/x/sync/errgroup"
 )
 
+const (
+	// storeToRangeFactor is the number of ranges to create per store in the
+	// cluster.
+	storeToRangeFactor = 5
+	// meanCPUTolerance is the tolerance applied when checking normalized (0-100)
+	// CPU percent utilization of stores against the mean. In multi-store tests,
+	// the same CPU utilization will be reported for stores on the same node. The
+	// acceptable range for CPU w.r.t the mean is:
+	//
+	//	mean_tolerance = mean * meanCPUTolerance
+	//	[mean - mean_tolerance, mean + mean_tolerance].
+	meanCPUTolerance = 0.1
+	// statSamplePeriod is the period at which timeseries stats are sampled.
+	statSamplePeriod = 10 * time.Second
+)
+
 func registerRebalanceLoad(r registry.Registry) {
 	// This test creates a single table for kv to use and splits the table to
-	// have one range for every node in the cluster. Because even brand new
-	// clusters start with 20+ ranges in them, the number of new ranges in kv's
-	// table is small enough that it typically won't trigger rebalancing of
-	// leases in the cluster based on lease count alone. We let kv generate a lot
-	// of load against the ranges such that we'd expect load-based rebalancing to
-	// distribute the load evenly across the nodes in the cluster.
+	// have 5 ranges for every node in the cluster. Because even brand new
+	// clusters start with 40+ ranges in them, the number of new ranges in kv's
+	// table is small enough that it typically won't trigger significant
+	// rebalancing of leases in the cluster based on lease count alone. We let kv
+	// generate a lot of load against the ranges such that we'd expect load-based
+	// rebalancing to distribute the load evenly across the nodes in the cluster.
 	rebalanceLoadRun := func(
 		ctx context.Context,
 		t test.Test,
@@ -52,12 +65,18 @@ func registerRebalanceLoad(r registry.Registry) {
 		startOpts := option.DefaultStartOpts()
 		roachNodes := c.Range(1, c.Spec().NodeCount-1)
 		appNode := c.Node(c.Spec().NodeCount)
-		numStores := len(roachNodes)
+		numNodes := len(roachNodes)
+		numStores := numNodes
 		if c.Spec().SSDs > 1 && !c.Spec().RAID0 {
 			numStores *= c.Spec().SSDs
 			startOpts.RoachprodOpts.StoreCount = c.Spec().SSDs
 		}
-		splits := numStores - 1 // n-1 splits => n ranges => 1 lease per store
+		// We want each store to end up with approximately storeToRangeFactor
+		// (factor) leases such that the CPU load is evenly spread, e.g.
+		// (n * factor) -1 splits = factor * n ranges = factor leases per store
+		// Note that we only assert on the CPU of each store w.r.t the mean, not
+		// the lease count.
+		splits := (numStores * storeToRangeFactor) - 1
 		startOpts.RoachprodOpts.ExtraArgs = append(startOpts.RoachprodOpts.ExtraArgs,
 			"--vmodule=store_rebalancer=5,allocator=5,allocator_scorer=5,replicate_queue=5")
 		settings := install.MakeClusterSettings()
@@ -87,8 +106,8 @@ func registerRebalanceLoad(r registry.Registry) {
 		var m *errgroup.Group // see comment in version.go
 		m, ctx = errgroup.WithContext(ctx)
 
-		// Enable us to exit out of workload early when we achieve the desired
-		// lease balance. This drastically shortens the duration of the test in the
+		// Enable us to exit out of workload early when we achieve the desired CPU
+		// balance. This drastically shortens the duration of the test in the
 		// common case.
 		ctx, cancel := context.WithCancel(ctx)
 
@@ -100,7 +119,7 @@ func registerRebalanceLoad(r registry.Registry) {
 				"--duration=%v {pgurl:1-%d}",
 				concurrency, maxDuration, len(roachNodes)))
 			if errors.Is(ctx.Err(), context.Canceled) {
-				// We got canceled either because lease balance was achieved or the
+				// We got canceled either because CPU balance was achieved or the
 				// other worker hit an error. In either case, it's not this worker's
 				// fault.
 				return nil
@@ -109,7 +128,7 @@ func registerRebalanceLoad(r registry.Registry) {
 		})
 
 		m.Go(func() error {
-			t.Status("checking for lease balance")
+			t.Status("checking for CPU balance")
 
 			db := c.Conn(ctx, t.L(), 1)
 			defer db.Close()
@@ -125,23 +144,37 @@ func registerRebalanceLoad(r registry.Registry) {
 				return err
 			}
 
-			for tBegin := timeutil.Now(); timeutil.Since(tBegin) <= maxDuration; {
-				if done, err := isLoadEvenlyDistributed(t.L(), db, numStores); err != nil {
-					return err
-				} else if done {
-					t.Status("successfully achieved lease balance; waiting for kv to finish running")
-					cancel()
-					return nil
-				}
+			storeCPUFn, err := makeStoreCPUFn(ctx, c, t, numNodes, numStores)
+			if err != nil {
+				return err
+			}
+			var reason string
+			var done bool
+			for tBegin := timeutil.Now(); timeutil.Since(tBegin) <= maxDuration; {
+				// Wait out the sample period initially to allow the timeseries to
+				// populate meaningful information for the test to query.
 				select {
 				case <-ctx.Done():
 					return ctx.Err()
-				case <-time.After(5 * time.Second):
+				case <-time.After(statSamplePeriod):
+				}
+
+				clusterStoresCPU, err := storeCPUFn(ctx)
+				if err != nil {
+					t.L().Printf("unable to get the cluster stores CPU %s\n", err.Error())
+				}
+
+				done, reason = isLoadEvenlyDistributed(clusterStoresCPU, meanCPUTolerance)
+				t.L().Printf("cpu %s", reason)
+				if done {
+					t.Status("successfully achieved CPU balance; waiting for kv to finish running")
+					cancel()
+					return nil
+				}
 			}
-			return fmt.Errorf("timed out before leases were evenly spread")
+			return errors.Errorf("CPU not evenly balanced after timeout: %s", reason)
 		})
 		if err := m.Wait(); err != nil {
 			t.Fatal(err)
 		}
@@ -233,79 +266,110 @@ func registerRebalanceLoad(r registry.Registry) {
 	)
 }
 
-func isLoadEvenlyDistributed(l *logger.Logger, db *gosql.DB, numStores int) (bool, error) {
-	rows, err := db.Query(
-		`select lease_holder, count(*) ` +
-			`from [show ranges from table kv.kv] ` +
-			`group by lease_holder;`)
-	if err != nil {
-		// TODO(rafi): Remove experimental_ranges query once we stop testing 19.1 or
-		// earlier.
-		if strings.Contains(err.Error(), "syntax error at or near \"ranges\"") {
-			rows, err = db.Query(
-				`select lease_holder, count(*) ` +
-					`from [show experimental_ranges from table kv.kv] ` +
-					`group by lease_holder;`)
-		}
-	}
+// makeStoreCPUFn returns a function which can be called to gather the CPU of
+// the cluster stores. When there are multiple stores per node, stores on the
+// same node will report identical CPU.
+func makeStoreCPUFn(
+	octx context.Context, c cluster.Cluster, t test.Test, numNodes, numStores int,
+) (func(ctx context.Context) ([]float64, error), error) {
+	adminURLs, err := c.ExternalAdminUIAddr(octx, t.L(), c.Node(1))
 	if err != nil {
-		return false, err
+		return nil, err
 	}
-	defer rows.Close()
-	leaseCounts := make(map[int]int)
-	var rangeCount int
-	for rows.Next() {
-		var storeID, leaseCount int
-		if err := rows.Scan(&storeID, &leaseCount); err != nil {
-			return false, err
+	url := adminURLs[0]
+	startTime := timeutil.Now()
+	tsQueries := make([]tsQuery, numNodes)
+	for i := range tsQueries {
+		tsQueries[i] = tsQuery{
+			name:      "cr.node.sys.cpu.combined.percent-normalized",
+			queryType: total,
+			sources:   []string{fmt.Sprintf("%d", i+1)},
 		}
-		leaseCounts[storeID] = leaseCount
-		rangeCount += leaseCount
 	}
 
-	if len(leaseCounts) < numStores {
-		l.Printf("not all stores have a lease yet: %v\n", formatLeaseCounts(leaseCounts))
-		return false, nil
-	}
+	return func(ctx context.Context) ([]float64, error) {
+		now := timeutil.Now()
+		resp, err := getMetricsWithSamplePeriod(
+			url, startTime, now, statSamplePeriod, tsQueries)
+		if err != nil {
+			return nil, err
+		}
 
-	// The simple case is when ranges haven't split. We can require that every
-	// store has one lease.
-	if rangeCount == numStores {
-		for _, leaseCount := range leaseCounts {
-			if leaseCount != 1 {
-				l.Printf("uneven lease distribution: %s\n", formatLeaseCounts(leaseCounts))
-				return false, nil
+		// Assume that stores on the same node will have sequential store IDs e.g.
+		// when the stores per node is 2:
+		// node 1 = store 1, store 2 ... node N = store 2N-1, store 2N
+		storesPerNode := numStores / numNodes
+		storeCPUs := make([]float64, numStores)
+		for node, result := range resp.Results {
+			// Take the latest CPU data point only.
+			cpu := result.Datapoints[len(result.Datapoints)-1].Value
+			nodeIdx := node * storesPerNode
+			for storeOffset := 0; storeOffset < storesPerNode; storeOffset++ {
+				// The values will be a normalized float in [0,1.0], scale to a
+				// percentage [0,100].
+				storeCPUs[nodeIdx+storeOffset] = cpu * 100
 			}
 		}
-		l.Printf("leases successfully distributed: %s\n", formatLeaseCounts(leaseCounts))
-		return true, nil
-	}
+		return storeCPUs, nil
+	}, nil
+}
 
-	// For completeness, if leases have split, verify the leases per store don't
-	// differ by any more than 1.
-	leases := make([]int, 0, numStores)
-	for _, leaseCount := range leaseCounts {
-		leases = append(leases, leaseCount)
+// isLoadEvenlyDistributed checks whether the loads for the given stores are
+// within tolerance of the mean. If they are, true is returned as well as the
+// reason, otherwise false. The function expects the loads to be indexed to
+// store IDs, see makeStoreCPUFn for the expected format.
+func isLoadEvenlyDistributed(loads []float64, tolerance float64) (ok bool, reason string) {
+	mean := arithmeticMean(loads)
+	// If the mean is zero, there's nothing meaningful to assert on. Return early
+	// that the load isn't evenly distributed.
+	if mean == 0 {
+		return false, "no load: mean=0"
 	}
-	sort.Ints(leases)
-	if leases[0]+1 < leases[len(leases)-1] {
-		l.Printf("leases per store differ by more than one: %s\n", formatLeaseCounts(leaseCounts))
-		return false, nil
+
+	meanTolerance := mean * tolerance
+	lb := mean - meanTolerance
+	ub := mean + meanTolerance
+
+	// Partition the loads into above, below and within the tolerance bounds of
+	// the load mean.
+	above, below, within := []int{}, []int{}, []int{}
+	for i, load := range loads {
+		storeID := i + 1
+		if load > ub {
+			above = append(above, storeID)
+		} else if load < lb {
+			below = append(below, storeID)
+		} else {
+			within = append(within, storeID)
+		}
 	}
-	l.Printf("leases successfully distributed: %s\n", formatLeaseCounts(leaseCounts))
-	return true, nil
+
+	boundsStr := fmt.Sprintf("mean=%.1f tolerance=%.1f%% (±%.1f) bounds=[%.1f, %.1f]",
+		mean, 100*tolerance, meanTolerance, lb, ub)
+	if len(below) > 0 || len(above) > 0 {
+		ok = false
+		reason = fmt.Sprintf(
+			"outside bounds %s\n\tbelow = %s\n\twithin = %s\n\tabove = %s\n",
+			boundsStr,
+			formatLoads(below, loads, mean),
+			formatLoads(within, loads, mean),
+			formatLoads(above, loads, mean),
+		)
+	} else {
+		ok = true
+		reason = fmt.Sprintf("within bounds %s\n\tstores=%s\n",
+			boundsStr, formatLoads(within, loads, mean))
+	}
+	return
 }
 
-func formatLeaseCounts(counts map[int]int) string {
-	storeIDs := make([]int, 0, len(counts))
-	for storeID := range counts {
-		storeIDs = append(storeIDs, storeID)
-	}
-	sort.Ints(storeIDs)
-	strs := make([]string, 0, len(counts))
-	for _, storeID := range storeIDs {
-		strs = append(strs, fmt.Sprintf("s%d: %d", storeID, counts[storeID]))
+func formatLoads(storeIDs []int, loads []float64, mean float64) string {
+	fmtLoads := make([]string, len(storeIDs))
+	for i, storeID := range storeIDs {
+		load := loads[storeID-1]
+		fmtLoads[i] = fmt.Sprintf("s%d: %d (%+3.1f%%)",
+			storeID, int(load), (load-mean)/mean*100,
+		)
 	}
-	return fmt.Sprintf("[%s]", strings.Join(strs, ", "))
+	return fmt.Sprintf("[%s]", strings.Join(fmtLoads, ", "))
 }
diff --git a/pkg/cmd/roachtest/tests/rebalance_stats.go b/pkg/cmd/roachtest/tests/rebalance_stats.go
index aa3248e7c7a1..d95b141e34e5 100644
--- a/pkg/cmd/roachtest/tests/rebalance_stats.go
+++ b/pkg/cmd/roachtest/tests/rebalance_stats.go
@@ -99,6 +99,16 @@ func coefficientOfVariation(vals []float64) float64 {
 	return stdev / mean
 }
 
+// arithmeticMean is an average measure where avg = sum / count. This fn is
+// included to return 0 instead of NaN like the stats pkg used underneath.
+func arithmeticMean(vals []float64) float64 {
+	if len(vals) == 0 {
+		return 0
+	}
+	mean, _ := stats.Mean(vals)
+	return mean
+}
+
 func scale(vals []float64, scale float64) []float64 {
 	scaled := make([]float64, len(vals))
 	for i := range vals {

From d7b46c1c4f8d856ef92e4a302f7a25d21c46093b Mon Sep 17 00:00:00 2001
From: Austen McClernon
Date: Wed, 14 Jun 2023 14:31:22 +0000
Subject: [PATCH 3/3] roachtest: reduce rebalance-by-load noise

The rebalance-by-load tests assert that the normalized [0,1] CPU
utilization of each node is within some threshold of the mean. The
threshold was previously 10%; however, it is not unexpected that total
node load falls outside this threshold even when replica load is within
it, since the current balancing implementation only concerns itself
with replica load. Bump the tolerance from 10% to 15% to reduce noise.

Additionally, the test did not wait for 3x replication prior to
beginning the workload, which is bound to introduce flakes eventually.
Wait for 3x replication before beginning.

Resolves: #104854
Resolves: #104386

Release note: None
---
 pkg/cmd/roachtest/tests/rebalance_load.go | 34 ++++++++++++++------------------
 1 file changed, 16 insertions(+), 18 deletions(-)

diff --git a/pkg/cmd/roachtest/tests/rebalance_load.go b/pkg/cmd/roachtest/tests/rebalance_load.go
index 49065005c95a..a82f65bec90e 100644
--- a/pkg/cmd/roachtest/tests/rebalance_load.go
+++ b/pkg/cmd/roachtest/tests/rebalance_load.go
@@ -40,7 +40,10 @@ const (
 	//
 	//	mean_tolerance = mean * meanCPUTolerance
 	//	[mean - mean_tolerance, mean + mean_tolerance].
-	meanCPUTolerance = 0.1
+	//
+	// The store rebalancer watches the replica CPU load and balances within
+	// +-10% of the mean. To reduce noise, add a buffer (5%) on top.
+	meanCPUTolerance = 0.15
 	// statSamplePeriod is the period at which timeseries stats are sampled.
 	statSamplePeriod = 10 * time.Second
 )
 
@@ -103,6 +106,16 @@ func registerRebalanceLoad(r registry.Registry) {
 		c.Put(ctx, t.DeprecatedWorkload(), "./workload", appNode)
 		c.Run(ctx, appNode, fmt.Sprintf("./workload init kv --drop --splits=%d {pgurl:1}", splits))
 
+		db := c.Conn(ctx, t.L(), 1)
+		defer db.Close()
+
+		require.NoError(t, WaitFor3XReplication(ctx, t, db))
+		t.Status("disable load based splitting")
+		require.NoError(t, disableLoadBasedSplitting(ctx, db))
+		t.Status(fmt.Sprintf("setting rebalance mode to %s", rebalanceMode))
+		_, err := db.ExecContext(ctx, `SET CLUSTER SETTING kv.allocator.load_based_rebalancing=$1::string`, rebalanceMode)
+		require.NoError(t, err)
+
 		var m *errgroup.Group // see comment in version.go
 		m, ctx = errgroup.WithContext(ctx)
@@ -113,7 +126,6 @@ func registerRebalanceLoad(r registry.Registry) {
 
 		m.Go(func() error {
 			t.L().Printf("starting load generator\n")
-
 			err := c.RunE(ctx, appNode, fmt.Sprintf(
 				"./workload run kv --read-percent=95 --tolerate-errors --concurrency=%d "+
 					"--duration=%v {pgurl:1-%d}",
@@ -130,20 +142,6 @@ func registerRebalanceLoad(r registry.Registry) {
 		m.Go(func() error {
 			t.Status("checking for CPU balance")
 
-			db := c.Conn(ctx, t.L(), 1)
-			defer db.Close()
-
-			t.Status("disable load based splitting")
-			if err := disableLoadBasedSplitting(ctx, db); err != nil {
-				return err
-			}
-
-			if _, err := db.ExecContext(
-				ctx, `SET CLUSTER SETTING kv.allocator.load_based_rebalancing=$1::string`, rebalanceMode,
-			); err != nil {
-				return err
-			}
-
 			storeCPUFn, err := makeStoreCPUFn(ctx, c, t, numNodes, numStores)
 			if err != nil {
 				return err
 			}
@@ -193,7 +191,7 @@ func registerRebalanceLoad(r registry.Registry) {
 				concurrency = 32
 				fmt.Printf("lowering concurrency to %d in local testing\n", concurrency)
 			}
-			rebalanceLoadRun(ctx, t, c, "leases", 3*time.Minute, concurrency, false /* mixedVersion */)
+			rebalanceLoadRun(ctx, t, c, "leases", 5*time.Minute, concurrency, false /* mixedVersion */)
 		},
 	},
 	)
@@ -207,7 +205,7 @@ func registerRebalanceLoad(r registry.Registry) {
 				concurrency = 32
 				fmt.Printf("lowering concurrency to %d in local testing\n", concurrency)
 			}
-			rebalanceLoadRun(ctx, t, c, "leases", 3*time.Minute, concurrency, true /* mixedVersion */)
+			rebalanceLoadRun(ctx, t, c, "leases", 5*time.Minute, concurrency, true /* mixedVersion */)
 		},
 	},
 	)
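
For illustration only, a minimal standalone sketch of the mean/tolerance
bucketing that the new `isLoadEvenlyDistributed` assertion performs. It is not
part of the patches above; the store CPU values are made up, and the tolerance
uses the 0.15 value introduced in the third patch:

```go
package main

import "fmt"

func main() {
	// Hypothetical per-store CPU percentages, indexed by store ID - 1.
	loads := []float64{22, 22, 26}
	// meanCPUTolerance after the third patch.
	tolerance := 0.15

	// Arithmetic mean of the loads.
	var sum float64
	for _, l := range loads {
		sum += l
	}
	mean := sum / float64(len(loads))

	// Tolerance bounds around the mean.
	meanTolerance := mean * tolerance
	lb, ub := mean-meanTolerance, mean+meanTolerance
	fmt.Printf("mean=%.1f tolerance=%.1f%% (±%.1f) bounds=[%.1f, %.1f]\n",
		mean, 100*tolerance, meanTolerance, lb, ub)

	// Bucket each store below/within/above the bounds, formatted like formatLoads.
	for i, load := range loads {
		bucket := "within"
		if load < lb {
			bucket = "below"
		} else if load > ub {
			bucket = "above"
		}
		fmt.Printf("s%d: %d (%+3.1f%%) %s\n",
			i+1, int(load), (load-mean)/mean*100, bucket)
	}
}
```

Checking against mean-relative bounds rather than a fixed delta keeps the
assertion meaningful across clusters whose absolute CPU utilization differs.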