From 9492aecc106eedafb279abe546c1a99d6ed4c060 Mon Sep 17 00:00:00 2001 From: Nathan VanBenschoten Date: Wed, 30 Sep 2020 09:49:53 -0400 Subject: [PATCH] roachtest: use crdb's workload for fixtures import tpcc Fixes #55042. Fixes #55041. Fixes #55039. Fixes #55038. Fixes #55037. Fixes #55036. Fixes #55035. Fixes #55033. Fixes #55029. Fixes #55024. Fixes #55022. Fixes #55020. Fixes #55019. Fixes #55018. Fixes #55017. Fixes #55016. Fixes #55013. Fixes #55010. Fixes #55009. Fixes #55008. Fixes #55003. Fixes #55002. Fixes #54998. Fixes #54995. Fixes #54822. Fixes #52693. Fixes #54802. We were already doing this in some places, but needed it in others. --- pkg/cmd/roachtest/cdc.go | 17 ++++--------- pkg/cmd/roachtest/import.go | 7 +---- pkg/cmd/roachtest/overload_tpcc_olap.go | 2 +- pkg/cmd/roachtest/schemachange.go | 4 +-- pkg/cmd/roachtest/scrub.go | 2 +- pkg/cmd/roachtest/tpcc.go | 34 +++++++++++++------------ 6 files changed, 28 insertions(+), 38 deletions(-) diff --git a/pkg/cmd/roachtest/cdc.go b/pkg/cmd/roachtest/cdc.go index ea9bdd996449..697c99f4d94b 100644 --- a/pkg/cmd/roachtest/cdc.go +++ b/pkg/cmd/roachtest/cdc.go @@ -45,7 +45,6 @@ type cdcTestArgs struct { kafkaChaos bool crdbChaos bool cloudStorageSink bool - fixturesImport bool targetInitialScanLatency time.Duration targetSteadyLatency time.Duration @@ -101,7 +100,7 @@ func cdcBasicTest(ctx context.Context, t *test, c *cluster, args cdcTestArgs) { // value" errors #34025. tpcc.tolerateErrors = true - tpcc.install(ctx, c, args.fixturesImport) + tpcc.install(ctx, c) // TODO(dan,ajwerner): sleeping momentarily before running the workload // mitigates errors like "error in newOrder: missing stock row" from tpcc. time.Sleep(2 * time.Second) @@ -593,7 +592,6 @@ func registerCDC(r *testRegistry) { workloadDuration: "30m", initialScan: true, cloudStorageSink: true, - fixturesImport: true, targetInitialScanLatency: 30 * time.Minute, targetSteadyLatency: time.Minute, }) @@ -733,16 +731,11 @@ type tpccWorkload struct { tolerateErrors bool } -func (tw *tpccWorkload) install(ctx context.Context, c *cluster, fixturesImport bool) { - command := `./workload fixtures load` - if fixturesImport { - // For fixtures import, use the version built into the cockroach binary so - // the tpcc workload-versions match on release branches. - command = `./cockroach workload fixtures import` - } +func (tw *tpccWorkload) install(ctx context.Context, c *cluster) { + // For fixtures import, use the version built into the cockroach binary so + // the tpcc workload-versions match on release branches. c.Run(ctx, tw.workloadNodes, fmt.Sprintf( - `%s tpcc --warehouses=%d --checks=false {pgurl%s}`, - command, + `./cockroach workload fixtures import tpcc --warehouses=%d --checks=false {pgurl%s}`, tw.tpccWarehouseCount, tw.sqlNodes.randNode(), )) diff --git a/pkg/cmd/roachtest/import.go b/pkg/cmd/roachtest/import.go index 910329d4595b..6aca4c009422 100644 --- a/pkg/cmd/roachtest/import.go +++ b/pkg/cmd/roachtest/import.go @@ -17,7 +17,6 @@ import ( "time" "github.com/cockroachdb/cockroach/pkg/util/retry" - "github.com/cockroachdb/cockroach/pkg/util/version" "github.com/cockroachdb/errors" ) @@ -36,11 +35,7 @@ func registerImportTPCC(r *testRegistry) { hc := NewHealthChecker(c, c.All()) m.Go(hc.Runner) - workloadStr := `./workload fixtures import tpcc --warehouses=%d --csv-server='http://localhost:8081'` - if !t.buildVersion.AtLeast(version.MustParse("v20.2.0")) { - workloadStr += " --deprecated-fk-indexes" - } - + workloadStr := `./cockroach workload fixtures import tpcc --warehouses=%d --csv-server='http://localhost:8081'` m.Go(func(ctx context.Context) error { defer dul.Done() defer hc.Done() diff --git a/pkg/cmd/roachtest/overload_tpcc_olap.go b/pkg/cmd/roachtest/overload_tpcc_olap.go index 64f94acd2780..5dd1074351e1 100644 --- a/pkg/cmd/roachtest/overload_tpcc_olap.go +++ b/pkg/cmd/roachtest/overload_tpcc_olap.go @@ -45,7 +45,7 @@ type tpccOLAPSpec struct { func (s tpccOLAPSpec) run(ctx context.Context, t *test, c *cluster) { crdbNodes, workloadNode := setupTPCC( ctx, t, c, tpccOptions{ - Warehouses: s.Warehouses, SetupType: usingFixture, + Warehouses: s.Warehouses, SetupType: usingImport, }) const queryFileName = "queries.sql" // querybench expects the entire query to be on a single line. diff --git a/pkg/cmd/roachtest/schemachange.go b/pkg/cmd/roachtest/schemachange.go index a3729e17fe20..612173ebef00 100644 --- a/pkg/cmd/roachtest/schemachange.go +++ b/pkg/cmd/roachtest/schemachange.go @@ -312,7 +312,7 @@ func makeIndexAddTpccTest(spec clusterSpec, warehouses int, length time.Duration }) }, Duration: length, - SetupType: usingFixture, + SetupType: usingImport, }) }, MinVersion: "v19.1.0", @@ -457,7 +457,7 @@ func makeSchemaChangeDuringTPCC(spec clusterSpec, warehouses int, length time.Du }) }, Duration: length, - SetupType: usingFixture, + SetupType: usingImport, }) }, MinVersion: "v19.1.0", diff --git a/pkg/cmd/roachtest/scrub.go b/pkg/cmd/roachtest/scrub.go index c7349646ba9e..38e126ff14fb 100644 --- a/pkg/cmd/roachtest/scrub.go +++ b/pkg/cmd/roachtest/scrub.go @@ -82,7 +82,7 @@ func makeScrubTPCCTest( return nil }, Duration: length, - SetupType: usingFixture, + SetupType: usingImport, }) }, MinVersion: "v19.1.0", diff --git a/pkg/cmd/roachtest/tpcc.go b/pkg/cmd/roachtest/tpcc.go index 1e60aa1e89e9..56976bc26752 100644 --- a/pkg/cmd/roachtest/tpcc.go +++ b/pkg/cmd/roachtest/tpcc.go @@ -38,7 +38,7 @@ import ( type tpccSetupType int const ( - usingFixture tpccSetupType = iota + usingImport tpccSetupType = iota usingInit ) @@ -63,10 +63,12 @@ type tpccOptions struct { Versions []string } -// tpccFixturesCmd generates the command string to load tpcc data for the +// tpccImportCmd generates the command string to load tpcc data for the // specified warehouse count into a cluster. -func tpccFixturesCmd(t *test, warehouses int, extraArgs string) string { - return fmt.Sprintf("./workload fixtures import tpcc --warehouses=%d %s {pgurl:1}", +func tpccImportCmd(t *test, warehouses int, extraArgs string) string { + // Use `cockroach workload` instead of `workload` so the tpcc + // workload-versions match on release branches. + return fmt.Sprintf("./cockroach workload fixtures import tpcc --warehouses=%d %s {pgurl:1}", warehouses, extraArgs) } @@ -111,21 +113,21 @@ func setupTPCC( c.Put(ctx, cockroach, "./cockroach", workloadNode) c.Put(ctx, workload, "./workload", workloadNode) - extraArgs := opts.ExtraSetupArgs - if !t.buildVersion.AtLeast(version.MustParse("v20.2.0")) { - extraArgs += " --deprecated-fk-indexes" - } func() { db := c.Conn(ctx, 1) defer db.Close() c.Start(ctx, t, crdbNodes, startArgsDontEncrypt) waitForFullReplication(t, c.Conn(ctx, crdbNodes[0])) switch opts.SetupType { - case usingFixture: + case usingImport: t.Status("loading fixture") - c.Run(ctx, workloadNode, tpccFixturesCmd(t, opts.Warehouses, extraArgs)) + c.Run(ctx, workloadNode, tpccImportCmd(t, opts.Warehouses, opts.ExtraSetupArgs)) case usingInit: t.Status("initializing tables") + extraArgs := opts.ExtraSetupArgs + if !t.buildVersion.AtLeast(version.MustParse("v20.2.0")) { + extraArgs += " --deprecated-fk-indexes" + } cmd := fmt.Sprintf( "./workload init tpcc --warehouses=%d %s {pgurl:1}", opts.Warehouses, extraArgs, @@ -234,7 +236,7 @@ func registerTPCC(r *testRegistry) { runTPCC(ctx, t, c, tpccOptions{ Warehouses: headroomWarehouses, Duration: 120 * time.Minute, - SetupType: usingFixture, + SetupType: usingImport, }) }, }) @@ -268,7 +270,7 @@ func registerTPCC(r *testRegistry) { Warehouses: headroomWarehouses, Duration: 120 * time.Minute, Versions: []string{oldV, "", oldV, ""}, - SetupType: usingFixture, + SetupType: usingImport, }) // TODO(tbg): run another TPCC with the final binaries here and // teach TPCC to re-use the dataset (seems easy enough) to at least @@ -285,7 +287,7 @@ func registerTPCC(r *testRegistry) { Warehouses: 1, Duration: 10 * time.Minute, ExtraRunArgs: "--wait=false", - SetupType: usingFixture, + SetupType: usingImport, }) }, }) @@ -301,7 +303,7 @@ func registerTPCC(r *testRegistry) { runTPCC(ctx, t, c, tpccOptions{ Warehouses: warehouses, Duration: 6 * 24 * time.Hour, - SetupType: usingFixture, + SetupType: usingImport, }) }, }) @@ -330,7 +332,7 @@ func registerTPCC(r *testRegistry) { DrainAndQuit: false, } }, - SetupType: usingFixture, + SetupType: usingImport, }) }, }) @@ -662,7 +664,7 @@ func loadTPCCBench( // Load the corresponding fixture. t.l.Printf("restoring tpcc fixture\n") waitForFullReplication(t, db) - cmd := tpccFixturesCmd(t, b.LoadWarehouses, loadArgs) + cmd := tpccImportCmd(t, b.LoadWarehouses, loadArgs) if err := c.RunE(ctx, loadNode, cmd); err != nil { return err }