diff --git a/pkg/cmd/roachtest/tests/tpcc.go b/pkg/cmd/roachtest/tests/tpcc.go
index 963135c227ad..57d37e589c81 100644
--- a/pkg/cmd/roachtest/tests/tpcc.go
+++ b/pkg/cmd/roachtest/tests/tpcc.go
@@ -24,16 +24,15 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/cluster"
 	"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/option"
 	"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/registry"
+	"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/roachtestutil"
 	"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/roachtestutil/clusterupgrade"
+	"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/roachtestutil/mixedversion"
 	"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/spec"
 	"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/test"
 	"github.com/cockroachdb/cockroach/pkg/roachprod/install"
 	"github.com/cockroachdb/cockroach/pkg/roachprod/logger"
 	"github.com/cockroachdb/cockroach/pkg/roachprod/prometheus"
-	"github.com/cockroachdb/cockroach/pkg/roachprod/vm"
 	"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode"
-	"github.com/cockroachdb/cockroach/pkg/testutils/release"
-	"github.com/cockroachdb/cockroach/pkg/util/randutil"
 	"github.com/cockroachdb/cockroach/pkg/util/search"
 	"github.com/cockroachdb/cockroach/pkg/util/timeutil"
 	"github.com/cockroachdb/cockroach/pkg/util/version"
@@ -351,63 +350,12 @@ func maxSupportedTPCCWarehouses(
 	return warehouses
 }
 
-type backgroundFn func(ctx context.Context, u *versionUpgradeTest) error
-
-// A backgroundStepper is a tool to run long-lived commands while a cluster is
-// going through a sequence of version upgrade operations.
-// It exposes a `launch` step that launches the method carrying out long-running
-// work (in the background) and a `stop` step collecting any errors.
-type backgroundStepper struct {
-	// This is the operation that will be launched in the background. When the
-	// context gets canceled, it should shut down and return without an error.
-	// The way to typically get this is:
-	//
-	//  err := doSomething(ctx)
-	//  ctx.Err() != nil {
-	//    return nil
-	//  }
-	//  return err
-	run backgroundFn
-	// When not nil, called with the error within `.stop()`. The interceptor
-	// gets a chance to ignore the error or produce a different one (via t.Fatal).
-	onStop func(context.Context, test.Test, *versionUpgradeTest, error)
-	nodes  option.NodeListOption // nodes to monitor, defaults to c.All()
-
-	// Internal.
-	m cluster.Monitor
-}
-
-// launch spawns the function the background step was initialized with.
-func (s *backgroundStepper) launch(ctx context.Context, t test.Test, u *versionUpgradeTest) {
-	nodes := s.nodes
-	if nodes == nil {
-		nodes = u.c.All()
-	}
-	s.m = u.c.NewMonitor(ctx, nodes)
-	s.m.Go(func(ctx context.Context) error {
-		return s.run(ctx, u)
-	})
-}
-
-func (s *backgroundStepper) wait(ctx context.Context, t test.Test, u *versionUpgradeTest) {
-	// We don't care about the workload failing since we only use it to produce a
-	// few `RESTORE` jobs. And indeed workload will fail because it does not
-	// tolerate pausing of its jobs.
-	err := s.m.WaitE()
-	if s.onStop != nil {
-		s.onStop(ctx, t, u, err)
-	} else if err != nil {
-		t.Fatal(err)
-	}
-}
-
 // runTPCCMixedHeadroom runs a mixed-version test that imports a large
-// `bank` dataset, and runs one or multiple database upgrades while a
-// TPCC workload is running. The number of database upgrades is
-// controlled by the `versionsToUpgrade` parameter.
-func runTPCCMixedHeadroom(
-	ctx context.Context, t test.Test, c cluster.Cluster, versionsToUpgrade int,
-) {
+// `bank` dataset, and runs multiple database upgrades while a TPCC
+// workload is running. The number of database upgrades is randomized
+// by the mixed-version framework, which chooses a random predecessor version
+// and upgrades until it reaches the current version.
+func runTPCCMixedHeadroom(ctx context.Context, t test.Test, c cluster.Cluster) {
 	crdbNodes := c.Range(1, c.Spec().NodeCount-1)
 	workloadNode := c.Node(c.Spec().NodeCount)
 
@@ -417,26 +365,6 @@ func runTPCCMixedHeadroom(
 		headroomWarehouses = 10
 	}
 
-	// We'll need this below.
-	tpccBackgroundStepper := func(duration time.Duration) backgroundStepper {
-		return backgroundStepper{
-			nodes: crdbNodes,
-			run: func(ctx context.Context, u *versionUpgradeTest) error {
-				t.L().Printf("running background TPCC workload for %s", duration)
-				runTPCC(ctx, t, c, tpccOptions{
-					Warehouses: headroomWarehouses,
-					Duration:   duration,
-					SetupType:  usingExistingData,
-					Start: func(ctx context.Context, t test.Test, c cluster.Cluster) {
-						// Noop - we don't let tpcc upload or start binaries in this test.
-					},
-				})
-				return nil
-			}}
-	}
-
-	randomCRDBNode := func() int { return crdbNodes.RandNode()[0] }
-
 	// NB: this results in ~100GB of (actual) disk usage per node once things
 	// have settled down, and ~7.5k ranges. The import takes ~40 minutes.
 	// The full 6.5m import ran into out of disk errors (on 250gb machines),
@@ -446,91 +374,74 @@ func runTPCCMixedHeadroom(
 		bankRows = 1000
 	}
 
-	rng, seed := randutil.NewLockedPseudoRand()
-	t.L().Printf("using random seed %d", seed)
-	history, err := release.RandomPredecessorHistory(rng, t.BuildVersion(), versionsToUpgrade)
-	if err != nil {
-		t.Fatal(err)
-	}
-	sep := " -> "
-	t.L().Printf("testing upgrade: %s%scurrent", strings.Join(history, sep), sep)
-	releases := make([]*clusterupgrade.Version, 0, len(history))
-	for _, v := range history {
-		releases = append(releases, clusterupgrade.MustParseVersion(v))
-	}
-	releases = append(releases, clusterupgrade.CurrentVersion())
+	mvt := mixedversion.NewTest(ctx, t, t.L(), c, crdbNodes)
 
-	waitForWorkloadToRampUp := sleepStep(rampDuration(c.IsLocal()))
-	logStep := func(format string, args ...interface{}) versionStep {
-		return func(ctx context.Context, t test.Test, u *versionUpgradeTest) {
-			t.L().Printf(format, args...)
-		}
+	importTPCC := func(ctx context.Context, l *logger.Logger, rng *rand.Rand, h *mixedversion.Helper) error {
+		randomNode := c.Node(h.RandomNode(rng, crdbNodes))
+		cmd := tpccImportCmdWithCockroachBinary(test.DefaultCockroachPath, headroomWarehouses, fmt.Sprintf("{pgurl%s}", randomNode))
+		return c.RunE(ctx, randomNode, cmd)
 	}
 
-	oldestVersion := releases[0]
-	setupSteps := []versionStep{
-		logStep("starting from fixture at version %s", oldestVersion),
-		uploadAndStartFromCheckpointFixture(crdbNodes, oldestVersion),
-		waitForUpgradeStep(crdbNodes),                                    // let oldest version settle (gossip etc)
-		uploadVersionStep(workloadNode, clusterupgrade.CurrentVersion()), // for tpccBackgroundStepper's workload
-
-		// Load TPCC dataset, don't run TPCC yet. We do this while in the
-		// version we are starting with to load some data and hopefully
-		// create some state that will need work by long-running
-		// migrations.
-		importTPCCStep(oldestVersion, headroomWarehouses, crdbNodes),
-		// Add a lot of cold data to this cluster. This further stresses the version
-		// upgrade machinery, in which a) all ranges are touched and b) work proportional
-		// to the amount data may be carried out.
-		importLargeBankStep(oldestVersion, bankRows, crdbNodes),
+	// Add a lot of cold data to this cluster. This further stresses the version
+	// upgrade machinery, in which a) all ranges are touched and b) work proportional
+	// to the amount of data may be carried out.
+	importLargeBank := func(ctx context.Context, l *logger.Logger, rng *rand.Rand, h *mixedversion.Helper) error {
+		randomNode := c.Node(h.RandomNode(rng, crdbNodes))
+		cmd := roachtestutil.NewCommand(fmt.Sprintf("%s workload fixtures import bank", test.DefaultCockroachPath)).
+			Arg("{pgurl%s}", randomNode).
+			Flag("payload-bytes", 10240).
+			Flag("rows", bankRows).
+			Flag("seed", 4).
+			Flag("db", "bigbank").
+			String()
+		return c.RunE(ctx, randomNode, cmd)
 	}
 
-	// upgradeToVersionSteps returns the list of steps to be performed
-	// when upgrading to the given version.
-	upgradeToVersionSteps := func(crdbVersion *clusterupgrade.Version) []versionStep {
-		duration := 10 * time.Minute
-		if crdbVersion.IsCurrent() {
-			duration = 100 * time.Minute
-		}
-		tpccWorkload := tpccBackgroundStepper(duration)
-
-		return []versionStep{
-			logStep("upgrading to version %q", crdbVersion.String()),
-			preventAutoUpgradeStep(randomCRDBNode()),
-			// Upload and restart cluster into the new
-			// binary (stays at previous cluster version).
-			binaryUpgradeStep(crdbNodes, crdbVersion),
-			// Now start running TPCC in the background.
-			tpccWorkload.launch,
-			// Wait for the workload to ramp up before attemping to
-			// upgrade the cluster version. If we start the migrations
-			// immediately after launching the tpcc workload above, they
-			// could finish "too quickly", before the workload had a
-			// chance to pick up the pace (starting all the workers, range
-			// merge/splits, compactions, etc). By waiting here, we
-			// increase the concurrency exposed to the upgrade migrations,
-			// and increase the chances of exposing bugs (such as #83079).
-			waitForWorkloadToRampUp,
-			// While tpcc is running in the background, bump the cluster
-			// version manually. We do this over allowing automatic upgrades
-			// to get a better idea of what errors come back here, if any.
-			// This will block until the long-running migrations have run.
-			allowAutoUpgradeStep(randomCRDBNode()),
-			waitForUpgradeStep(crdbNodes),
-			// Wait until TPCC background run terminates
-			// and fail if it reports an error.
-			tpccWorkload.wait,
+	// We don't run this in the background using the Workload() wrapper. We want
+	// it to block and wait for the workload to ramp up before attempting to upgrade
+	// the cluster version. If we start the migrations immediately after launching
+	// the tpcc workload, they could finish "too quickly", before the workload had
+	// a chance to pick up the pace (starting all the workers, range merge/splits,
+	// compactions, etc). By waiting here, we increase the concurrency exposed to
+	// the upgrade migrations, and increase the chances of exposing bugs (such as #83079).
+	runTPCCWorkload := func(ctx context.Context, l *logger.Logger, rng *rand.Rand, h *mixedversion.Helper) error {
+		workloadDur := 10 * time.Minute
+		rampDur := rampDuration(c.IsLocal())
+		// If migrations are running, we want to ramp up the workload faster in order
+		// to expose them to more concurrent load. With the same goal, we also let the
+		// TPCC workload run longer.
+		if h.Context().Finalizing && !c.IsLocal() {
+			rampDur = 1 * time.Minute
+			if h.Context().ToVersion.IsCurrent() {
+				workloadDur = 100 * time.Minute
+			}
 		}
+		cmd := roachtestutil.NewCommand("./cockroach workload run tpcc").
+			Arg("{pgurl%s}", crdbNodes).
+			Flag("duration", workloadDur).
+			Flag("warehouses", headroomWarehouses).
+			Flag("histograms", t.PerfArtifactsDir()+"/stats.json").
+			Flag("ramp", rampDur).
+			Flag("prometheus-port", 2112).
+			Flag("pprofport", workloadPProfStartPort).
+			String()
+		return c.RunE(ctx, workloadNode, cmd)
 	}
 
-	// Test steps consist of the setup steps + the upgrade steps for
-	// each upgrade being carried out here.
-	testSteps := append([]versionStep{}, setupSteps...)
-	for _, nextVersion := range releases[1:] {
-		testSteps = append(testSteps, upgradeToVersionSteps(nextVersion)...)
+	checkTPCCWorkload := func(ctx context.Context, l *logger.Logger, rng *rand.Rand, h *mixedversion.Helper) error {
+		cmd := roachtestutil.NewCommand(fmt.Sprintf("%s workload check tpcc", test.DefaultCockroachPath)).
+			Arg("{pgurl:1}").
+			Flag("warehouses", headroomWarehouses).
+			String()
+		return c.RunE(ctx, workloadNode, cmd)
 	}
 
-	newVersionUpgradeTest(c, testSteps...).run(ctx, t)
+	uploadVersion(ctx, t, c, workloadNode, clusterupgrade.CurrentVersion())
+	mvt.OnStartup("load TPCC dataset", importTPCC)
+	mvt.OnStartup("load bank dataset", importLargeBank)
+	mvt.InMixedVersion("TPCC workload", runTPCCWorkload)
+	mvt.AfterUpgradeFinalized("check TPCC workload", checkTPCCWorkload)
+	mvt.Run()
 }
 
 func registerTPCC(r registry.Registry) {
@@ -566,7 +477,7 @@ func registerTPCC(r registry.Registry) {
 		// migrations while TPCC runs. It simulates a real production
 		// deployment in the middle of the migration into a new cluster version.
 		Name:    "tpcc/mixed-headroom/" + mixedHeadroomSpec.String(),
-		Timeout: 5 * time.Hour,
+		Timeout: 6 * time.Hour,
 		Owner:   registry.OwnerTestEng,
 		// TODO(tbg): add release_qualification tag once we know the test isn't
 		// buggy.
@@ -576,27 +487,10 @@ func registerTPCC(r registry.Registry) {
 		Cluster:           mixedHeadroomSpec,
 		EncryptionSupport: registry.EncryptionMetamorphic,
 		Run: func(ctx context.Context, t test.Test, c cluster.Cluster) {
-			runTPCCMixedHeadroom(ctx, t, c, 1)
+			runTPCCMixedHeadroom(ctx, t, c)
 		},
 	})
-	// N.B. Multiple upgrades may require a released version < 22.2.x, which wasn't built for ARM64.
-	mixedHeadroomMultiUpgradesSpec := r.MakeClusterSpec(5, spec.CPU(16), spec.RandomlyUseZfs(), spec.Arch(vm.ArchAMD64))
-
-	r.Add(registry.TestSpec{
-		// run the same mixed-headroom test, but going back two versions
-		Name:              "tpcc/mixed-headroom/multiple-upgrades/" + mixedHeadroomMultiUpgradesSpec.String(),
-		Timeout:           5 * time.Hour,
-		Owner:             registry.OwnerTestEng,
-		CompatibleClouds:  registry.AllExceptAWS,
-		Suites:            registry.Suites(registry.Nightly),
-		Tags:              registry.Tags(`default`),
-		Cluster:           mixedHeadroomMultiUpgradesSpec,
-		EncryptionSupport: registry.EncryptionMetamorphic,
-		Run: func(ctx context.Context, t test.Test, c cluster.Cluster) {
-			runTPCCMixedHeadroom(ctx, t, c, 2)
-		},
-	})
 
 	r.Add(registry.TestSpec{
 		Name:  "tpcc-nowait/nodes=3/w=1",
 		Owner: registry.OwnerTestEng,
diff --git a/pkg/cmd/roachtest/tests/versionupgrade.go b/pkg/cmd/roachtest/tests/versionupgrade.go
index 9efb38238cdd..4ab7da61d676 100644
--- a/pkg/cmd/roachtest/tests/versionupgrade.go
+++ b/pkg/cmd/roachtest/tests/versionupgrade.go
@@ -417,55 +417,6 @@ done
 	}).run(ctx, t)
 }
 
-// importTPCCStep runs a TPCC import import on the first crdbNode (monitoring them all for
-// crashes during the import). If oldV is nil, this runs the import using the specified
-// version (for example "19.2.1", as provided by LatestPredecessor()) using the location
-// used by c.Stage(). An empty oldV uses the main cockroach binary.
-func importTPCCStep(
-	oldV *clusterupgrade.Version, headroomWarehouses int, crdbNodes option.NodeListOption,
-) versionStep {
-	return func(ctx context.Context, t test.Test, u *versionUpgradeTest) {
-		// We need to use the predecessor binary to load into the
-		// predecessor cluster to avoid random breakage. For example, you
-		// can't use 21.1 to import into 20.2 due to some flag changes.
-		//
-		// TODO(tbg): also import a large dataset (for example 2TB bank)
-		// that will provide cold data that may need to be migrated.
-		var cmd string
-		if oldV.IsCurrent() {
-			cmd = tpccImportCmd(headroomWarehouses)
-		} else {
-			cmd = tpccImportCmdWithCockroachBinary(clusterupgrade.BinaryPathForVersion(t, oldV), headroomWarehouses, "--checks=false")
-		}
-		// Use a monitor so that we fail cleanly if the cluster crashes
-		// during import.
-		m := u.c.NewMonitor(ctx, crdbNodes)
-		m.Go(func(ctx context.Context) error {
-			return u.c.RunE(ctx, u.c.Node(crdbNodes[0]), cmd)
-		})
-		m.Wait()
-	}
-}
-
-func importLargeBankStep(
-	oldV *clusterupgrade.Version, rows int, crdbNodes option.NodeListOption,
-) versionStep {
-	return func(ctx context.Context, t test.Test, u *versionUpgradeTest) {
-		// Use the predecessor binary to load into the predecessor
-		// cluster to avoid random breakage due to flag changes, etc.
-		binary := clusterupgrade.BinaryPathForVersion(t, oldV)
-
-		// Use a monitor so that we fail cleanly if the cluster crashes
-		// during import.
-		m := u.c.NewMonitor(ctx, crdbNodes)
-		m.Go(func(ctx context.Context) error {
-			return u.c.RunE(ctx, u.c.Node(crdbNodes[0]), binary, "workload", "fixtures", "import", "bank",
-				"--payload-bytes=10240", "--rows="+fmt.Sprint(rows), "--seed=4", "--db=bigbank")
-		})
-		m.Wait()
-	}
-}
-
 func sleepStep(d time.Duration) versionStep {
 	return func(ctx context.Context, t test.Test, u *versionUpgradeTest) {
 		time.Sleep(d)
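
For orientation, here is a minimal sketch of the `mixedversion` hook-registration pattern the diff above adopts. It only uses entry points visible in the diff (`NewTest`, `OnStartup`, `InMixedVersion`, `AfterUpgradeFinalized`, `Run`, `Helper.RandomNode`, `Helper.Context`); the function name and step body are placeholders for illustration, not part of the change, and it assumes it lives in the roachtest `tests` package.

```go
package tests

import (
	"context"
	"math/rand"

	"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/cluster"
	"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/option"
	"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/roachtestutil/mixedversion"
	"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/test"
	"github.com/cockroachdb/cockroach/pkg/roachprod/logger"
)

// runMixedVersionSketch is a hypothetical test body showing how hooks are
// registered with the mixedversion framework used in tpcc.go above.
func runMixedVersionSketch(
	ctx context.Context, t test.Test, c cluster.Cluster, crdbNodes option.NodeListOption,
) {
	mvt := mixedversion.NewTest(ctx, t, t.L(), c, crdbNodes)

	// Every hook has the same signature: it receives a step-scoped logger, a
	// deterministic RNG, and a Helper exposing the current upgrade context.
	step := func(ctx context.Context, l *logger.Logger, rng *rand.Rand, h *mixedversion.Helper) error {
		node := h.RandomNode(rng, crdbNodes)
		l.Printf("placeholder step on n%d (finalizing=%t)", node, h.Context().Finalizing)
		return nil
	}

	mvt.OnStartup("before any upgrade", step)                   // once, on the starting version
	mvt.InMixedVersion("while versions are mixed", step)        // while the cluster runs mixed versions
	mvt.AfterUpgradeFinalized("after each finalization", step)  // after each upgrade finalizes
	mvt.Run()                                                   // executes the randomized upgrade plan
}
```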