diff --git a/pkg/cmd/roachtest/registry.go b/pkg/cmd/roachtest/registry.go
index 5d06a68fa737..b7cedb55633c 100644
--- a/pkg/cmd/roachtest/registry.go
+++ b/pkg/cmd/roachtest/registry.go
@@ -59,6 +59,7 @@ func registerTests(r *registry) {
 	registerRestore(r)
 	registerRoachmart(r)
 	registerScaleData(r)
+	registerSchemaChangeBulkIngest(r)
 	registerSchemaChangeCancelIndexTPCC1000(r)
 	registerSchemaChangeKV(r)
 	registerSchemaChangeIndexTPCC100(r)
diff --git a/pkg/cmd/roachtest/schemachange.go b/pkg/cmd/roachtest/schemachange.go
index 03c3130695de..e77f14cc2610 100644
--- a/pkg/cmd/roachtest/schemachange.go
+++ b/pkg/cmd/roachtest/schemachange.go
@@ -436,6 +436,85 @@ func makeIndexAddRollbackTpccTest(numNodes, warehouses int, length time.Duration
 	}
 }
 
+func registerSchemaChangeBulkIngest(r *registry) {
+	r.Add(makeSchemaChangeBulkIngestTest(5, 100000000, time.Minute*20))
+}
+
+func makeSchemaChangeBulkIngestTest(numNodes, numRows int, length time.Duration) testSpec {
+	return testSpec{
+		Name:    "schemachange/bulkingest",
+		Cluster: makeClusterSpec(numNodes),
+		Timeout: length * 2,
+		Run: func(ctx context.Context, t *test, c *cluster) {
+			// Configure column a to have sequential ascending values, and columns b and c to be constant.
+			// The payload column will be randomized and thus uncorrelated with the primary key (a, b, c).
+			aNum := numRows
+			if c.isLocal() {
+				aNum = 100000
+			}
+			bNum := 1
+			cNum := 1
+			payloadBytes := 4
+
+			crdbNodes := c.Range(1, c.nodes-1)
+			workloadNode := c.Node(c.nodes)
+
+			c.Put(ctx, cockroach, "./cockroach", crdbNodes)
+			c.Put(ctx, workload, "./workload", workloadNode)
+			// TODO (lucy): Remove flag once the faster import is enabled by default
+			c.Start(ctx, t, crdbNodes, startArgs("--env=COCKROACH_IMPORT_WORKLOAD_FASTER=true"))
+
+			// Don't add another index when importing.
+			cmdWrite := fmt.Sprintf(
+				"./workload fixtures import bulkingest {pgurl:1} --a %d --b %d --c %d --payload-bytes %d --index-b-c-a=false",
+				aNum, bNum, cNum, payloadBytes,
+			)
+
+			c.Run(ctx, workloadNode, cmdWrite)
+
+			m := newMonitor(ctx, c, crdbNodes)
+
+			indexDuration := length
+			if c.isLocal() {
+				indexDuration = time.Second * 30
+			}
+			cmdWriteAndRead := fmt.Sprintf(
+				"./workload run bulkingest --duration %s {pgurl:1-%d} --a %d --b %d --c %d --payload-bytes %d",
+				indexDuration.String(), c.nodes-1, aNum, bNum, cNum, payloadBytes,
+			)
+			m.Go(func(ctx context.Context) error {
+				c.Run(ctx, workloadNode, cmdWriteAndRead)
+				return nil
+			})
+
+			m.Go(func(ctx context.Context) error {
+				db := c.Conn(ctx, 1)
+				defer db.Close()
+
+				if !c.isLocal() {
+					// Wait for the load generator to run for a few minutes before creating the index.
+					sleepInterval := time.Minute * 5
+					maxSleep := length / 2
+					if sleepInterval > maxSleep {
+						sleepInterval = maxSleep
+					}
+					time.Sleep(sleepInterval)
+				}
+
+				c.l.Printf("Creating index")
+				before := timeutil.Now()
+				if _, err := db.Exec(`CREATE INDEX payload_a ON bulkingest.bulkingest (payload, a)`); err != nil {
+					t.Fatal(err)
+				}
+				c.l.Printf("CREATE INDEX took %v\n", timeutil.Since(before))
+				return nil
+			})
+
+			m.Wait()
+		},
+	}
+}
+
 func runAndLogStmts(ctx context.Context, t *test, c *cluster, prefix string, stmts []string) error {
 	db := c.Conn(ctx, 1)
 	defer db.Close()
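
For readers unfamiliar with the bulkingest fixture, here is a minimal standalone Go sketch of the row shape this test configures: column a ascends, columns b and c stay constant, and the payload bytes are random, so the secondary index on (payload, a) is built in an order uncorrelated with the primary key (a, b, c). The row count and print format below are illustrative assumptions, not part of the roachtest or the workload.

package main

import (
	"fmt"
	"math/rand"
)

func main() {
	// Illustrative parameters; the roachtest passes --a 100000000 (100000 when
	// run locally), --b 1, --c 1, and --payload-bytes 4 to the workload.
	const (
		aNum         = 5
		bNum         = 1
		cNum         = 1
		payloadBytes = 4
	)
	for a := 0; a < aNum; a++ {
		for b := 0; b < bNum; b++ {
			for c := 0; c < cNum; c++ {
				// Column a ascends while b and c stay constant; the payload is
				// random and therefore uncorrelated with the primary key (a, b, c).
				payload := make([]byte, payloadBytes)
				rand.Read(payload)
				fmt.Printf("a=%d b=%d c=%d payload=%x\n", a, b, c, payload)
			}
		}
	}
}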