diff --git a/pkg/cmd/roachtest/tests/rebalance_load.go b/pkg/cmd/roachtest/tests/rebalance_load.go index 49065005c95a..a82f65bec90e 100644 --- a/pkg/cmd/roachtest/tests/rebalance_load.go +++ b/pkg/cmd/roachtest/tests/rebalance_load.go @@ -40,7 +40,10 @@ const ( // // mean_tolerance = mean * meanCPUTolerance // [mean - mean_tolerance, mean + mean_tolerance]. - meanCPUTolerance = 0.1 + // + // The store rebalancer watches the replica CPU load and balances within + // +-10% of the mean. To reduce noise, add a buffer (5%) ontop. + meanCPUTolerance = 0.15 // statSamplePeriod is the period at which timeseries stats are sampled. statSamplePeriod = 10 * time.Second ) @@ -103,6 +106,16 @@ func registerRebalanceLoad(r registry.Registry) { c.Put(ctx, t.DeprecatedWorkload(), "./workload", appNode) c.Run(ctx, appNode, fmt.Sprintf("./workload init kv --drop --splits=%d {pgurl:1}", splits)) + db := c.Conn(ctx, t.L(), 1) + defer db.Close() + + require.NoError(t, WaitFor3XReplication(ctx, t, db)) + t.Status("disable load based splitting") + require.NoError(t, disableLoadBasedSplitting(ctx, db)) + t.Status(fmt.Sprintf("setting rebalance mode to %s", rebalanceMode)) + _, err := db.ExecContext(ctx, `SET CLUSTER SETTING kv.allocator.load_based_rebalancing=$1::string`, rebalanceMode) + require.NoError(t, err) + var m *errgroup.Group // see comment in version.go m, ctx = errgroup.WithContext(ctx) @@ -113,7 +126,6 @@ func registerRebalanceLoad(r registry.Registry) { m.Go(func() error { t.L().Printf("starting load generator\n") - err := c.RunE(ctx, appNode, fmt.Sprintf( "./workload run kv --read-percent=95 --tolerate-errors --concurrency=%d "+ "--duration=%v {pgurl:1-%d}", @@ -130,20 +142,6 @@ func registerRebalanceLoad(r registry.Registry) { m.Go(func() error { t.Status("checking for CPU balance") - db := c.Conn(ctx, t.L(), 1) - defer db.Close() - - t.Status("disable load based splitting") - if err := disableLoadBasedSplitting(ctx, db); err != nil { - return err - } - - if _, err := db.ExecContext( - ctx, `SET CLUSTER SETTING kv.allocator.load_based_rebalancing=$1::string`, rebalanceMode, - ); err != nil { - return err - } - storeCPUFn, err := makeStoreCPUFn(ctx, c, t, numNodes, numStores) if err != nil { return err @@ -193,7 +191,7 @@ func registerRebalanceLoad(r registry.Registry) { concurrency = 32 fmt.Printf("lowering concurrency to %d in local testing\n", concurrency) } - rebalanceLoadRun(ctx, t, c, "leases", 3*time.Minute, concurrency, false /* mixedVersion */) + rebalanceLoadRun(ctx, t, c, "leases", 5*time.Minute, concurrency, false /* mixedVersion */) }, }, ) @@ -207,7 +205,7 @@ func registerRebalanceLoad(r registry.Registry) { concurrency = 32 fmt.Printf("lowering concurrency to %d in local testing\n", concurrency) } - rebalanceLoadRun(ctx, t, c, "leases", 3*time.Minute, concurrency, true /* mixedVersion */) + rebalanceLoadRun(ctx, t, c, "leases", 5*time.Minute, concurrency, true /* mixedVersion */) }, }, )