diff --git a/pkg/ccl/streamingccl/streamingest/alter_replication_job.go b/pkg/ccl/streamingccl/streamingest/alter_replication_job.go index dbb3d46cedf0..7879ce03f36d 100644 --- a/pkg/ccl/streamingccl/streamingest/alter_replication_job.go +++ b/pkg/ccl/streamingccl/streamingest/alter_replication_job.go @@ -264,7 +264,7 @@ func alterTenantJobCutover( cutoverTime, record.Timestamp) } } - if err := completeStreamIngestion(ctx, jobRegistry, txn, tenInfo.TenantReplicationJobID, cutoverTime); err != nil { + if err := applyCutoverTime(ctx, jobRegistry, txn, tenInfo.TenantReplicationJobID, cutoverTime); err != nil { return hlc.Timestamp{}, err } diff --git a/pkg/ccl/streamingccl/streamingest/stream_ingest_manager.go b/pkg/ccl/streamingccl/streamingest/stream_ingest_manager.go index 4755b67c79af..dccdd0d10932 100644 --- a/pkg/ccl/streamingccl/streamingest/stream_ingest_manager.go +++ b/pkg/ccl/streamingccl/streamingest/stream_ingest_manager.go @@ -37,7 +37,7 @@ type streamIngestManagerImpl struct { func (r *streamIngestManagerImpl) CompleteStreamIngestion( ctx context.Context, ingestionJobID jobspb.JobID, cutoverTimestamp hlc.Timestamp, ) error { - return completeStreamIngestion(ctx, r.jobRegistry, r.txn, ingestionJobID, cutoverTimestamp) + return applyCutoverTime(ctx, r.jobRegistry, r.txn, ingestionJobID, cutoverTimestamp) } // GetStreamIngestionStats implements streaming.StreamIngestManager interface. diff --git a/pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go b/pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go index 22d85985ecf2..11417b83ccc5 100644 --- a/pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go +++ b/pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go @@ -38,8 +38,9 @@ import ( "github.com/cockroachdb/errors" ) -// completeStreamIngestion terminates the stream as of specified time. -func completeStreamIngestion( +// applyCutoverTime modifies the consumer job record with a cutover time and +// unpauses the job if necessary. +func applyCutoverTime( ctx context.Context, jobRegistry *jobs.Registry, txn isql.Txn, diff --git a/pkg/cmd/roachtest/option/node_list_option.go b/pkg/cmd/roachtest/option/node_list_option.go index 475d78d2c956..5535a7eea2dd 100644 --- a/pkg/cmd/roachtest/option/node_list_option.go +++ b/pkg/cmd/roachtest/option/node_list_option.go @@ -73,6 +73,11 @@ func (n NodeListOption) RandNode() NodeListOption { return NodeListOption{n[rand.Intn(len(n))]} } +// SeededRandNode returns a random node from the NodeListOption using a seeded rand object. +func (n NodeListOption) SeededRandNode(rand *rand.Rand) NodeListOption { + return NodeListOption{n[rand.Intn(len(n))]} +} + // NodeIDsString returns the nodes in the NodeListOption, separated by spaces. func (n NodeListOption) NodeIDsString() string { result := "" diff --git a/pkg/cmd/roachtest/tests/cluster_to_cluster.go b/pkg/cmd/roachtest/tests/cluster_to_cluster.go index f8eea8294209..e87a73bbdf45 100644 --- a/pkg/cmd/roachtest/tests/cluster_to_cluster.go +++ b/pkg/cmd/roachtest/tests/cluster_to_cluster.go @@ -34,11 +34,13 @@ import ( "github.com/cockroachdb/cockroach/pkg/multitenant/mtinfopb" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/roachprod/install" + "github.com/cockroachdb/cockroach/pkg/roachprod/logger" "github.com/cockroachdb/cockroach/pkg/roachprod/prometheus" "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/protoutil" + "github.com/cockroachdb/cockroach/pkg/util/randutil" "github.com/cockroachdb/cockroach/pkg/util/retry" "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/errors" @@ -228,33 +230,33 @@ type replicateKV struct { readPercent int // This field is merely used to debug the c2c framework for finite workloads. - debugRunDurationMinutes int + debugRunDuration time.Duration - // initDuration, if nonzero, will pre-populate the src cluster - initDurationMinutes int + // the number of rows inserted into the cluster before c2c begins + initRows int + + // max size of raw data written during each insertion + maxBlockBytes int } func (kv replicateKV) sourceInitCmd(tenantName string, nodes option.NodeListOption) string { - if kv.initDurationMinutes == 0 { - return "" - } - return fmt.Sprintf(`./workload run kv --tolerate-errors --init --duration %dm --read-percent 0 {pgurl%s:%s}`, - kv.initDurationMinutes, - nodes, - tenantName) + cmd := roachtestutil.NewCommand(`./workload init kv`). + MaybeFlag(kv.initRows > 0, "insert-count", kv.initRows). + MaybeFlag(kv.initRows > 0, "max-block-bytes", kv.maxBlockBytes). + Flag("splits", 100). + Option("scatter"). + Arg("{pgurl%s:%s}", nodes, tenantName) + return cmd.String() } func (kv replicateKV) sourceRunCmd(tenantName string, nodes option.NodeListOption) string { - debugDuration := "" - if kv.debugRunDurationMinutes != 0 { - debugDuration = fmt.Sprintf("--duration %dm", kv.debugRunDurationMinutes) - } - // added --tolerate-errors flags to prevent test from flaking due to a transaction retry error - return fmt.Sprintf(`./workload run kv --tolerate-errors --init %s --read-percent %d {pgurl%s:%s}`, - debugDuration, - kv.readPercent, - nodes, - tenantName) + cmd := roachtestutil.NewCommand(`./workload run kv`). + Option("tolerate-errors"). + Flag("max-block-bytes", kv.maxBlockBytes). + Flag("read-percent", kv.readPercent). + MaybeFlag(kv.debugRunDuration > 0, "duration", kv.debugRunDuration). + Arg("{pgurl%s:%s}", nodes, tenantName) + return cmd.String() } func (kv replicateKV) runDriver( @@ -338,13 +340,17 @@ type replicationDriver struct { t test.Test c cluster.Cluster metrics *c2cMetrics + rng *rand.Rand } func makeReplicationDriver(t test.Test, c cluster.Cluster, rs replicationSpec) replicationDriver { + rng, seed := randutil.NewTestRand() + t.L().Printf(`Random Seed is %d`, seed) return replicationDriver{ - t: t, - c: c, - rs: rs, + t: t, + c: c, + rs: rs, + rng: rng, } } @@ -367,8 +373,8 @@ func (rd *replicationDriver) setupC2C(ctx context.Context, t test.Test, c cluste dstClusterSetting := install.MakeClusterSettings(install.SecureOption(true)) c.Start(ctx, t.L(), dstStartOps, dstClusterSetting, dstCluster) - srcNode := srcCluster.RandNode() - destNode := dstCluster.RandNode() + srcNode := srcCluster.SeededRandNode(rd.rng) + destNode := dstCluster.SeededRandNode(rd.rng) addr, err := c.ExternalPGUrl(ctx, t.L(), srcNode, "") require.NoError(t, err) @@ -428,7 +434,12 @@ func (rd *replicationDriver) setupC2C(ctx context.Context, t test.Test, c cluste WithNodeExporter(rd.setup.dst.nodes.InstallNodes()). WithGrafanaDashboard("https://go.crdb.dev/p/changefeed-roachtest-grafana-dashboard") - require.NoError(rd.t, rd.c.StartGrafana(ctx, rd.t.L(), rd.setup.promCfg)) + // StartGrafana clutters the test.log. Try logging setup to a separate file. + promLog, err := rd.t.L().ChildLogger("prom_setup", logger.QuietStderr, logger.QuietStdout) + if err != nil { + promLog = rd.t.L() + } + require.NoError(rd.t, rd.c.StartGrafana(ctx, promLog, rd.setup.promCfg)) rd.t.L().Printf("Prom has started") } } @@ -527,13 +538,20 @@ func (rd *replicationDriver) getReplicationRetainedTime() time.Time { return retainedTime } -func (rd *replicationDriver) stopReplicationStream(ingestionJob int, cutoverTime time.Time) { +func (rd *replicationDriver) stopReplicationStream( + ctx context.Context, ingestionJob int, cutoverTime time.Time, +) { rd.setup.dst.sysSQL.Exec(rd.t, `ALTER TENANT $1 COMPLETE REPLICATION TO SYSTEM TIME $2::string`, rd.setup.dst.name, cutoverTime) err := retry.ForDuration(time.Minute*5, func() error { var status string var payloadBytes []byte - rd.setup.dst.sysSQL.QueryRow(rd.t, `SELECT status, payload FROM crdb_internal.system_jobs WHERE id = $1`, - ingestionJob).Scan(&status, &payloadBytes) + res := rd.setup.dst.sysSQL.DB.QueryRowContext(ctx, `SELECT status, payload FROM crdb_internal.system_jobs WHERE id = $1`, ingestionJob) + if res.Err() != nil { + // This query can fail if a node shuts down during the query execution; + // therefore, tolerate errors. + return res.Err() + } + require.NoError(rd.t, res.Scan(&status, &payloadBytes)) if jobs.Status(status) == jobs.StatusFailed { payload := &jobspb.Payload{} if err := protoutil.Unmarshal(payloadBytes, payload); err == nil { @@ -593,6 +611,13 @@ func (rd *replicationDriver) main(ctx context.Context) { metricSnapper := rd.startStatsCollection(ctx) rd.preStreamingWorkload(ctx) + // Wait for initial workload to be properly replicated across the source cluster to increase + // the probability that the producer returns a topology with more than one node in it, + // else the node shutdown tests can flake. + if rd.rs.srcNodes >= 3 { + require.NoError(rd.t, WaitFor3XReplication(ctx, rd.t, rd.setup.src.db)) + } + rd.t.L().Printf("begin workload on src cluster") m := rd.newMonitor(ctx) // The roachtest driver can use the workloadCtx to cancel the workload. @@ -617,10 +642,12 @@ func (rd *replicationDriver) main(ctx context.Context) { rd.t.Status("starting replication stream") rd.metrics.initalScanStart = newMetricSnapshot(metricSnapper, timeutil.Now()) ingestionJobID := rd.startReplicationStream(ctx) - removeTenantRateLimiters(rd.t, rd.setup.dst.sysSQL, rd.setup.dst.name) - lv := makeLatencyVerifier("stream-ingestion", 0, 2*time.Minute, rd.t.L(), getStreamIngestionJobInfo, rd.t.Status, false) + // latency verifier queries may error during a node shutdown event; therefore + // tolerate errors if we anticipate node deaths. + lv := makeLatencyVerifier("stream-ingestion", 0, 2*time.Minute, rd.t.L(), + getStreamIngestionJobInfo, rd.t.Status, rd.rs.expectedNodeDeaths > 0) defer lv.maybeLogLatencyHist() m.Go(func(ctx context.Context) error { @@ -655,7 +682,7 @@ func (rd *replicationDriver) main(ctx context.Context) { rd.t.Status(fmt.Sprintf("waiting for replication stream to cutover to %s", cutoverTime.String())) - rd.stopReplicationStream(ingestionJobID, cutoverTime) + rd.stopReplicationStream(ctx, ingestionJobID, cutoverTime) rd.metrics.cutoverEnd = newMetricSnapshot(metricSnapper, timeutil.Now()) rd.metrics.export(rd.t, len(rd.setup.src.nodes)) @@ -704,7 +731,7 @@ func runAcceptanceClusterReplication(ctx context.Context, t test.Test, c cluster dstNodes: 1, // The timeout field ensures the c2c roachtest driver behaves properly. timeout: 10 * time.Minute, - workload: replicateKV{readPercent: 0, debugRunDurationMinutes: 1}, + workload: replicateKV{readPercent: 0, debugRunDuration: 1 * time.Minute, maxBlockBytes: 1}, additionalDuration: 0 * time.Minute, cutover: 30 * time.Second, } @@ -751,19 +778,20 @@ func registerClusterToCluster(r registry.Registry) { dstNodes: 3, cpus: 8, pdSize: 100, - workload: replicateKV{readPercent: 0}, + workload: replicateKV{readPercent: 0, maxBlockBytes: 1024}, timeout: 1 * time.Hour, additionalDuration: 10 * time.Minute, cutover: 5 * time.Minute, tags: registry.Tags("aws"), }, { - name: "c2c/UnitTest", - srcNodes: 1, - dstNodes: 1, - cpus: 4, - pdSize: 10, - workload: replicateKV{readPercent: 0, debugRunDurationMinutes: 1}, + name: "c2c/UnitTest", + srcNodes: 1, + dstNodes: 1, + cpus: 4, + pdSize: 10, + workload: replicateKV{readPercent: 0, debugRunDuration: 1 * time.Minute, + maxBlockBytes: 1024}, timeout: 5 * time.Minute, additionalDuration: 0 * time.Minute, cutover: 30 * time.Second, @@ -868,7 +896,7 @@ func makeReplResilienceDriver( rd := makeReplicationDriver(t, c, rsp.replicationSpec) return replResilienceDriver{ replicationDriver: rd, - phase: c2cPhase(rand.Intn(int(phaseCutover) + 1)), + phase: c2cPhase(rd.rng.Intn(int(phaseCutover) + 1)), rsp: rsp, } } @@ -923,7 +951,7 @@ func (rrd *replResilienceDriver) getTargetAndWatcherNodes(ctx context.Context) { findAnotherNode := func(notThisNode int) int { for { - anotherNode := nodes.RandNode()[0] + anotherNode := nodes.SeededRandNode(rrd.rng)[0] if notThisNode != anotherNode { return anotherNode } @@ -960,12 +988,7 @@ func (rrd *replResilienceDriver) waitForTargetPhase() error { case currentPhase < rrd.phase: time.Sleep(5 * time.Second) case currentPhase == rrd.phase: - // Every C2C phase should last at least 30 seconds, so introduce a little - // bit of random waiting before node shutdown to ensure the shutdown occurs - // once we're settled into the target phase. - randomSleep := time.Duration(rand.Intn(6)) - rrd.t.L().Printf("In target phase! Take a %d second power nap", randomSleep) - time.Sleep(randomSleep * time.Second) + rrd.t.L().Printf("In target phase %s", currentPhase.String()) return nil default: return errors.New("c2c job past target phase") @@ -973,6 +996,16 @@ func (rrd *replResilienceDriver) waitForTargetPhase() error { } } +func (rrd *replResilienceDriver) sleepBeforeResiliencyEvent() { + // Assuming every C2C phase lasts at least 10 seconds, introduce some waiting + // before a resiliency event (e.g. a node shutdown) to ensure the event occurs + // once we're fully settled into the target phase (e.g. the stream ingestion + // processors have observed the cutover signal). + randomSleep := time.Duration(5+rrd.rng.Intn(6)) * time.Second + rrd.t.L().Printf("Take a %s power nap", randomSleep) + time.Sleep(randomSleep) +} + func registerClusterReplicationResilience(r registry.Registry) { for _, rsp := range []replResilienceSpec{ { @@ -999,7 +1032,7 @@ func registerClusterReplicationResilience(r registry.Registry) { srcNodes: 4, dstNodes: 4, cpus: 8, - workload: replicateKV{readPercent: 0, initDurationMinutes: 2}, + workload: replicateKV{readPercent: 0, initRows: 1000000, maxBlockBytes: 1024}, timeout: 20 * time.Minute, additionalDuration: 6 * time.Minute, cutover: 3 * time.Minute, @@ -1054,6 +1087,9 @@ func registerClusterReplicationResilience(r registry.Registry) { // Don't begin shutdown process until c2c job is set up. <-shutdownSetupDone + // Eagerly listen to cutover signal to exercise node shutdown during actual cutover. + rrd.setup.dst.sysSQL.Exec(t, `SET CLUSTER SETTING bulkio.stream_ingestion.cutover_signal_poll_interval='5s'`) + // While executing a node shutdown on either the src or destination // cluster, ensure the destination cluster's stream ingestion job // completes. If the stream producer job fails, no big deal-- in a real @@ -1061,7 +1097,9 @@ func registerClusterReplicationResilience(r registry.Registry) { // successful c2c replication execution. shutdownStarter := func() jobStarter { return func(c cluster.Cluster, t test.Test) (string, error) { - return fmt.Sprintf("%d", rrd.dstJobID), rrd.waitForTargetPhase() + require.NoError(t, rrd.waitForTargetPhase()) + rrd.sleepBeforeResiliencyEvent() + return fmt.Sprintf("%d", rrd.dstJobID), nil } } destinationWatcherNode := rrd.watcherNode diff --git a/pkg/cmd/roachtest/tests/multitenant_utils.go b/pkg/cmd/roachtest/tests/multitenant_utils.go index 2ca12644bcbe..7c5df2dc16a3 100644 --- a/pkg/cmd/roachtest/tests/multitenant_utils.go +++ b/pkg/cmd/roachtest/tests/multitenant_utils.go @@ -338,6 +338,8 @@ func createInMemoryTenant( sysSQL.Exec(t, "CREATE TENANT $1", tenantName) sysSQL.Exec(t, "ALTER TENANT $1 START SERVICE SHARED", tenantName) sysSQL.Exec(t, `ALTER TENANT $1 GRANT CAPABILITY can_view_node_info=true, can_admin_split=true,can_view_tsdb_metrics=true`, tenantName) + sysSQL.Exec(t, `ALTER TENANT $1 SET CLUSTER SETTING sql.split_at.allow_for_secondary_tenant.enabled=true`, tenantName) + sysSQL.Exec(t, `ALTER TENANT $1 SET CLUSTER SETTING sql.scatter.allow_for_secondary_tenant.enabled=true`, tenantName) removeTenantRateLimiters(t, sysSQL, tenantName) diff --git a/pkg/workload/kv/kv.go b/pkg/workload/kv/kv.go index 84007fba2733..3eec400c98c0 100644 --- a/pkg/workload/kv/kv.go +++ b/pkg/workload/kv/kv.go @@ -88,6 +88,7 @@ type kv struct { zipfian bool sfuDelay time.Duration splits int + scatter bool secondaryIndex bool shards int targetCompressionRatio float64 @@ -127,6 +128,7 @@ var kvMeta = workload.Meta{ `span-limit`: {RuntimeOnly: true}, `del-percent`: {RuntimeOnly: true}, `splits`: {RuntimeOnly: true}, + `scatter`: {RuntimeOnly: true}, `timeout`: {RuntimeOnly: true}, } g.flags.IntVar(&g.batchSize, `batch`, 1, @@ -157,6 +159,8 @@ var kvMeta = workload.Meta{ `previous --sequential run and R implies a previous random run.`) g.flags.IntVar(&g.splits, `splits`, 0, `Number of splits to perform before starting normal operations.`) + g.flags.BoolVar(&g.scatter, `scatter`, false, + `Scatter ranges before starting normal operations.`) g.flags.BoolVar(&g.secondaryIndex, `secondary-index`, false, `Add a secondary index to the schema.`) g.flags.IntVar(&g.shards, `num-shards`, 0, @@ -189,13 +193,20 @@ func (w *kv) Flags() workload.Flags { return w.flags } func (w *kv) Hooks() workload.Hooks { return workload.Hooks{ PostLoad: func(_ context.Context, db *gosql.DB) error { - if !w.enum { - return nil - } - _, err := db.Exec(` + if w.enum { + _, err := db.Exec(` CREATE TYPE enum_type AS ENUM ('v'); ALTER TABLE kv ADD COLUMN e enum_type NOT NULL AS ('v') STORED;`) - return err + if err != nil { + return err + } + } + if w.scatter { + if _, err := db.Exec(`ALTER TABLE kv SCATTER`); err != nil { + return err + } + } + return nil }, Validate: w.validateConfig, }