Merge pull request #114249 from cockroachdb/blathers/backport-release-23.2-113706

release-23.2: roachtest: port tpcc/mixed-headroom to the new framework
DarrylWong authored Nov 13, 2023
2 parents 7ed0b2d + 7ea455e commit 1b12b6c
Showing 2 changed files with 67 additions and 222 deletions.
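For orientation, the sketch below shows the general shape of a test written against the new mixedversion framework that this commit ports tpcc/mixed-headroom to. It is a minimal illustration, not part of the diff, and assumes only the API surface visible below (NewTest, OnStartup, InMixedVersion, AfterUpgradeFinalized, Run, and the shared hook signature); the function name runExampleMixedVersion and the hook names are hypothetical.

package tests

import (
	"context"
	"math/rand"

	"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/cluster"
	"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/roachtestutil/mixedversion"
	"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/test"
	"github.com/cockroachdb/cockroach/pkg/roachprod/logger"
)

// runExampleMixedVersion is a hypothetical test: it registers one hook of each
// kind and lets the framework choose a random predecessor version and drive
// the upgrade(s) to the current version.
func runExampleMixedVersion(ctx context.Context, t test.Test, c cluster.Cluster) {
	crdbNodes := c.Range(1, c.Spec().NodeCount-1)
	mvt := mixedversion.NewTest(ctx, t, t.L(), c, crdbNodes)

	// Every hook shares this signature; the framework decides when to run it
	// and passes a helper for things like picking a random node.
	hook := func(
		ctx context.Context, l *logger.Logger, rng *rand.Rand, h *mixedversion.Helper,
	) error {
		l.Printf("running example hook on node %d", h.RandomNode(rng, crdbNodes))
		return nil
	}

	mvt.OnStartup("example startup hook", hook)            // e.g. import datasets
	mvt.InMixedVersion("example mixed-version hook", hook) // e.g. run a workload
	mvt.AfterUpgradeFinalized("example check", hook)       // e.g. validate results
	mvt.Run()
}

Compared to the hand-rolled versionStep/backgroundStepper machinery removed in this diff, the hooks leave the upgrade plan (which predecessor to start from and how many upgrades to run) to the framework.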
240 changes: 67 additions & 173 deletions pkg/cmd/roachtest/tests/tpcc.go
@@ -24,16 +24,15 @@ import (
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/cluster"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/option"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/registry"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/roachtestutil"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/roachtestutil/clusterupgrade"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/roachtestutil/mixedversion"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/spec"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/test"
"github.com/cockroachdb/cockroach/pkg/roachprod/install"
"github.com/cockroachdb/cockroach/pkg/roachprod/logger"
"github.com/cockroachdb/cockroach/pkg/roachprod/prometheus"
"github.com/cockroachdb/cockroach/pkg/roachprod/vm"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode"
"github.com/cockroachdb/cockroach/pkg/testutils/release"
"github.com/cockroachdb/cockroach/pkg/util/randutil"
"github.com/cockroachdb/cockroach/pkg/util/search"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/version"
@@ -351,63 +350,12 @@ func maxSupportedTPCCWarehouses(
return warehouses
}

type backgroundFn func(ctx context.Context, u *versionUpgradeTest) error

// A backgroundStepper is a tool to run long-lived commands while a cluster is
// going through a sequence of version upgrade operations.
// It exposes a `launch` step that launches the method carrying out long-running
// work (in the background) and a `stop` step collecting any errors.
type backgroundStepper struct {
// This is the operation that will be launched in the background. When the
// context gets canceled, it should shut down and return without an error.
// The way to typically get this is:
//
// err := doSomething(ctx)
// if ctx.Err() != nil {
//   return nil
// }
// return err
run backgroundFn
// When not nil, called with the error within `.stop()`. The interceptor
// gets a chance to ignore the error or produce a different one (via t.Fatal).
onStop func(context.Context, test.Test, *versionUpgradeTest, error)
nodes option.NodeListOption // nodes to monitor, defaults to c.All()

// Internal.
m cluster.Monitor
}

// launch spawns the function the background step was initialized with.
func (s *backgroundStepper) launch(ctx context.Context, t test.Test, u *versionUpgradeTest) {
nodes := s.nodes
if nodes == nil {
nodes = u.c.All()
}
s.m = u.c.NewMonitor(ctx, nodes)
s.m.Go(func(ctx context.Context) error {
return s.run(ctx, u)
})
}

func (s *backgroundStepper) wait(ctx context.Context, t test.Test, u *versionUpgradeTest) {
// We don't care about the workload failing since we only use it to produce a
// few `RESTORE` jobs. And indeed, the workload will fail because it does not
// tolerate pausing of its jobs.
err := s.m.WaitE()
if s.onStop != nil {
s.onStop(ctx, t, u, err)
} else if err != nil {
t.Fatal(err)
}
}

// runTPCCMixedHeadroom runs a mixed-version test that imports a large
// `bank` dataset, and runs one or multiple database upgrades while a
// TPCC workload is running. The number of database upgrades is
// controlled by the `versionsToUpgrade` parameter.
func runTPCCMixedHeadroom(
ctx context.Context, t test.Test, c cluster.Cluster, versionsToUpgrade int,
) {
// `bank` dataset, and runs multiple database upgrades while a TPCC
// workload is running. The number of database upgrades is randomized
// by the mixed-version framework, which chooses a random predecessor version
// and upgrades until it reaches the current version.
func runTPCCMixedHeadroom(ctx context.Context, t test.Test, c cluster.Cluster) {
crdbNodes := c.Range(1, c.Spec().NodeCount-1)
workloadNode := c.Node(c.Spec().NodeCount)

@@ -417,26 +365,6 @@ func runTPCCMixedHeadroom(
headroomWarehouses = 10
}

// We'll need this below.
tpccBackgroundStepper := func(duration time.Duration) backgroundStepper {
return backgroundStepper{
nodes: crdbNodes,
run: func(ctx context.Context, u *versionUpgradeTest) error {
t.L().Printf("running background TPCC workload for %s", duration)
runTPCC(ctx, t, c, tpccOptions{
Warehouses: headroomWarehouses,
Duration: duration,
SetupType: usingExistingData,
Start: func(ctx context.Context, t test.Test, c cluster.Cluster) {
// Noop - we don't let tpcc upload or start binaries in this test.
},
})
return nil
}}
}

randomCRDBNode := func() int { return crdbNodes.RandNode()[0] }

// NB: this results in ~100GB of (actual) disk usage per node once things
// have settled down, and ~7.5k ranges. The import takes ~40 minutes.
// The full 6.5m import ran into out of disk errors (on 250gb machines),
@@ -446,91 +374,74 @@ func runTPCCMixedHeadroom(
bankRows = 1000
}

rng, seed := randutil.NewLockedPseudoRand()
t.L().Printf("using random seed %d", seed)
history, err := release.RandomPredecessorHistory(rng, t.BuildVersion(), versionsToUpgrade)
if err != nil {
t.Fatal(err)
}
sep := " -> "
t.L().Printf("testing upgrade: %s%scurrent", strings.Join(history, sep), sep)
releases := make([]*clusterupgrade.Version, 0, len(history))
for _, v := range history {
releases = append(releases, clusterupgrade.MustParseVersion(v))
}
releases = append(releases, clusterupgrade.CurrentVersion())
mvt := mixedversion.NewTest(ctx, t, t.L(), c, crdbNodes)

waitForWorkloadToRampUp := sleepStep(rampDuration(c.IsLocal()))
logStep := func(format string, args ...interface{}) versionStep {
return func(ctx context.Context, t test.Test, u *versionUpgradeTest) {
t.L().Printf(format, args...)
}
importTPCC := func(ctx context.Context, l *logger.Logger, rng *rand.Rand, h *mixedversion.Helper) error {
randomNode := c.Node(h.RandomNode(rng, crdbNodes))
cmd := tpccImportCmdWithCockroachBinary(test.DefaultCockroachPath, headroomWarehouses, fmt.Sprintf("{pgurl%s}", randomNode))
return c.RunE(ctx, randomNode, cmd)
}

oldestVersion := releases[0]
setupSteps := []versionStep{
logStep("starting from fixture at version %s", oldestVersion),
uploadAndStartFromCheckpointFixture(crdbNodes, oldestVersion),
waitForUpgradeStep(crdbNodes), // let oldest version settle (gossip etc)
uploadVersionStep(workloadNode, clusterupgrade.CurrentVersion()), // for tpccBackgroundStepper's workload

// Load TPCC dataset, don't run TPCC yet. We do this while in the
// version we are starting with to load some data and hopefully
// create some state that will need work by long-running
// migrations.
importTPCCStep(oldestVersion, headroomWarehouses, crdbNodes),
// Add a lot of cold data to this cluster. This further stresses the version
// upgrade machinery, in which a) all ranges are touched and b) work proportional
// to the amount of data may be carried out.
importLargeBankStep(oldestVersion, bankRows, crdbNodes),
// Add a lot of cold data to this cluster. This further stresses the version
// upgrade machinery, in which a) all ranges are touched and b) work proportional
// to the amount of data may be carried out.
importLargeBank := func(ctx context.Context, l *logger.Logger, rng *rand.Rand, h *mixedversion.Helper) error {
randomNode := c.Node(h.RandomNode(rng, crdbNodes))
cmd := roachtestutil.NewCommand(fmt.Sprintf("%s workload fixtures import bank", test.DefaultCockroachPath)).
Arg("{pgurl%s}", randomNode).
Flag("payload-bytes", 10240).
Flag("rows", bankRows).
Flag("seed", 4).
Flag("db", "bigbank").
String()
return c.RunE(ctx, randomNode, cmd)
}

// upgradeToVersionSteps returns the list of steps to be performed
// when upgrading to the given version.
upgradeToVersionSteps := func(crdbVersion *clusterupgrade.Version) []versionStep {
duration := 10 * time.Minute
if crdbVersion.IsCurrent() {
duration = 100 * time.Minute
}
tpccWorkload := tpccBackgroundStepper(duration)

return []versionStep{
logStep("upgrading to version %q", crdbVersion.String()),
preventAutoUpgradeStep(randomCRDBNode()),
// Upload and restart cluster into the new
// binary (stays at previous cluster version).
binaryUpgradeStep(crdbNodes, crdbVersion),
// Now start running TPCC in the background.
tpccWorkload.launch,
// Wait for the workload to ramp up before attempting to
// upgrade the cluster version. If we start the migrations
// immediately after launching the tpcc workload above, they
// could finish "too quickly", before the workload had a
// chance to pick up the pace (starting all the workers, range
// merge/splits, compactions, etc). By waiting here, we
// increase the concurrency exposed to the upgrade migrations,
// and increase the chances of exposing bugs (such as #83079).
waitForWorkloadToRampUp,
// While tpcc is running in the background, bump the cluster
// version manually. We do this instead of allowing automatic upgrades
// so that we get a better idea of what errors come back here, if any.
// This will block until the long-running migrations have run.
allowAutoUpgradeStep(randomCRDBNode()),
waitForUpgradeStep(crdbNodes),
// Wait until TPCC background run terminates
// and fail if it reports an error.
tpccWorkload.wait,
// We don't run this in the background using the Workload() wrapper. We want
// it to block and wait for the workload to ramp up before attempting to upgrade
// the cluster version. If we start the migrations immediately after launching
// the tpcc workload, they could finish "too quickly", before the workload had
// a chance to pick up the pace (starting all the workers, range merge/splits,
// compactions, etc). By waiting here, we increase the concurrency exposed to
// the upgrade migrations, and increase the chances of exposing bugs (such as #83079).
runTPCCWorkload := func(ctx context.Context, l *logger.Logger, rng *rand.Rand, h *mixedversion.Helper) error {
workloadDur := 10 * time.Minute
rampDur := rampDuration(c.IsLocal())
// If migrations are running, we want to ramp up the workload faster in order
// to expose them to more concurrent load. With the same goal in mind, we also
// let the TPCC workload run longer.
if h.Context().Finalizing && !c.IsLocal() {
rampDur = 1 * time.Minute
if h.Context().ToVersion.IsCurrent() {
workloadDur = 100 * time.Minute
}
}
cmd := roachtestutil.NewCommand("./cockroach workload run tpcc").
Arg("{pgurl%s}", crdbNodes).
Flag("duration", workloadDur).
Flag("warehouses", headroomWarehouses).
Flag("histograms", t.PerfArtifactsDir()+"/stats.json").
Flag("ramp", rampDur).
Flag("prometheus-port", 2112).
Flag("pprofport", workloadPProfStartPort).
String()
return c.RunE(ctx, workloadNode, cmd)
}

// Test steps consist of the setup steps + the upgrade steps for
// each upgrade being carried out here.
testSteps := append([]versionStep{}, setupSteps...)
for _, nextVersion := range releases[1:] {
testSteps = append(testSteps, upgradeToVersionSteps(nextVersion)...)
checkTPCCWorkload := func(ctx context.Context, l *logger.Logger, rng *rand.Rand, h *mixedversion.Helper) error {
cmd := roachtestutil.NewCommand(fmt.Sprintf("%s workload check tpcc", test.DefaultCockroachPath)).
Arg("{pgurl:1}").
Flag("warehouses", headroomWarehouses).
String()
return c.RunE(ctx, workloadNode, cmd)
}

newVersionUpgradeTest(c, testSteps...).run(ctx, t)
uploadVersion(ctx, t, c, workloadNode, clusterupgrade.CurrentVersion())
mvt.OnStartup("load TPCC dataset", importTPCC)
mvt.OnStartup("load bank dataset", importLargeBank)
mvt.InMixedVersion("TPCC workload", runTPCCWorkload)
mvt.AfterUpgradeFinalized("check TPCC workload", checkTPCCWorkload)
mvt.Run()
}

func registerTPCC(r registry.Registry) {
@@ -566,7 +477,7 @@ func registerTPCC(r registry.Registry) {
// migrations while TPCC runs. It simulates a real production
// deployment in the middle of the migration into a new cluster version.
Name: "tpcc/mixed-headroom/" + mixedHeadroomSpec.String(),
Timeout: 5 * time.Hour,
Timeout: 6 * time.Hour,
Owner: registry.OwnerTestEng,
// TODO(tbg): add release_qualification tag once we know the test isn't
// buggy.
@@ -576,27 +487,10 @@
Cluster: mixedHeadroomSpec,
EncryptionSupport: registry.EncryptionMetamorphic,
Run: func(ctx context.Context, t test.Test, c cluster.Cluster) {
runTPCCMixedHeadroom(ctx, t, c, 1)
runTPCCMixedHeadroom(ctx, t, c)
},
})

// N.B. Multiple upgrades may require a released version < 22.2.x, which wasn't built for ARM64.
mixedHeadroomMultiUpgradesSpec := r.MakeClusterSpec(5, spec.CPU(16), spec.RandomlyUseZfs(), spec.Arch(vm.ArchAMD64))

r.Add(registry.TestSpec{
// run the same mixed-headroom test, but going back two versions
Name: "tpcc/mixed-headroom/multiple-upgrades/" + mixedHeadroomMultiUpgradesSpec.String(),
Timeout: 5 * time.Hour,
Owner: registry.OwnerTestEng,
CompatibleClouds: registry.AllExceptAWS,
Suites: registry.Suites(registry.Nightly),
Tags: registry.Tags(`default`),
Cluster: mixedHeadroomMultiUpgradesSpec,
EncryptionSupport: registry.EncryptionMetamorphic,
Run: func(ctx context.Context, t test.Test, c cluster.Cluster) {
runTPCCMixedHeadroom(ctx, t, c, 2)
},
})
r.Add(registry.TestSpec{
Name: "tpcc-nowait/nodes=3/w=1",
Owner: registry.OwnerTestEng,
49 changes: 0 additions & 49 deletions pkg/cmd/roachtest/tests/versionupgrade.go
@@ -417,55 +417,6 @@ done
}).run(ctx, t)
}

// importTPCCStep runs a TPCC import on the first crdbNode (monitoring them all for
// crashes during the import). If oldV is not the current version, the import runs
// with that version's binary (for example "19.2.1", as provided by LatestPredecessor())
// from the location used by c.Stage(); otherwise it uses the main cockroach binary.
func importTPCCStep(
oldV *clusterupgrade.Version, headroomWarehouses int, crdbNodes option.NodeListOption,
) versionStep {
return func(ctx context.Context, t test.Test, u *versionUpgradeTest) {
// We need to use the predecessor binary to load into the
// predecessor cluster to avoid random breakage. For example, you
// can't use 21.1 to import into 20.2 due to some flag changes.
//
// TODO(tbg): also import a large dataset (for example 2TB bank)
// that will provide cold data that may need to be migrated.
var cmd string
if oldV.IsCurrent() {
cmd = tpccImportCmd(headroomWarehouses)
} else {
cmd = tpccImportCmdWithCockroachBinary(clusterupgrade.BinaryPathForVersion(t, oldV), headroomWarehouses, "--checks=false")
}
// Use a monitor so that we fail cleanly if the cluster crashes
// during import.
m := u.c.NewMonitor(ctx, crdbNodes)
m.Go(func(ctx context.Context) error {
return u.c.RunE(ctx, u.c.Node(crdbNodes[0]), cmd)
})
m.Wait()
}
}

func importLargeBankStep(
oldV *clusterupgrade.Version, rows int, crdbNodes option.NodeListOption,
) versionStep {
return func(ctx context.Context, t test.Test, u *versionUpgradeTest) {
// Use the predecessor binary to load into the predecessor
// cluster to avoid random breakage due to flag changes, etc.
binary := clusterupgrade.BinaryPathForVersion(t, oldV)

// Use a monitor so that we fail cleanly if the cluster crashes
// during import.
m := u.c.NewMonitor(ctx, crdbNodes)
m.Go(func(ctx context.Context) error {
return u.c.RunE(ctx, u.c.Node(crdbNodes[0]), binary, "workload", "fixtures", "import", "bank",
"--payload-bytes=10240", "--rows="+fmt.Sprint(rows), "--seed=4", "--db=bigbank")
})
m.Wait()
}
}

func sleepStep(d time.Duration) versionStep {
return func(ctx context.Context, t test.Test, u *versionUpgradeTest) {
time.Sleep(d)
