roachtest: port tpcc/mixed-headroom to the new framework
Note that the new framework randomizes how many version
upgrades occur. This removes the need for separate single-upgrade
and multiple-upgrade tests, so the two have been merged.

Release note: None
Fixes: #110537
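
For readers unfamiliar with the mixedversion framework, below is a minimal sketch of the shape the ported test takes, pieced together from the hunks that follow. Only the framework calls (NewTest, OnStartup, InMixedVersion, AfterUpgradeFinalized, Run) and the hook signature mirror the real change; the function name runMixedVersionSketch, the hook names, and the echo commands are placeholders for illustration.

package tests

import (
	"context"
	"math/rand"

	"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/cluster"
	"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/roachtestutil/mixedversion"
	"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/test"
	"github.com/cockroachdb/cockroach/pkg/roachprod/logger"
)

// runMixedVersionSketch registers hooks up front; the framework then decides
// how many upgrades to perform and when each hook runs during them.
func runMixedVersionSketch(ctx context.Context, t test.Test, c cluster.Cluster) {
	crdbNodes := c.Range(1, c.Spec().NodeCount-1)
	mvt := mixedversion.NewTest(ctx, t, t.L(), c, crdbNodes)

	// Every hook shares this signature.
	loadData := func(ctx context.Context, l *logger.Logger, rng *rand.Rand, h *mixedversion.Helper) error {
		node := c.Node(h.RandomNode(rng, crdbNodes)) // pick one of the CRDB nodes at random
		return c.RunE(ctx, node, "echo 'load data here'")
	}
	runWorkload := func(ctx context.Context, l *logger.Logger, rng *rand.Rand, h *mixedversion.Helper) error {
		return c.RunE(ctx, c.Node(crdbNodes[0]), "echo 'run workload here'")
	}

	mvt.OnStartup("load data", loadData)               // once, before any upgrade starts
	mvt.InMixedVersion("run workload", runWorkload)    // while the cluster runs mixed binaries
	mvt.AfterUpgradeFinalized("validate", runWorkload) // after each upgrade finalizes (reused only for brevity)
	mvt.Run()
}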
DarrylWong committed Nov 6, 2023
1 parent 6cc1224 commit b8eeda0
Showing 2 changed files with 52 additions and 219 deletions.
222 changes: 52 additions & 170 deletions pkg/cmd/roachtest/tests/tpcc.go
@@ -24,16 +24,15 @@ import (
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/cluster"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/option"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/registry"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/roachtestutil"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/roachtestutil/clusterupgrade"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/roachtestutil/mixedversion"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/spec"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/test"
"github.com/cockroachdb/cockroach/pkg/roachprod/install"
"github.com/cockroachdb/cockroach/pkg/roachprod/logger"
"github.com/cockroachdb/cockroach/pkg/roachprod/prometheus"
"github.com/cockroachdb/cockroach/pkg/roachprod/vm"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode"
"github.com/cockroachdb/cockroach/pkg/testutils/release"
"github.com/cockroachdb/cockroach/pkg/util/randutil"
"github.com/cockroachdb/cockroach/pkg/util/search"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/version"
@@ -354,63 +353,11 @@ func maxSupportedTPCCWarehouses(
return warehouses
}

type backgroundFn func(ctx context.Context, u *versionUpgradeTest) error

// A backgroundStepper is a tool to run long-lived commands while a cluster is
// going through a sequence of version upgrade operations.
// It exposes a `launch` step that launches the method carrying out long-running
// work (in the background) and a `stop` step collecting any errors.
type backgroundStepper struct {
// This is the operation that will be launched in the background. When the
// context gets canceled, it should shut down and return without an error.
// The way to typically get this is:
//
// err := doSomething(ctx)
// if ctx.Err() != nil {
// return nil
// }
// return err
run backgroundFn
// When not nil, called with the error within `.stop()`. The interceptor
// gets a chance to ignore the error or produce a different one (via t.Fatal).
onStop func(context.Context, test.Test, *versionUpgradeTest, error)
nodes option.NodeListOption // nodes to monitor, defaults to c.All()

// Internal.
m cluster.Monitor
}

// launch spawns the function the background step was initialized with.
func (s *backgroundStepper) launch(ctx context.Context, t test.Test, u *versionUpgradeTest) {
nodes := s.nodes
if nodes == nil {
nodes = u.c.All()
}
s.m = u.c.NewMonitor(ctx, nodes)
s.m.Go(func(ctx context.Context) error {
return s.run(ctx, u)
})
}

func (s *backgroundStepper) wait(ctx context.Context, t test.Test, u *versionUpgradeTest) {
// We don't care about the workload failing since we only use it to produce a
// few `RESTORE` jobs. And indeed workload will fail because it does not
// tolerate pausing of its jobs.
err := s.m.WaitE()
if s.onStop != nil {
s.onStop(ctx, t, u, err)
} else if err != nil {
t.Fatal(err)
}
}

// runTPCCMixedHeadroom runs a mixed-version test that imports a large
// `bank` dataset, and runs one or multiple database upgrades while a
// TPCC workload is running. The number of database upgrades is
// controlled by the `versionsToUpgrade` parameter.
func runTPCCMixedHeadroom(
ctx context.Context, t test.Test, c cluster.Cluster, versionsToUpgrade int,
) {
func runTPCCMixedHeadroom(ctx context.Context, t test.Test, c cluster.Cluster) {
crdbNodes := c.Range(1, c.Spec().NodeCount-1)
workloadNode := c.Node(c.Spec().NodeCount)

@@ -420,26 +367,6 @@ func runTPCCMixedHeadroom(
headroomWarehouses = 10
}

// We'll need this below.
tpccBackgroundStepper := func(duration time.Duration) backgroundStepper {
return backgroundStepper{
nodes: crdbNodes,
run: func(ctx context.Context, u *versionUpgradeTest) error {
t.L().Printf("running background TPCC workload for %s", duration)
runTPCC(ctx, t, c, tpccOptions{
Warehouses: headroomWarehouses,
Duration: duration,
SetupType: usingExistingData,
Start: func(ctx context.Context, t test.Test, c cluster.Cluster) {
// Noop - we don't let tpcc upload or start binaries in this test.
},
})
return nil
}}
}

randomCRDBNode := func() int { return crdbNodes.RandNode()[0] }

// NB: this results in ~100GB of (actual) disk usage per node once things
// have settled down, and ~7.5k ranges. The import takes ~40 minutes.
// The full 6.5m import ran into out of disk errors (on 250gb machines),
@@ -449,91 +376,63 @@ func runTPCCMixedHeadroom(
bankRows = 1000
}

rng, seed := randutil.NewLockedPseudoRand()
t.L().Printf("using random seed %d", seed)
history, err := release.RandomPredecessorHistory(rng, t.BuildVersion(), versionsToUpgrade)
if err != nil {
t.Fatal(err)
}
sep := " -> "
t.L().Printf("testing upgrade: %s%scurrent", strings.Join(history, sep), sep)
releases := make([]*clusterupgrade.Version, 0, len(history))
for _, v := range history {
releases = append(releases, clusterupgrade.MustParseVersion(v))
}
releases = append(releases, clusterupgrade.CurrentVersion())
mvt := mixedversion.NewTest(ctx, t, t.L(), c, crdbNodes)

waitForWorkloadToRampUp := sleepStep(rampDuration(c.IsLocal()))
logStep := func(format string, args ...interface{}) versionStep {
return func(ctx context.Context, t test.Test, u *versionUpgradeTest) {
t.L().Printf(format, args...)
}
importTPCC := func(ctx context.Context, l *logger.Logger, rng *rand.Rand, h *mixedversion.Helper) error {
randomNode := c.Node(h.RandomNode(rng, crdbNodes))
cmd := tpccImportCmdWithCockroachBinary(test.DefaultCockroachPath, headroomWarehouses, fmt.Sprintf("{pgurl%s}", randomNode))
return c.RunE(ctx, randomNode, cmd)
}

oldestVersion := releases[0]
setupSteps := []versionStep{
logStep("starting from fixture at version %s", oldestVersion),
uploadAndStartFromCheckpointFixture(crdbNodes, oldestVersion),
waitForUpgradeStep(crdbNodes), // let oldest version settle (gossip etc)
uploadVersionStep(workloadNode, clusterupgrade.CurrentVersion()), // for tpccBackgroundStepper's workload

// Load TPCC dataset, don't run TPCC yet. We do this while in the
// version we are starting with to load some data and hopefully
// create some state that will need work by long-running
// migrations.
importTPCCStep(oldestVersion, headroomWarehouses, crdbNodes),
// Add a lot of cold data to this cluster. This further stresses the version
// upgrade machinery, in which a) all ranges are touched and b) work proportional
// to the amount of data may be carried out.
importLargeBankStep(oldestVersion, bankRows, crdbNodes),
// Add a lot of cold data to this cluster. This further stresses the version
// upgrade machinery, in which a) all ranges are touched and b) work proportional
// to the amount of data may be carried out.
importLargeBank := func(ctx context.Context, l *logger.Logger, rng *rand.Rand, h *mixedversion.Helper) error {
randomNode := c.Node(h.RandomNode(rng, crdbNodes))
cmd := roachtestutil.NewCommand(fmt.Sprintf("%s workload fixtures import bank", test.DefaultCockroachPath)).
Arg("{pgurl%s}", randomNode).
Flag("payload-bytes", 10240).
Flag("rows", bankRows).
Flag("seed", 4).
Flag("db", "bigbank").
String()
return c.RunE(ctx, randomNode, cmd)
}

// upgradeToVersionSteps returns the list of steps to be performed
// when upgrading to the given version.
upgradeToVersionSteps := func(crdbVersion *clusterupgrade.Version) []versionStep {
duration := 10 * time.Minute
if crdbVersion.IsCurrent() {
duration = 100 * time.Minute
}
tpccWorkload := tpccBackgroundStepper(duration)

return []versionStep{
logStep("upgrading to version %q", crdbVersion.String()),
preventAutoUpgradeStep(randomCRDBNode()),
// Upload and restart cluster into the new
// binary (stays at previous cluster version).
binaryUpgradeStep(crdbNodes, crdbVersion),
// Now start running TPCC in the background.
tpccWorkload.launch,
// Wait for the workload to ramp up before attemping to
// upgrade the cluster version. If we start the migrations
// immediately after launching the tpcc workload above, they
// could finish "too quickly", before the workload had a
// chance to pick up the pace (starting all the workers, range
// merge/splits, compactions, etc). By waiting here, we
// increase the concurrency exposed to the upgrade migrations,
// and increase the chances of exposing bugs (such as #83079).
waitForWorkloadToRampUp,
// While tpcc is running in the background, bump the cluster
// version manually. We do this over allowing automatic upgrades
// to get a better idea of what errors come back here, if any.
// This will block until the long-running migrations have run.
allowAutoUpgradeStep(randomCRDBNode()),
waitForUpgradeStep(crdbNodes),
// Wait until TPCC background run terminates
// and fail if it reports an error.
tpccWorkload.wait,
}
// We don't run this in the background using the Workload() wrapper. We want
// it to block and wait for the workload to ramp up before attempting to upgrade
// the cluster version. If we start the migrations immediately after launching
// the tpcc workload, they could finish "too quickly", before the workload had
// a chance to pick up the pace (starting all the workers, range merge/splits,
// compactions, etc). By waiting here, we increase the concurrency exposed to
// the upgrade migrations, and increase the chances of exposing bugs (such as #83079).
runTPCCWorkload := func(ctx context.Context, l *logger.Logger, rng *rand.Rand, h *mixedversion.Helper) error {
cmd := roachtestutil.NewCommand("./cockroach workload run tpcc").
Arg("{pgurl%s}", crdbNodes).
Flag("duration", rampDuration(c.IsLocal())).
Flag("warehouses", headroomWarehouses).
Flag("histograms", t.PerfArtifactsDir()+"/stats.json").
Flag("ramp", rampDuration(c.IsLocal())).
Flag("prometheus-port", 2112).
Flag("pprofport", workloadPProfStartPort).
String()
return c.RunE(ctx, workloadNode, cmd)
}

// Test steps consist of the setup steps + the upgrade steps for
// each upgrade being carried out here.
testSteps := append([]versionStep{}, setupSteps...)
for _, nextVersion := range releases[1:] {
testSteps = append(testSteps, upgradeToVersionSteps(nextVersion)...)
checkTPCCWorkload := func(ctx context.Context, l *logger.Logger, rng *rand.Rand, h *mixedversion.Helper) error {
cmd := roachtestutil.NewCommand(fmt.Sprintf("%s workload check tpcc", test.DefaultCockroachPath)).
Arg("{pgurl:1}").
Flag("warehouses", headroomWarehouses).
String()
return c.RunE(ctx, workloadNode, cmd)
}

newVersionUpgradeTest(c, testSteps...).run(ctx, t)
uploadVersion(ctx, t, c, workloadNode, clusterupgrade.CurrentVersion())
mvt.OnStartup("load TPCC dataset", importTPCC)
mvt.OnStartup("load bank dataset", importLargeBank)
mvt.InMixedVersion("TPCC workload", runTPCCWorkload)
mvt.AfterUpgradeFinalized("check TPCC workload", checkTPCCWorkload)
mvt.Run()
}

func registerTPCC(r registry.Registry) {
@@ -579,27 +478,10 @@ func registerTPCC(r registry.Registry) {
Cluster: mixedHeadroomSpec,
EncryptionSupport: registry.EncryptionMetamorphic,
Run: func(ctx context.Context, t test.Test, c cluster.Cluster) {
runTPCCMixedHeadroom(ctx, t, c, 1)
runTPCCMixedHeadroom(ctx, t, c)
},
})

// N.B. Multiple upgrades may require a released version < 22.2.x, which wasn't built for ARM64.
mixedHeadroomMultiUpgradesSpec := r.MakeClusterSpec(5, spec.CPU(16), spec.RandomlyUseZfs(), spec.Arch(vm.ArchAMD64))

r.Add(registry.TestSpec{
// run the same mixed-headroom test, but going back two versions
Name: "tpcc/mixed-headroom/multiple-upgrades/" + mixedHeadroomMultiUpgradesSpec.String(),
Timeout: 5 * time.Hour,
Owner: registry.OwnerTestEng,
CompatibleClouds: registry.AllExceptAWS,
Suites: registry.Suites(registry.Nightly),
Tags: registry.Tags(`default`),
Cluster: mixedHeadroomMultiUpgradesSpec,
EncryptionSupport: registry.EncryptionMetamorphic,
Run: func(ctx context.Context, t test.Test, c cluster.Cluster) {
runTPCCMixedHeadroom(ctx, t, c, 2)
},
})
r.Add(registry.TestSpec{
Name: "tpcc-nowait/nodes=3/w=1",
Owner: registry.OwnerTestEng,
49 changes: 0 additions & 49 deletions pkg/cmd/roachtest/tests/versionupgrade.go
@@ -433,55 +433,6 @@ done
}).run(ctx, t)
}

// importTPCCStep runs a TPCC import on the first crdbNode (monitoring them all for
// crashes during the import). If oldV is not the current version, this runs the import
// using that version's binary (for example "19.2.1", as provided by LatestPredecessor())
// from the location used by c.Stage(); otherwise it uses the main cockroach binary.
func importTPCCStep(
oldV *clusterupgrade.Version, headroomWarehouses int, crdbNodes option.NodeListOption,
) versionStep {
return func(ctx context.Context, t test.Test, u *versionUpgradeTest) {
// We need to use the predecessor binary to load into the
// predecessor cluster to avoid random breakage. For example, you
// can't use 21.1 to import into 20.2 due to some flag changes.
//
// TODO(tbg): also import a large dataset (for example 2TB bank)
// that will provide cold data that may need to be migrated.
var cmd string
if oldV.IsCurrent() {
cmd = tpccImportCmd(headroomWarehouses)
} else {
cmd = tpccImportCmdWithCockroachBinary(clusterupgrade.BinaryPathForVersion(t, oldV), headroomWarehouses, "--checks=false")
}
// Use a monitor so that we fail cleanly if the cluster crashes
// during import.
m := u.c.NewMonitor(ctx, crdbNodes)
m.Go(func(ctx context.Context) error {
return u.c.RunE(ctx, u.c.Node(crdbNodes[0]), cmd)
})
m.Wait()
}
}

func importLargeBankStep(
oldV *clusterupgrade.Version, rows int, crdbNodes option.NodeListOption,
) versionStep {
return func(ctx context.Context, t test.Test, u *versionUpgradeTest) {
// Use the predecessor binary to load into the predecessor
// cluster to avoid random breakage due to flag changes, etc.
binary := clusterupgrade.BinaryPathForVersion(t, oldV)

// Use a monitor so that we fail cleanly if the cluster crashes
// during import.
m := u.c.NewMonitor(ctx, crdbNodes)
m.Go(func(ctx context.Context) error {
return u.c.RunE(ctx, u.c.Node(crdbNodes[0]), binary, "workload", "fixtures", "import", "bank",
"--payload-bytes=10240", "--rows="+fmt.Sprint(rows), "--seed=4", "--db=bigbank")
})
m.Wait()
}
}

func sleepStep(d time.Duration) versionStep {
return func(ctx context.Context, t test.Test, u *versionUpgradeTest) {
time.Sleep(d)
