Skip to content

Commit

Permalink
roachtest: use crdb's workload for fixtures import tpcc
Browse files Browse the repository at this point in the history
Fixes #55042.
Fixes #55041.
Fixes #55039.
Fixes #55038.
Fixes #55037.
Fixes #55036.
Fixes #55035.
Fixes #55033.
Fixes #55029.
Fixes #55024.
Fixes #55022.
Fixes #55020.
Fixes #55019.
Fixes #55018.
Fixes #55017.
Fixes #55016.
Fixes #55013.
Fixes #55010.
Fixes #55009.
Fixes #55008.
Fixes #55003.
Fixes #55002.
Fixes #54998.
Fixes #54995.
Fixes #54822.
Fixes #52693.
Fixes #54802.

We were already doing this in some places, but needed it in others.
  • Loading branch information
nvanbenschoten authored and jayshrivastava committed Oct 8, 2020
1 parent 06abb27 commit 9492aec
Show file tree
Hide file tree
Showing 6 changed files with 28 additions and 38 deletions.
17 changes: 5 additions & 12 deletions pkg/cmd/roachtest/cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ type cdcTestArgs struct {
kafkaChaos bool
crdbChaos bool
cloudStorageSink bool
fixturesImport bool

targetInitialScanLatency time.Duration
targetSteadyLatency time.Duration
Expand Down Expand Up @@ -101,7 +100,7 @@ func cdcBasicTest(ctx context.Context, t *test, c *cluster, args cdcTestArgs) {
// value" errors #34025.
tpcc.tolerateErrors = true

tpcc.install(ctx, c, args.fixturesImport)
tpcc.install(ctx, c)
// TODO(dan,ajwerner): sleeping momentarily before running the workload
// mitigates errors like "error in newOrder: missing stock row" from tpcc.
time.Sleep(2 * time.Second)
Expand Down Expand Up @@ -593,7 +592,6 @@ func registerCDC(r *testRegistry) {
workloadDuration: "30m",
initialScan: true,
cloudStorageSink: true,
fixturesImport: true,
targetInitialScanLatency: 30 * time.Minute,
targetSteadyLatency: time.Minute,
})
Expand Down Expand Up @@ -733,16 +731,11 @@ type tpccWorkload struct {
tolerateErrors bool
}

func (tw *tpccWorkload) install(ctx context.Context, c *cluster, fixturesImport bool) {
command := `./workload fixtures load`
if fixturesImport {
// For fixtures import, use the version built into the cockroach binary so
// the tpcc workload-versions match on release branches.
command = `./cockroach workload fixtures import`
}
func (tw *tpccWorkload) install(ctx context.Context, c *cluster) {
// For fixtures import, use the version built into the cockroach binary so
// the tpcc workload-versions match on release branches.
c.Run(ctx, tw.workloadNodes, fmt.Sprintf(
`%s tpcc --warehouses=%d --checks=false {pgurl%s}`,
command,
`./cockroach workload fixtures import tpcc --warehouses=%d --checks=false {pgurl%s}`,
tw.tpccWarehouseCount,
tw.sqlNodes.randNode(),
))
Expand Down
7 changes: 1 addition & 6 deletions pkg/cmd/roachtest/import.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import (
"time"

"github.com/cockroachdb/cockroach/pkg/util/retry"
"github.com/cockroachdb/cockroach/pkg/util/version"
"github.com/cockroachdb/errors"
)

Expand All @@ -36,11 +35,7 @@ func registerImportTPCC(r *testRegistry) {
hc := NewHealthChecker(c, c.All())
m.Go(hc.Runner)

workloadStr := `./workload fixtures import tpcc --warehouses=%d --csv-server='http://localhost:8081'`
if !t.buildVersion.AtLeast(version.MustParse("v20.2.0")) {
workloadStr += " --deprecated-fk-indexes"
}

workloadStr := `./cockroach workload fixtures import tpcc --warehouses=%d --csv-server='http://localhost:8081'`
m.Go(func(ctx context.Context) error {
defer dul.Done()
defer hc.Done()
Expand Down
2 changes: 1 addition & 1 deletion pkg/cmd/roachtest/overload_tpcc_olap.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ type tpccOLAPSpec struct {
func (s tpccOLAPSpec) run(ctx context.Context, t *test, c *cluster) {
crdbNodes, workloadNode := setupTPCC(
ctx, t, c, tpccOptions{
Warehouses: s.Warehouses, SetupType: usingFixture,
Warehouses: s.Warehouses, SetupType: usingImport,
})
const queryFileName = "queries.sql"
// querybench expects the entire query to be on a single line.
Expand Down
4 changes: 2 additions & 2 deletions pkg/cmd/roachtest/schemachange.go
Original file line number Diff line number Diff line change
Expand Up @@ -312,7 +312,7 @@ func makeIndexAddTpccTest(spec clusterSpec, warehouses int, length time.Duration
})
},
Duration: length,
SetupType: usingFixture,
SetupType: usingImport,
})
},
MinVersion: "v19.1.0",
Expand Down Expand Up @@ -457,7 +457,7 @@ func makeSchemaChangeDuringTPCC(spec clusterSpec, warehouses int, length time.Du
})
},
Duration: length,
SetupType: usingFixture,
SetupType: usingImport,
})
},
MinVersion: "v19.1.0",
Expand Down
2 changes: 1 addition & 1 deletion pkg/cmd/roachtest/scrub.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func makeScrubTPCCTest(
return nil
},
Duration: length,
SetupType: usingFixture,
SetupType: usingImport,
})
},
MinVersion: "v19.1.0",
Expand Down
34 changes: 18 additions & 16 deletions pkg/cmd/roachtest/tpcc.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ import (
type tpccSetupType int

const (
usingFixture tpccSetupType = iota
usingImport tpccSetupType = iota
usingInit
)

Expand All @@ -63,10 +63,12 @@ type tpccOptions struct {
Versions []string
}

// tpccFixturesCmd generates the command string to load tpcc data for the
// tpccImportCmd generates the command string to load tpcc data for the
// specified warehouse count into a cluster.
func tpccFixturesCmd(t *test, warehouses int, extraArgs string) string {
return fmt.Sprintf("./workload fixtures import tpcc --warehouses=%d %s {pgurl:1}",
func tpccImportCmd(t *test, warehouses int, extraArgs string) string {
// Use `cockroach workload` instead of `workload` so the tpcc
// workload-versions match on release branches.
return fmt.Sprintf("./cockroach workload fixtures import tpcc --warehouses=%d %s {pgurl:1}",
warehouses, extraArgs)
}

Expand Down Expand Up @@ -111,21 +113,21 @@ func setupTPCC(
c.Put(ctx, cockroach, "./cockroach", workloadNode)
c.Put(ctx, workload, "./workload", workloadNode)

extraArgs := opts.ExtraSetupArgs
if !t.buildVersion.AtLeast(version.MustParse("v20.2.0")) {
extraArgs += " --deprecated-fk-indexes"
}
func() {
db := c.Conn(ctx, 1)
defer db.Close()
c.Start(ctx, t, crdbNodes, startArgsDontEncrypt)
waitForFullReplication(t, c.Conn(ctx, crdbNodes[0]))
switch opts.SetupType {
case usingFixture:
case usingImport:
t.Status("loading fixture")
c.Run(ctx, workloadNode, tpccFixturesCmd(t, opts.Warehouses, extraArgs))
c.Run(ctx, workloadNode, tpccImportCmd(t, opts.Warehouses, opts.ExtraSetupArgs))
case usingInit:
t.Status("initializing tables")
extraArgs := opts.ExtraSetupArgs
if !t.buildVersion.AtLeast(version.MustParse("v20.2.0")) {
extraArgs += " --deprecated-fk-indexes"
}
cmd := fmt.Sprintf(
"./workload init tpcc --warehouses=%d %s {pgurl:1}",
opts.Warehouses, extraArgs,
Expand Down Expand Up @@ -234,7 +236,7 @@ func registerTPCC(r *testRegistry) {
runTPCC(ctx, t, c, tpccOptions{
Warehouses: headroomWarehouses,
Duration: 120 * time.Minute,
SetupType: usingFixture,
SetupType: usingImport,
})
},
})
Expand Down Expand Up @@ -268,7 +270,7 @@ func registerTPCC(r *testRegistry) {
Warehouses: headroomWarehouses,
Duration: 120 * time.Minute,
Versions: []string{oldV, "", oldV, ""},
SetupType: usingFixture,
SetupType: usingImport,
})
// TODO(tbg): run another TPCC with the final binaries here and
// teach TPCC to re-use the dataset (seems easy enough) to at least
Expand All @@ -285,7 +287,7 @@ func registerTPCC(r *testRegistry) {
Warehouses: 1,
Duration: 10 * time.Minute,
ExtraRunArgs: "--wait=false",
SetupType: usingFixture,
SetupType: usingImport,
})
},
})
Expand All @@ -301,7 +303,7 @@ func registerTPCC(r *testRegistry) {
runTPCC(ctx, t, c, tpccOptions{
Warehouses: warehouses,
Duration: 6 * 24 * time.Hour,
SetupType: usingFixture,
SetupType: usingImport,
})
},
})
Expand Down Expand Up @@ -330,7 +332,7 @@ func registerTPCC(r *testRegistry) {
DrainAndQuit: false,
}
},
SetupType: usingFixture,
SetupType: usingImport,
})
},
})
Expand Down Expand Up @@ -662,7 +664,7 @@ func loadTPCCBench(
// Load the corresponding fixture.
t.l.Printf("restoring tpcc fixture\n")
waitForFullReplication(t, db)
cmd := tpccFixturesCmd(t, b.LoadWarehouses, loadArgs)
cmd := tpccImportCmd(t, b.LoadWarehouses, loadArgs)
if err := c.RunE(ctx, loadNode, cmd); err != nil {
return err
}
Expand Down

0 comments on commit 9492aec

Please sign in to comment.