diff --git a/pkg/cmd/roachtest/tests/rebalance_load.go b/pkg/cmd/roachtest/tests/rebalance_load.go
index 3e891ab07783..b600ad36ada2 100644
--- a/pkg/cmd/roachtest/tests/rebalance_load.go
+++ b/pkg/cmd/roachtest/tests/rebalance_load.go
@@ -20,11 +20,11 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/cluster"
 	"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/option"
 	"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/registry"
-	"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/roachtestutil/clusterupgrade"
+	"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/roachtestutil/mixedversion"
 	"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/spec"
 	"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/test"
 	"github.com/cockroachdb/cockroach/pkg/roachprod/install"
-	"github.com/cockroachdb/cockroach/pkg/testutils/release"
+	"github.com/cockroachdb/cockroach/pkg/roachprod/logger"
 	"github.com/cockroachdb/cockroach/pkg/util/timeutil"
 	"github.com/cockroachdb/errors"
 	"github.com/stretchr/testify/require"
@@ -82,134 +82,56 @@ func registerRebalanceLoad(r registry.Registry) {
 		appNode := c.Node(c.Spec().NodeCount)
 		numNodes := len(roachNodes)
 		numStores := numNodes
+		startOpts.RoachprodOpts.ExtraArgs = append(startOpts.RoachprodOpts.ExtraArgs,
+			"--vmodule=store_rebalancer=5,allocator=5,allocator_scorer=5,replicate_queue=5")
 		if c.Spec().SSDs > 1 && !c.Spec().RAID0 {
 			numStores *= c.Spec().SSDs
 			startOpts.RoachprodOpts.StoreCount = c.Spec().SSDs
 		}
-		// We want each store to end up with approximately storeToRangeFactor
-		// (factor) leases such that the CPU load is evenly spread, e.g.
-		// (n * factor) -1 splits = factor * n ranges = factor leases per store
-		// Note that we only assert on the CPU of each store w.r.t the mean, not
-		// the lease count.
-		splits := (numStores * storeToRangeFactor) - 1
-		startOpts.RoachprodOpts.ExtraArgs = append(startOpts.RoachprodOpts.ExtraArgs,
-			"--vmodule=store_rebalancer=5,allocator=5,allocator_scorer=5,replicate_queue=5")
+		settings := install.MakeClusterSettings()
+		settings.ClusterSettings["kv.allocator.load_based_rebalancing"] = rebalanceMode
+		settings.ClusterSettings["kv.range_split.by_load_enabled"] = "false"
+
 		if mixedVersion {
-			predecessorVersionStr, err := release.LatestPredecessor(t.BuildVersion())
-			require.NoError(t, err)
-			predecessorVersion := clusterupgrade.MustParseVersion(predecessorVersionStr)
-			settings.Binary = uploadCockroach(ctx, t, c, c.All(), predecessorVersion)
-			// Upgrade some (or all) of the first N-1 CRDB nodes. We ignore the last
-			// CRDB node (to leave at least one node on the older version), and the
-			// app node.
-			lastNodeToUpgrade := rand.Intn(c.Spec().NodeCount-2) + 1
-			t.L().Printf("upgrading %d nodes to the current cockroach binary", lastNodeToUpgrade)
-			nodesToUpgrade := c.Range(1, lastNodeToUpgrade)
-			c.Start(ctx, t.L(), startOpts, settings, roachNodes)
-			upgradeNodes(ctx, t, c, nodesToUpgrade, startOpts, clusterupgrade.CurrentVersion())
+			mvt := mixedversion.NewTest(ctx, t, t.L(), c, roachNodes, mixedversion.NeverUseFixtures,
+				// The http requests to the admin UI performed by the test don't play
+				// well with secure clusters. As of the time of writing, they return
+				// either of the following errors:
+				//   tls: failed to verify certificate: x509: “node” certificate is not standards compliant
+				//   tls: failed to verify certificate: x509: certificate signed by unknown authority
+				//
+				// Disable secure mode for simplicity.
+				mixedversion.ClusterSettingOption(install.SecureOption(false)),
+				mixedversion.ClusterSettingOption(install.ClusterSettingsOption(settings.ClusterSettings)),
+			)
+			mvt.InMixedVersion("rebalance load run",
+				func(ctx context.Context, l *logger.Logger, r *rand.Rand, h *mixedversion.Helper) error {
+					return rebalanceByLoad(
+						ctx, t, c, rebalanceMode, maxDuration, concurrency, appNode, numStores, numNodes)
+				})
+			mvt.Run()
 		} else {
+			// Enable collecting CPU profiles when the CPU utilization exceeds 90%.
+			// This helps debug failures which occur as a result of mismatches
+			// between allocation (QPS/replica CPU) and hardware signals, e.g. see
+			// #111900. The setting names changed between v22.2 and v23.1, so we
+			// can't easily set up CPU profiling in mixed-version tests.
+			//
+			// TODO(kvoli): Remove this setup once CPU profiling is enabled by default
+			// on perf roachtests #97699.
+			settings.ClusterSettings["server.cpu_profile.duration"] = "2s"
+			settings.ClusterSettings["server.cpu_profile.interval"] = "2m"
+			settings.ClusterSettings["server.cpu_profile.cpu_usage_combined_threshold"] = "90"
 			c.Start(ctx, t.L(), startOpts, settings, roachNodes)
+			require.NoError(t, rebalanceByLoad(
+				ctx, t, c, rebalanceMode, maxDuration,
+				concurrency, appNode, numStores, numNodes,
+			))
 		}
-		c.Put(ctx, t.DeprecatedWorkload(), "./workload", appNode)
-		c.Run(ctx, appNode, fmt.Sprintf("./workload init kv --drop --splits=%d {pgurl:1}", splits))
-
-		db := c.Conn(ctx, t.L(), 1)
-		defer db.Close()
-
-		require.NoError(t, WaitFor3XReplication(ctx, t, db))
-		t.Status("disable load based splitting")
-		require.NoError(t, disableLoadBasedSplitting(ctx, db))
-		t.Status(fmt.Sprintf("setting rebalance mode to %s", rebalanceMode))
-		_, err := db.ExecContext(ctx, `SET CLUSTER SETTING kv.allocator.load_based_rebalancing=$1::string`, rebalanceMode)
-		require.NoError(t, err)
-		// Enable collecting CPU profiles when the CPU utilization exceeds 90%.
-		// This helps debug failures which occur as a result of mismatches
-		// between allocation (QPS/replica CPU) and hardware signals e.g. see
-		// #111900.
-		//
-		// TODO(kvoli): Remove this setup once CPU profiling is enabled by default
-		// on perf roachtests #97699.
-		_, err = db.ExecContext(ctx, `SET CLUSTER SETTING server.cpu_profile.duration = '2s'`)
-		require.NoError(t, err)
-		_, err = db.ExecContext(ctx, `SET CLUSTER SETTING server.cpu_profile.interval = '2m'`)
-		require.NoError(t, err)
-		_, err = db.ExecContext(ctx, `SET CLUSTER SETTING server.cpu_profile.cpu_usage_combined_threshold = 90`)
-		require.NoError(t, err)
-
-		var m *errgroup.Group // see comment in version.go
-		m, ctx = errgroup.WithContext(ctx)
-
-		// Enable us to exit out of workload early when we achieve the desired CPU
-		// balance. This drastically shortens the duration of the test in the
-		// common case.
-		ctx, cancel := context.WithCancel(ctx)
-
-		m.Go(func() error {
-			t.L().Printf("starting load generator\n")
-			err := c.RunE(ctx, appNode, fmt.Sprintf(
-				"./workload run kv --read-percent=95 --tolerate-errors --concurrency=%d "+
-					"--duration=%v {pgurl:1-%d}",
-				concurrency, maxDuration, len(roachNodes)))
-			if errors.Is(ctx.Err(), context.Canceled) {
-				// We got canceled either because CPU balance was achieved or the
-				// other worker hit an error. In either case, it's not this worker's
-				// fault.
-				return nil
-			}
-			return err
-		})
-
-		m.Go(func() error {
-			t.Status("checking for CPU balance")
-
-			storeCPUFn, err := makeStoreCPUFn(ctx, c, t, numNodes, numStores)
-			if err != nil {
-				return err
-			}
-
-			var reason string
-			var balancedStartTime time.Time
-			var prevIsBalanced bool
-			for tBegin := timeutil.Now(); timeutil.Since(tBegin) <= maxDuration; {
-				// Wait out the sample period initially to allow the timeseries to
-				// populate meaningful information for the test to query.
-				select {
-				case <-ctx.Done():
-					return ctx.Err()
-				case <-time.After(statSamplePeriod):
-				}
-
-				now := timeutil.Now()
-				clusterStoresCPU, err := storeCPUFn(ctx)
-				if err != nil {
-					t.L().Printf("unable to get the cluster stores CPU %s\n", err.Error())
-					continue
-				}
-				var curIsBalanced bool
-				curIsBalanced, reason = isLoadEvenlyDistributed(clusterStoresCPU, meanCPUTolerance)
-				t.L().Printf("cpu %s", reason)
-				if !prevIsBalanced && curIsBalanced {
-					balancedStartTime = now
-				}
-				prevIsBalanced = curIsBalanced
-				if prevIsBalanced && now.Sub(balancedStartTime) > stableDuration {
-					t.Status("successfully achieved CPU balance; waiting for kv to finish running")
-					cancel()
-					return nil
-				}
-			}
-
-			return errors.Errorf("CPU not evenly balanced after timeout: %s", reason)
-		})
-		if err := m.Wait(); err != nil {
-			t.Fatal(err)
-		}
 	}
 
-	concurrency := 128
-
 	r.Add(
 		registry.TestSpec{
 			Name: `rebalance/by-load/leases`,
@@ -301,6 +223,96 @@ func registerRebalanceLoad(r registry.Registry) {
 	)
 }
 
+func rebalanceByLoad(
+	ctx context.Context,
+	t test.Test,
+	c cluster.Cluster,
+	rebalanceMode string,
+	maxDuration time.Duration,
+	concurrency int,
+	appNode option.NodeListOption,
+	numStores, numNodes int,
+) error {
+	// We want each store to end up with approximately storeToRangeFactor
+	// (factor) leases such that the CPU load is evenly spread, e.g.
+	// (n * factor) - 1 splits = factor * n ranges = factor leases per store.
+	// Note that we only assert on the CPU of each store w.r.t the mean, not
+	// the lease count.
+	splits := (numStores * storeToRangeFactor) - 1
+	c.Run(ctx, appNode, fmt.Sprintf("./cockroach workload init kv --drop --splits=%d {pgurl:1}", splits))
+
+	db := c.Conn(ctx, t.L(), 1)
+	defer db.Close()
+
+	require.NoError(t, WaitFor3XReplication(ctx, t, db))
+
+	var m *errgroup.Group
+	m, ctx = errgroup.WithContext(ctx)
+
+	// Enable us to exit out of the workload early when we achieve the desired
+	// CPU balance. This drastically shortens the duration of the test in the
+	// common case.
+	ctx, cancel := context.WithCancel(ctx)
+
+	m.Go(func() error {
+		t.L().Printf("starting load generator\n")
+		err := c.RunE(ctx, appNode, fmt.Sprintf(
+			"./cockroach workload run kv --read-percent=95 --tolerate-errors --concurrency=%d "+
+				"--duration=%v {pgurl:1-%d}",
+			concurrency, maxDuration, numNodes))
+		if errors.Is(ctx.Err(), context.Canceled) {
+			// We got canceled either because CPU balance was achieved or the
+			// other worker hit an error. In either case, it's not this worker's
+			// fault.
+			return nil
+		}
+		return err
+	})
+
+	m.Go(func() error {
+		t.Status("checking for CPU balance")
+
+		storeCPUFn, err := makeStoreCPUFn(ctx, c, t, numNodes, numStores)
+		if err != nil {
+			return err
+		}
+
+		var reason string
+		var balancedStartTime time.Time
+		var prevIsBalanced bool
+		for tBegin := timeutil.Now(); timeutil.Since(tBegin) <= maxDuration; {
+			// Wait out the sample period initially to allow the timeseries to
+			// populate meaningful information for the test to query.
+			select {
+			case <-ctx.Done():
+				return ctx.Err()
+			case <-time.After(statSamplePeriod):
+			}
+
+			now := timeutil.Now()
+			clusterStoresCPU, err := storeCPUFn(ctx)
+			if err != nil {
+				t.L().Printf("unable to get the cluster stores CPU %s\n", err.Error())
+				continue
+			}
+			var curIsBalanced bool
+			curIsBalanced, reason = isLoadEvenlyDistributed(clusterStoresCPU, meanCPUTolerance)
+			t.L().Printf("cpu %s", reason)
+			if !prevIsBalanced && curIsBalanced {
+				balancedStartTime = now
+			}
+			prevIsBalanced = curIsBalanced
+			if prevIsBalanced && now.Sub(balancedStartTime) > stableDuration {
+				t.Status("successfully achieved CPU balance; waiting for kv to finish running")
+				cancel()
+				return nil
+			}
+		}
+		return errors.Errorf("CPU not evenly balanced after timeout: %s", reason)
+	})
+	return m.Wait()
+}
+
 // makeStoreCPUFn returns a function which can be called to gather the CPU of
 // the cluster stores. When there are multiple stores per node, stores on the
 // same node will report identical CPU.
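
The splits arithmetic in `rebalanceByLoad` can be sanity-checked in isolation. A minimal standalone sketch, assuming an illustrative `storeToRangeFactor` of 5 (the real constant is defined elsewhere in rebalance_load.go):

```go
package main

import "fmt"

func main() {
	// Illustrative values only; the real storeToRangeFactor constant
	// lives elsewhere in rebalance_load.go.
	const storeToRangeFactor = 5
	numStores := 3

	// (n * factor) - 1 splits yield n * factor ranges, i.e. factor
	// leases per store once leases are evenly rebalanced.
	splits := (numStores * storeToRangeFactor) - 1
	ranges := splits + 1
	fmt.Printf("splits=%d ranges=%d leases/store=%d\n",
		splits, ranges, ranges/numStores) // splits=14 ranges=15 leases/store=5
}
```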
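The two-worker structure of `rebalanceByLoad` (a load generator that treats cancellation as success, and a checker that cancels the group once its condition holds) is a standard errgroup idiom. A self-contained sketch of the pattern, with toy workers standing in for the workload and the balance check:

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"

	"golang.org/x/sync/errgroup"
)

func main() {
	m, ctx := errgroup.WithContext(context.Background())
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	// Worker 1: a long-running task, like the kv workload above. Being
	// canceled means the checker succeeded, so it is not an error.
	m.Go(func() error {
		select {
		case <-time.After(10 * time.Second):
			return nil
		case <-ctx.Done():
			if errors.Is(ctx.Err(), context.Canceled) {
				return nil // canceled by the checker; not this worker's fault
			}
			return ctx.Err()
		}
	})

	// Worker 2: the checker. It cancels the group as soon as its
	// condition holds, which also stops worker 1 early.
	m.Go(func() error {
		time.Sleep(100 * time.Millisecond) // stand-in for detecting balance
		cancel()
		return nil
	})

	fmt.Println("err:", m.Wait()) // err: <nil>
}
```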
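Finally, the balance check implements a simple hysteresis: balance must hold continuously for `stableDuration` before the test passes, so a momentary dip out of balance resets the clock. A condensed sketch of that state machine (the type and its fields are illustrative, not part of the actual change):

```go
package main

import (
	"fmt"
	"time"
)

// balanceTracker mirrors prevIsBalanced/balancedStartTime above: it
// records when balance was first achieved and reports success only
// once balance has held continuously for stableFor.
type balanceTracker struct {
	stableFor     time.Duration
	balancedSince time.Time
	wasBalanced   bool
}

func (b *balanceTracker) observe(now time.Time, balanced bool) bool {
	if balanced && !b.wasBalanced {
		b.balancedSince = now // just entered the balanced state
	}
	b.wasBalanced = balanced
	return balanced && now.Sub(b.balancedSince) > b.stableFor
}

func main() {
	tr := balanceTracker{stableFor: 2 * time.Minute}
	start := time.Now()
	for i, balanced := range []bool{false, true, true, true, true} {
		now := start.Add(time.Duration(i) * time.Minute)
		fmt.Printf("t=%dm done=%v\n", i, tr.observe(now, balanced))
	}
	// done is true only at t=4m, once balance has held for more than 2m.
}
```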