roachtest: port rebalance/by-load/*/mixed-version to new framework
Port `rebalance/by-load/leases/mixed-version` and
`rebalance/by-load/replicas/mixed-version` to the new mixed-version
framework as described in cockroachdb#110528.

Part of: cockroachdb#110528
Resolves: cockroachdb#115134
Resolves: cockroachdb#114549
Release note: None
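
In outline, the framework usage this commit adopts has the shape sketched below. This is a minimal sketch built only from calls visible in the diff, not a drop-in test; runLoad is a hypothetical stand-in for the rebalance workload body, and the imports mirror the file's import block.

package tests

import (
	"context"
	"math/rand"

	"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/cluster"
	"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/option"
	"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/roachtestutil/mixedversion"
	"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/test"
	"github.com/cockroachdb/cockroach/pkg/roachprod/install"
	"github.com/cockroachdb/cockroach/pkg/roachprod/logger"
)

// runMixedVersionRebalance shows the NewTest -> InMixedVersion -> Run
// shape used in the diff; runLoad stands in for the test body.
func runMixedVersionRebalance(
	ctx context.Context,
	t test.Test,
	c cluster.Cluster,
	roachNodes option.NodeListOption,
	runLoad func(context.Context) error,
) {
	mvt := mixedversion.NewTest(ctx, t, t.L(), c, roachNodes,
		mixedversion.NeverUseFixtures,
		// Secure mode is disabled; see the comment in the diff about
		// admin UI requests and TLS errors.
		mixedversion.ClusterSettingOption(install.SecureOption(false)),
	)
	// The hook runs at randomized points while the cluster is in a
	// mixed-version state.
	mvt.InMixedVersion("rebalance load run",
		func(ctx context.Context, l *logger.Logger, r *rand.Rand, h *mixedversion.Helper) error {
			return runLoad(ctx)
		})
	// Run plans and executes the upgrade, invoking the hook.
	mvt.Run()
}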
kvoli committed Dec 4, 2023
1 parent dc8c484 commit 6a3e45e
Showing 1 changed file with 130 additions and 118 deletions.
248 changes: 130 additions & 118 deletions pkg/cmd/roachtest/tests/rebalance_load.go
@@ -20,11 +20,11 @@ import (
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/cluster"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/option"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/registry"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/roachtestutil/clusterupgrade"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/roachtestutil/mixedversion"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/spec"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/test"
"github.com/cockroachdb/cockroach/pkg/roachprod/install"
"github.com/cockroachdb/cockroach/pkg/testutils/release"
"github.com/cockroachdb/cockroach/pkg/roachprod/logger"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/errors"
"github.com/stretchr/testify/require"
@@ -82,134 +82,56 @@ func registerRebalanceLoad(r registry.Registry) {
appNode := c.Node(c.Spec().NodeCount)
numNodes := len(roachNodes)
numStores := numNodes
startOpts.RoachprodOpts.ExtraArgs = append(startOpts.RoachprodOpts.ExtraArgs,
"--vmodule=store_rebalancer=5,allocator=5,allocator_scorer=5,replicate_queue=5")
if c.Spec().SSDs > 1 && !c.Spec().RAID0 {
numStores *= c.Spec().SSDs
startOpts.RoachprodOpts.StoreCount = c.Spec().SSDs
}
// We want each store to end up with approximately storeToRangeFactor
// (factor) leases such that the CPU load is evenly spread, e.g.
// (n * factor) - 1 splits = factor * n ranges = factor leases per store.
// Note that we only assert on the CPU of each store w.r.t. the mean, not
// the lease count.
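// For illustration, with hypothetical values numStores = 3 and
// storeToRangeFactor = 20: splits = 3*20 - 1 = 59, giving 60 ranges
// and therefore ~20 leases per store once balanced.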
splits := (numStores * storeToRangeFactor) - 1
startOpts.RoachprodOpts.ExtraArgs = append(startOpts.RoachprodOpts.ExtraArgs,
"--vmodule=store_rebalancer=5,allocator=5,allocator_scorer=5,replicate_queue=5")

settings := install.MakeClusterSettings()
settings.ClusterSettings["kv.allocator.load_based_rebalancing"] = rebalanceMode
settings.ClusterSettings["kv.range_split.by_load_enabled"] = "false"

if mixedVersion {
predecessorVersionStr, err := release.LatestPredecessor(t.BuildVersion())
require.NoError(t, err)
predecessorVersion := clusterupgrade.MustParseVersion(predecessorVersionStr)
settings.Binary = uploadCockroach(ctx, t, c, c.All(), predecessorVersion)
// Upgrade some (or all) of the first N-1 CRDB nodes. We ignore the last
// CRDB node (to leave at least one node on the older version), and the
// app node.
lastNodeToUpgrade := rand.Intn(c.Spec().NodeCount-2) + 1
t.L().Printf("upgrading %d nodes to the current cockroach binary", lastNodeToUpgrade)
nodesToUpgrade := c.Range(1, lastNodeToUpgrade)
c.Start(ctx, t.L(), startOpts, settings, roachNodes)
upgradeNodes(ctx, t, c, nodesToUpgrade, startOpts, clusterupgrade.CurrentVersion())
mvt := mixedversion.NewTest(ctx, t, t.L(), c, roachNodes, mixedversion.NeverUseFixtures,
// The http requests to the admin UI performed by the test don't play
// well with secure clusters. As of the time of writing, they return
// either of the following errors:
// tls: failed to verify certificate: x509: “node” certificate is not standards compliant
// tls: failed to verify certificate: x509: certificate signed by unknown authority
//
// Disable secure mode for simplicity.
mixedversion.ClusterSettingOption(install.SecureOption(false)),
mixedversion.ClusterSettingOption(install.ClusterSettingsOption(settings.ClusterSettings)),
)
mvt.InMixedVersion("rebalance load run",
func(ctx context.Context, l *logger.Logger, r *rand.Rand, h *mixedversion.Helper) error {
return rebalanceByLoad(
ctx, t, c, rebalanceMode, maxDuration, concurrency, appNode, numStores, numNodes)
})
mvt.Run()
} else {
// Enable collecting CPU profiles when the CPU utilization exceeds 90%.
// This helps debug failures which occur as a result of mismatches
// between allocation (QPS/replica CPU) and hardware signals, e.g. see
// #111900. The setting names changed between v22.2 and v23.1, so we
// can't easily set up CPU profiling in mixed-version tests.
//
// TODO(kvoli): Remove this setup once CPU profiling is enabled by default
// on perf roachtests #97699.
settings.ClusterSettings["server.cpu_profile.duration"] = "2s"
settings.ClusterSettings["server.cpu_profile.interval"] = "2"
settings.ClusterSettings["server.cpu_profile.cpu_usage_combined_threshold"] = "90"
c.Start(ctx, t.L(), startOpts, settings, roachNodes)
require.NoError(t, rebalanceByLoad(
ctx, t, c, rebalanceMode, maxDuration,
concurrency, appNode, numStores, numNodes,
))
}

c.Put(ctx, t.DeprecatedWorkload(), "./workload", appNode)
c.Run(ctx, appNode, fmt.Sprintf("./workload init kv --drop --splits=%d {pgurl:1}", splits))

db := c.Conn(ctx, t.L(), 1)
defer db.Close()

require.NoError(t, WaitFor3XReplication(ctx, t, db))
t.Status("disable load based splitting")
require.NoError(t, disableLoadBasedSplitting(ctx, db))
t.Status(fmt.Sprintf("setting rebalance mode to %s", rebalanceMode))
_, err := db.ExecContext(ctx, `SET CLUSTER SETTING kv.allocator.load_based_rebalancing=$1::string`, rebalanceMode)
require.NoError(t, err)
// Enable collecting CPU profiles when the CPU utilization exceeds 90%.
// This helps debug failures which occur as a result of mismatches
// between allocation (QPS/replica CPU) and hardware signals, e.g. see
// #111900.
//
// TODO(kvoli): Remove this setup once CPU profiling is enabled by default
// on perf roachtests #97699.
_, err = db.ExecContext(ctx, `SET CLUSTER SETTING server.cpu_profile.duration = '2s'`)
require.NoError(t, err)
_, err = db.ExecContext(ctx, `SET CLUSTER SETTING server.cpu_profile.interval = '2m'`)
require.NoError(t, err)
_, err = db.ExecContext(ctx, `SET CLUSTER SETTING server.cpu_profile.cpu_usage_combined_threshold = 90`)
require.NoError(t, err)

var m *errgroup.Group // see comment in version.go
m, ctx = errgroup.WithContext(ctx)

// Enable us to exit out of the workload early when we achieve the desired CPU
// balance. This drastically shortens the duration of the test in the
// common case.
ctx, cancel := context.WithCancel(ctx)

m.Go(func() error {
t.L().Printf("starting load generator\n")
err := c.RunE(ctx, appNode, fmt.Sprintf(
"./workload run kv --read-percent=95 --tolerate-errors --concurrency=%d "+
"--duration=%v {pgurl:1-%d}",
concurrency, maxDuration, len(roachNodes)))
if errors.Is(ctx.Err(), context.Canceled) {
// We got canceled either because CPU balance was achieved or the
// other worker hit an error. In either case, it's not this worker's
// fault.
return nil
}
return err
})

m.Go(func() error {
t.Status("checking for CPU balance")

storeCPUFn, err := makeStoreCPUFn(ctx, c, t, numNodes, numStores)
if err != nil {
return err
}

var reason string
var balancedStartTime time.Time
var prevIsBalanced bool
for tBegin := timeutil.Now(); timeutil.Since(tBegin) <= maxDuration; {
// Wait out the sample period initially to allow the timeseries to
// populate meaningful information for the test to query.
select {
case <-ctx.Done():
return ctx.Err()
case <-time.After(statSamplePeriod):
}

now := timeutil.Now()
clusterStoresCPU, err := storeCPUFn(ctx)
if err != nil {
t.L().Printf("unable to get the cluster stores CPU %s\n", err.Error())
continue
}
var curIsBalanced bool
curIsBalanced, reason = isLoadEvenlyDistributed(clusterStoresCPU, meanCPUTolerance)
t.L().Printf("cpu %s", reason)
if !prevIsBalanced && curIsBalanced {
balancedStartTime = now
}
prevIsBalanced = curIsBalanced
if prevIsBalanced && now.Sub(balancedStartTime) > stableDuration {
t.Status("successfully achieved CPU balance; waiting for kv to finish running")
cancel()
return nil
}
}

return errors.Errorf("CPU not evenly balanced after timeout: %s", reason)
})
if err := m.Wait(); err != nil {
t.Fatal(err)
}
}

concurrency := 128

r.Add(
registry.TestSpec{
Name: `rebalance/by-load/leases`,
Expand Down Expand Up @@ -301,6 +223,96 @@ func registerRebalanceLoad(r registry.Registry) {
)
}

func rebalanceByLoad(
ctx context.Context,
t test.Test,
c cluster.Cluster,
rebalanceMode string,
maxDuration time.Duration,
concurrency int,
appNode option.NodeListOption,
numStores, numNodes int,
) error {
// We want each store to end up with approximately storeToRangeFactor
// (factor) leases such that the CPU load is evenly spread, e.g.
// (n * factor) - 1 splits = factor * n ranges = factor leases per store.
// Note that we only assert on the CPU of each store w.r.t. the mean, not
// the lease count.
splits := (numStores * storeToRangeFactor) - 1
c.Run(ctx, appNode, fmt.Sprintf("./cockroach workload init kv --drop --splits=%d {pgurl:1}", splits))

db := c.Conn(ctx, t.L(), 1)
defer db.Close()

require.NoError(t, WaitFor3XReplication(ctx, t, db))

var m *errgroup.Group // see comment in version.go
m, ctx = errgroup.WithContext(ctx)

// Enable us to exit out of the workload early when we achieve the desired CPU
// balance. This drastically shortens the duration of the test in the
// common case.
ctx, cancel := context.WithCancel(ctx)

m.Go(func() error {
t.L().Printf("starting load generator\n")
err := c.RunE(ctx, appNode, fmt.Sprintf(
"./cockroach workload run kv --read-percent=95 --tolerate-errors --concurrency=%d "+
"--duration=%v {pgurl:1-%d}",
concurrency, maxDuration, numNodes))
if errors.Is(ctx.Err(), context.Canceled) {
// We got canceled either because CPU balance was achieved or the
// other worker hit an error. In either case, it's not this worker's
// fault.
return nil
}
return err
})

m.Go(func() error {
t.Status("checking for CPU balance")

storeCPUFn, err := makeStoreCPUFn(ctx, c, t, numNodes, numStores)
if err != nil {
return err
}

var reason string
var balancedStartTime time.Time
var prevIsBalanced bool
for tBegin := timeutil.Now(); timeutil.Since(tBegin) <= maxDuration; {
// Wait out the sample period initially to allow the timeseries to
// populate meaningful information for the test to query.
select {
case <-ctx.Done():
return ctx.Err()
case <-time.After(statSamplePeriod):
}

now := timeutil.Now()
clusterStoresCPU, err := storeCPUFn(ctx)
if err != nil {
t.L().Printf("unable to get the cluster stores CPU %s\n", err.Error())
continue
}
var curIsBalanced bool
curIsBalanced, reason = isLoadEvenlyDistributed(clusterStoresCPU, meanCPUTolerance)
t.L().Printf("cpu %s", reason)
if !prevIsBalanced && curIsBalanced {
balancedStartTime = now
}
prevIsBalanced = curIsBalanced
if prevIsBalanced && now.Sub(balancedStartTime) > stableDuration {
t.Status("successfully achieved CPU balance; waiting for kv to finish running")
cancel()
return nil
}
}
return errors.Errorf("CPU not evenly balanced after timeout: %s", reason)
})
return m.Wait()
}
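
The function above couples two workers through a cancellable context: a load generator and a balance checker, with the checker cancelling the load as soon as balance is reached. Below is a standalone sketch of that pattern, assuming only the standard library and golang.org/x/sync/errgroup; runLoad and checkBalanced are placeholders for the kv workload and the CPU probe, and the stableDuration requirement of the real test is omitted for brevity.

package sketch

import (
	"context"
	"errors"
	"time"

	"golang.org/x/sync/errgroup"
)

// earlyExit runs a long-lived worker alongside a checker; when the
// checker reports success, the worker is canceled and the group
// returns nil, mirroring the structure of rebalanceByLoad above.
func earlyExit(
	ctx context.Context,
	runLoad func(context.Context) error,
	checkBalanced func(context.Context) (bool, error),
	timeout, sample time.Duration,
) error {
	g, ctx := errgroup.WithContext(ctx)
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	g.Go(func() error {
		err := runLoad(ctx)
		if errors.Is(ctx.Err(), context.Canceled) {
			return nil // canceled because balance was achieved
		}
		return err
	})

	g.Go(func() error {
		for start := time.Now(); time.Since(start) <= timeout; {
			select {
			case <-ctx.Done():
				return ctx.Err()
			case <-time.After(sample):
			}
			balanced, err := checkBalanced(ctx)
			if err != nil {
				continue // transient probe failure; keep polling
			}
			if balanced {
				cancel() // stop the load generator early
				return nil
			}
		}
		return errors.New("not balanced before timeout")
	})
	return g.Wait()
}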

// makeStoreCPUFn returns a function which can be called to gather the CPU of
// the cluster stores. When there are multiple stores per node, stores on the
// same node will report identical CPU.
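
The body of makeStoreCPUFn is truncated in this view. Based only on the doc comment above, the per-store fan-out it describes might look like the hypothetical helper below; how the per-node CPU is actually fetched (presumably from the cluster's timeseries) is not shown here.

// storeCPUFromNodes expands per-node CPU readings into per-store
// values by repeating each node's reading storesPerNode times, so
// stores on the same node report identical CPU. Hypothetical helper
// illustrating the doc comment; not the committed implementation.
func storeCPUFromNodes(nodeCPU []float64, storesPerNode int) []float64 {
	storeCPU := make([]float64, 0, len(nodeCPU)*storesPerNode)
	for _, cpu := range nodeCPU {
		for i := 0; i < storesPerNode; i++ {
			storeCPU = append(storeCPU, cpu)
		}
	}
	return storeCPU
}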
