roachtest: port tpcc/mixed-headroom to the new framework
Note that the new framework randomizes how many version
upgrades occur. This removes the need for separate single-upgrade
and multiple-upgrade tests, so the two were merged.

Release note: None
Fixes: #110537
DarrylWong committed Nov 7, 2023
1 parent 6cc1224 commit f40aaf9
Showing 2 changed files with 64 additions and 221 deletions.
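For orientation, a minimal sketch of how the ported test hooks into the mixedversion framework, using only the calls that appear in the diff below (step bodies omitted). The framework itself decides how many upgrades to perform and which predecessor version to start from:

    mvt := mixedversion.NewTest(ctx, t, t.L(), c, crdbNodes)
    mvt.OnStartup("load TPCC dataset", importTPCC)                      // runs once, before any upgrade begins
    mvt.InMixedVersion("TPCC workload", runTPCCWorkload)                // may run several times while the cluster is in a mixed-version state
    mvt.AfterUpgradeFinalized("check TPCC workload", checkTPCCWorkload) // runs after each upgrade is finalized
    mvt.Run()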
236 changes: 64 additions & 172 deletions pkg/cmd/roachtest/tests/tpcc.go
@@ -24,16 +24,15 @@ import (
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/cluster"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/option"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/registry"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/roachtestutil"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/roachtestutil/clusterupgrade"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/roachtestutil/mixedversion"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/spec"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/test"
"github.com/cockroachdb/cockroach/pkg/roachprod/install"
"github.com/cockroachdb/cockroach/pkg/roachprod/logger"
"github.com/cockroachdb/cockroach/pkg/roachprod/prometheus"
"github.com/cockroachdb/cockroach/pkg/roachprod/vm"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode"
"github.com/cockroachdb/cockroach/pkg/testutils/release"
"github.com/cockroachdb/cockroach/pkg/util/randutil"
"github.com/cockroachdb/cockroach/pkg/util/search"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/version"
@@ -354,63 +353,12 @@ func maxSupportedTPCCWarehouses(
return warehouses
}

type backgroundFn func(ctx context.Context, u *versionUpgradeTest) error

// A backgroundStepper is a tool to run long-lived commands while a cluster is
// going through a sequence of version upgrade operations.
// It exposes a `launch` step that launches the method carrying out long-running
// work (in the background) and a `wait` step collecting any errors.
type backgroundStepper struct {
// This is the operation that will be launched in the background. When the
// context gets canceled, it should shut down and return without an error.
// The way to typically get this is:
//
//   err := doSomething(ctx)
//   if ctx.Err() != nil {
//     return nil
//   }
//   return err
run backgroundFn
// When not nil, called with the error within `.wait()`. The interceptor
// gets a chance to ignore the error or produce a different one (via t.Fatal).
onStop func(context.Context, test.Test, *versionUpgradeTest, error)
nodes option.NodeListOption // nodes to monitor, defaults to c.All()

// Internal.
m cluster.Monitor
}
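
As an aside, a minimal sketch of the shutdown convention described in the comment above; doSomething is a hypothetical stand-in for the long-running work, and only the context package is assumed:

    // backgroundWork mirrors the convention above: an error observed after the
    // context has been canceled is treated as a clean shutdown, not a failure.
    func backgroundWork(ctx context.Context, doSomething func(context.Context) error) error {
        err := doSomething(ctx)
        if ctx.Err() != nil {
            return nil // shutdown was requested by the caller
        }
        return err
    }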

// launch spawns the function the background step was initialized with.
func (s *backgroundStepper) launch(ctx context.Context, t test.Test, u *versionUpgradeTest) {
nodes := s.nodes
if nodes == nil {
nodes = u.c.All()
}
s.m = u.c.NewMonitor(ctx, nodes)
s.m.Go(func(ctx context.Context) error {
return s.run(ctx, u)
})
}

func (s *backgroundStepper) wait(ctx context.Context, t test.Test, u *versionUpgradeTest) {
// We don't care about the workload failing since we only use it to produce a
// few `RESTORE` jobs. And indeed the workload will fail, because it does not
// tolerate pausing of its jobs.
err := s.m.WaitE()
if s.onStop != nil {
s.onStop(ctx, t, u, err)
} else if err != nil {
t.Fatal(err)
}
}

// runTPCCMixedHeadroom runs a mixed-version test that imports a large
// `bank` dataset, and runs one or multiple database upgrades while a
// TPCC workload is running. The number of database upgrades is
// controlled by the `versionsToUpgrade` parameter.
func runTPCCMixedHeadroom(
ctx context.Context, t test.Test, c cluster.Cluster, versionsToUpgrade int,
) {
// `bank` dataset, and runs multiple database upgrades while a TPCC
// workload is running. The number of database upgrades is randomized
// by the mixed-version framework, which chooses a random predecessor version
// and upgrades until it reaches the current version.
func runTPCCMixedHeadroom(ctx context.Context, t test.Test, c cluster.Cluster) {
crdbNodes := c.Range(1, c.Spec().NodeCount-1)
workloadNode := c.Node(c.Spec().NodeCount)

@@ -420,26 +368,6 @@ func runTPCCMixedHeadroom(
headroomWarehouses = 10
}

// We'll need this below.
tpccBackgroundStepper := func(duration time.Duration) backgroundStepper {
return backgroundStepper{
nodes: crdbNodes,
run: func(ctx context.Context, u *versionUpgradeTest) error {
t.L().Printf("running background TPCC workload for %s", duration)
runTPCC(ctx, t, c, tpccOptions{
Warehouses: headroomWarehouses,
Duration: duration,
SetupType: usingExistingData,
Start: func(ctx context.Context, t test.Test, c cluster.Cluster) {
// Noop - we don't let tpcc upload or start binaries in this test.
},
})
return nil
}}
}

randomCRDBNode := func() int { return crdbNodes.RandNode()[0] }

// NB: this results in ~100GB of (actual) disk usage per node once things
// have settled down, and ~7.5k ranges. The import takes ~40 minutes.
// The full 6.5m import ran into out of disk errors (on 250gb machines),
@@ -449,91 +377,72 @@ func runTPCCMixedHeadroom(
bankRows = 1000
}

rng, seed := randutil.NewLockedPseudoRand()
t.L().Printf("using random seed %d", seed)
history, err := release.RandomPredecessorHistory(rng, t.BuildVersion(), versionsToUpgrade)
if err != nil {
t.Fatal(err)
}
sep := " -> "
t.L().Printf("testing upgrade: %s%scurrent", strings.Join(history, sep), sep)
releases := make([]*clusterupgrade.Version, 0, len(history))
for _, v := range history {
releases = append(releases, clusterupgrade.MustParseVersion(v))
}
releases = append(releases, clusterupgrade.CurrentVersion())
mvt := mixedversion.NewTest(ctx, t, t.L(), c, crdbNodes)

waitForWorkloadToRampUp := sleepStep(rampDuration(c.IsLocal()))
logStep := func(format string, args ...interface{}) versionStep {
return func(ctx context.Context, t test.Test, u *versionUpgradeTest) {
t.L().Printf(format, args...)
}
importTPCC := func(ctx context.Context, l *logger.Logger, rng *rand.Rand, h *mixedversion.Helper) error {
randomNode := c.Node(h.RandomNode(rng, crdbNodes))
cmd := tpccImportCmdWithCockroachBinary(test.DefaultCockroachPath, headroomWarehouses, fmt.Sprintf("{pgurl%s}", randomNode))
return c.RunE(ctx, randomNode, cmd)
}

oldestVersion := releases[0]
setupSteps := []versionStep{
logStep("starting from fixture at version %s", oldestVersion),
uploadAndStartFromCheckpointFixture(crdbNodes, oldestVersion),
waitForUpgradeStep(crdbNodes), // let oldest version settle (gossip etc)
uploadVersionStep(workloadNode, clusterupgrade.CurrentVersion()), // for tpccBackgroundStepper's workload

// Load TPCC dataset, don't run TPCC yet. We do this while in the
// version we are starting with to load some data and hopefully
// create some state that will need work by long-running
// migrations.
importTPCCStep(oldestVersion, headroomWarehouses, crdbNodes),
// Add a lot of cold data to this cluster. This further stresses the version
// upgrade machinery, in which a) all ranges are touched and b) work proportional
// to the amount of data may be carried out.
importLargeBankStep(oldestVersion, bankRows, crdbNodes),
// Add a lot of cold data to this cluster. This further stresses the version
// upgrade machinery, in which a) all ranges are touched and b) work proportional
// to the amount of data may be carried out.
importLargeBank := func(ctx context.Context, l *logger.Logger, rng *rand.Rand, h *mixedversion.Helper) error {
randomNode := c.Node(h.RandomNode(rng, crdbNodes))
cmd := roachtestutil.NewCommand(fmt.Sprintf("%s workload fixtures import bank", test.DefaultCockroachPath)).
Arg("{pgurl%s}", randomNode).
Flag("payload-bytes", 10240).
Flag("rows", bankRows).
Flag("seed", 4).
Flag("db", "bigbank").
String()
return c.RunE(ctx, randomNode, cmd)
}

// upgradeToVersionSteps returns the list of steps to be performed
// when upgrading to the given version.
upgradeToVersionSteps := func(crdbVersion *clusterupgrade.Version) []versionStep {
duration := 10 * time.Minute
if crdbVersion.IsCurrent() {
duration = 100 * time.Minute
}
tpccWorkload := tpccBackgroundStepper(duration)

return []versionStep{
logStep("upgrading to version %q", crdbVersion.String()),
preventAutoUpgradeStep(randomCRDBNode()),
// Upload and restart cluster into the new
// binary (stays at previous cluster version).
binaryUpgradeStep(crdbNodes, crdbVersion),
// Now start running TPCC in the background.
tpccWorkload.launch,
// Wait for the workload to ramp up before attempting to
// upgrade the cluster version. If we start the migrations
// immediately after launching the tpcc workload above, they
// could finish "too quickly", before the workload had a
// chance to pick up the pace (starting all the workers, range
// merge/splits, compactions, etc). By waiting here, we
// increase the concurrency exposed to the upgrade migrations,
// and increase the chances of exposing bugs (such as #83079).
waitForWorkloadToRampUp,
// While tpcc is running in the background, bump the cluster
// version manually. We do this over allowing automatic upgrades
// to get a better idea of what errors come back here, if any.
// This will block until the long-running migrations have run.
allowAutoUpgradeStep(randomCRDBNode()),
waitForUpgradeStep(crdbNodes),
// Wait until TPCC background run terminates
// and fail if it reports an error.
tpccWorkload.wait,
// We don't run this in the background using the Workload() wrapper. We want
// it to block and wait for the workload to ramp up before attempting to upgrade
// the cluster version. If we start the migrations immediately after launching
// the tpcc workload, they could finish "too quickly", before the workload had
// a chance to pick up the pace (starting all the workers, range merge/splits,
// compactions, etc). By waiting here, we increase the concurrency exposed to
// the upgrade migrations, and increase the chances of exposing bugs (such as #83079).
runTPCCWorkload := func(ctx context.Context, l *logger.Logger, rng *rand.Rand, h *mixedversion.Helper) error {
workloadDur := 10 * time.Minute
rampDur := rampDuration(c.IsLocal())
// If migrations are running, we want to ramp up the workload faster in order
// to expose them to more concurrent load. To the same end, we also let the
// TPCC workload run longer.
if h.Context().Finalizing && !c.IsLocal() {
rampDur = 1 * time.Minute
workloadDur = 100 * time.Minute
}
cmd := roachtestutil.NewCommand("./cockroach workload run tpcc").
Arg("{pgurl%s}", crdbNodes).
Flag("duration", workloadDur).
Flag("warehouses", headroomWarehouses).
Flag("histograms", t.PerfArtifactsDir()+"/stats.json").
Flag("ramp", rampDur).
Flag("prometheus-port", 2112).
Flag("pprofport", workloadPProfStartPort).
String()
return c.RunE(ctx, workloadNode, cmd)
}

// Test steps consist of the setup steps + the upgrade steps for
// each upgrade being carried out here.
testSteps := append([]versionStep{}, setupSteps...)
for _, nextVersion := range releases[1:] {
testSteps = append(testSteps, upgradeToVersionSteps(nextVersion)...)
checkTPCCWorkload := func(ctx context.Context, l *logger.Logger, rng *rand.Rand, h *mixedversion.Helper) error {
cmd := roachtestutil.NewCommand(fmt.Sprintf("%s workload check tpcc", test.DefaultCockroachPath)).
Arg("{pgurl:1}").
Flag("warehouses", headroomWarehouses).
String()
return c.RunE(ctx, workloadNode, cmd)
}

newVersionUpgradeTest(c, testSteps...).run(ctx, t)
uploadVersion(ctx, t, c, workloadNode, clusterupgrade.CurrentVersion())
mvt.OnStartup("load TPCC dataset", importTPCC)
mvt.OnStartup("load bank dataset", importLargeBank)
mvt.InMixedVersion("TPCC workload", runTPCCWorkload)
mvt.AfterUpgradeFinalized("check TPCC workload", checkTPCCWorkload)
mvt.Run()
}

func registerTPCC(r registry.Registry) {
@@ -579,27 +488,10 @@ func registerTPCC(r registry.Registry) {
Cluster: mixedHeadroomSpec,
EncryptionSupport: registry.EncryptionMetamorphic,
Run: func(ctx context.Context, t test.Test, c cluster.Cluster) {
runTPCCMixedHeadroom(ctx, t, c, 1)
runTPCCMixedHeadroom(ctx, t, c)
},
})

// N.B. Multiple upgrades may require a released version < 22.2.x, which wasn't built for ARM64.
mixedHeadroomMultiUpgradesSpec := r.MakeClusterSpec(5, spec.CPU(16), spec.RandomlyUseZfs(), spec.Arch(vm.ArchAMD64))

r.Add(registry.TestSpec{
// run the same mixed-headroom test, but going back two versions
Name: "tpcc/mixed-headroom/multiple-upgrades/" + mixedHeadroomMultiUpgradesSpec.String(),
Timeout: 5 * time.Hour,
Owner: registry.OwnerTestEng,
CompatibleClouds: registry.AllExceptAWS,
Suites: registry.Suites(registry.Nightly),
Tags: registry.Tags(`default`),
Cluster: mixedHeadroomMultiUpgradesSpec,
EncryptionSupport: registry.EncryptionMetamorphic,
Run: func(ctx context.Context, t test.Test, c cluster.Cluster) {
runTPCCMixedHeadroom(ctx, t, c, 2)
},
})
r.Add(registry.TestSpec{
Name: "tpcc-nowait/nodes=3/w=1",
Owner: registry.OwnerTestEng,
49 changes: 0 additions & 49 deletions pkg/cmd/roachtest/tests/versionupgrade.go
@@ -433,55 +433,6 @@ done
}).run(ctx, t)
}

// importTPCCStep runs a TPCC import on the first crdbNode (monitoring them all for
// crashes during the import). If oldV is not the current version, the import runs with
// the binary for that version (for example "19.2.1", as provided by LatestPredecessor())
// from the location used by c.Stage(); otherwise the main cockroach binary is used.
func importTPCCStep(
oldV *clusterupgrade.Version, headroomWarehouses int, crdbNodes option.NodeListOption,
) versionStep {
return func(ctx context.Context, t test.Test, u *versionUpgradeTest) {
// We need to use the predecessor binary to load into the
// predecessor cluster to avoid random breakage. For example, you
// can't use 21.1 to import into 20.2 due to some flag changes.
//
// TODO(tbg): also import a large dataset (for example 2TB bank)
// that will provide cold data that may need to be migrated.
var cmd string
if oldV.IsCurrent() {
cmd = tpccImportCmd(headroomWarehouses)
} else {
cmd = tpccImportCmdWithCockroachBinary(clusterupgrade.BinaryPathForVersion(t, oldV), headroomWarehouses, "--checks=false")
}
// Use a monitor so that we fail cleanly if the cluster crashes
// during import.
m := u.c.NewMonitor(ctx, crdbNodes)
m.Go(func(ctx context.Context) error {
return u.c.RunE(ctx, u.c.Node(crdbNodes[0]), cmd)
})
m.Wait()
}
}

func importLargeBankStep(
oldV *clusterupgrade.Version, rows int, crdbNodes option.NodeListOption,
) versionStep {
return func(ctx context.Context, t test.Test, u *versionUpgradeTest) {
// Use the predecessor binary to load into the predecessor
// cluster to avoid random breakage due to flag changes, etc.
binary := clusterupgrade.BinaryPathForVersion(t, oldV)

// Use a monitor so that we fail cleanly if the cluster crashes
// during import.
m := u.c.NewMonitor(ctx, crdbNodes)
m.Go(func(ctx context.Context) error {
return u.c.RunE(ctx, u.c.Node(crdbNodes[0]), binary, "workload", "fixtures", "import", "bank",
"--payload-bytes=10240", "--rows="+fmt.Sprint(rows), "--seed=4", "--db=bigbank")
})
m.Wait()
}
}

func sleepStep(d time.Duration) versionStep {
return func(ctx context.Context, t test.Test, u *versionUpgradeTest) {
time.Sleep(d)
