diff --git a/pkg/cmd/roachtest/tests/cluster_to_cluster.go b/pkg/cmd/roachtest/tests/cluster_to_cluster.go
index f8eea8294209..ae100da0798a 100644
--- a/pkg/cmd/roachtest/tests/cluster_to_cluster.go
+++ b/pkg/cmd/roachtest/tests/cluster_to_cluster.go
@@ -428,7 +428,12 @@ func (rd *replicationDriver) setupC2C(ctx context.Context, t test.Test, c cluste
 			WithNodeExporter(rd.setup.dst.nodes.InstallNodes()).
 			WithGrafanaDashboard("https://go.crdb.dev/p/changefeed-roachtest-grafana-dashboard")
 
-		require.NoError(rd.t, rd.c.StartGrafana(ctx, rd.t.L(), rd.setup.promCfg))
+		// StartGrafana clutters the test.log. Try logging setup to a separate file.
+		promLog, err := rd.t.L().ChildLogger("prom_setup")
+		if err != nil {
+			promLog = rd.t.L()
+		}
+		require.NoError(rd.t, rd.c.StartGrafana(ctx, promLog, rd.setup.promCfg))
 		rd.t.L().Printf("Prom has started")
 	}
 }
@@ -527,13 +532,20 @@ func (rd *replicationDriver) getReplicationRetainedTime() time.Time {
 	return retainedTime
 }
 
-func (rd *replicationDriver) stopReplicationStream(ingestionJob int, cutoverTime time.Time) {
+func (rd *replicationDriver) stopReplicationStream(
+	ctx context.Context, ingestionJob int, cutoverTime time.Time,
+) {
 	rd.setup.dst.sysSQL.Exec(rd.t, `ALTER TENANT $1 COMPLETE REPLICATION TO SYSTEM TIME $2::string`, rd.setup.dst.name, cutoverTime)
 	err := retry.ForDuration(time.Minute*5, func() error {
 		var status string
 		var payloadBytes []byte
-		rd.setup.dst.sysSQL.QueryRow(rd.t, `SELECT status, payload FROM crdb_internal.system_jobs WHERE id = $1`,
-			ingestionJob).Scan(&status, &payloadBytes)
+		res := rd.setup.dst.sysSQL.DB.QueryRowContext(ctx, `SELECT status, payload FROM crdb_internal.system_jobs WHERE id = $1`, ingestionJob)
+		if res.Err() != nil {
+			// This query can fail if a node shuts down during the query execution;
+			// therefore, tolerate errors.
+			return res.Err()
+		}
+		require.NoError(rd.t, res.Scan(&status, &payloadBytes))
 		if jobs.Status(status) == jobs.StatusFailed {
 			payload := &jobspb.Payload{}
 			if err := protoutil.Unmarshal(payloadBytes, payload); err == nil {
@@ -593,6 +605,14 @@ func (rd *replicationDriver) main(ctx context.Context) {
 	metricSnapper := rd.startStatsCollection(ctx)
 	rd.preStreamingWorkload(ctx)
 
+	// Wait for initial workload to be properly replicated across the source cluster to increase
+	// the probability that the producer returns a topology with more than one node in it,
+	// else the node shutdown tests can flake.
+	// See https://github.com/cockroachdb/cockroach/issues/101898.
+	if rd.rs.srcNodes >= 3 {
+		require.NoError(rd.t, WaitFor3XReplication(ctx, rd.t, rd.setup.src.db))
+	}
+
 	rd.t.L().Printf("begin workload on src cluster")
 	m := rd.newMonitor(ctx)
 	// The roachtest driver can use the workloadCtx to cancel the workload.
@@ -617,10 +637,9 @@ func (rd *replicationDriver) main(ctx context.Context) {
 	rd.t.Status("starting replication stream")
 	rd.metrics.initalScanStart = newMetricSnapshot(metricSnapper, timeutil.Now())
 	ingestionJobID := rd.startReplicationStream(ctx)
-	removeTenantRateLimiters(rd.t, rd.setup.dst.sysSQL, rd.setup.dst.name)
 
-	lv := makeLatencyVerifier("stream-ingestion", 0, 2*time.Minute, rd.t.L(), getStreamIngestionJobInfo, rd.t.Status, false)
+	lv := makeLatencyVerifier("stream-ingestion", 0, 2*time.Minute, rd.t.L(), getStreamIngestionJobInfo, rd.t.Status, true)
 	defer lv.maybeLogLatencyHist()
 
 	m.Go(func(ctx context.Context) error {
@@ -655,7 +674,7 @@ func (rd *replicationDriver) main(ctx context.Context) {
 
 	rd.t.Status(fmt.Sprintf("waiting for replication stream to cutover to %s",
 		cutoverTime.String()))
-	rd.stopReplicationStream(ingestionJobID, cutoverTime)
+	rd.stopReplicationStream(ctx, ingestionJobID, cutoverTime)
 	rd.metrics.cutoverEnd = newMetricSnapshot(metricSnapper, timeutil.Now())
 
 	rd.metrics.export(rd.t, len(rd.setup.src.nodes))
@@ -963,7 +982,10 @@ func (rrd *replResilienceDriver) waitForTargetPhase() error {
 			// Every C2C phase should last at least 30 seconds, so introduce a little
 			// bit of random waiting before node shutdown to ensure the shutdown occurs
 			// once we're settled into the target phase.
-			randomSleep := time.Duration(rand.Intn(6))
+			//
+			// Sleep for a minimum of 5 seconds to ensure the stream processors have
+			// observed the cutover signal.
+			randomSleep := time.Duration(5 + rand.Intn(6))
 			rrd.t.L().Printf("In target phase! Take a %d second power nap", randomSleep)
 			time.Sleep(randomSleep * time.Second)
 			return nil
@@ -1054,6 +1076,9 @@ func registerClusterReplicationResilience(r registry.Registry) {
 
 			// Don't begin shutdown process until c2c job is set up.
 			<-shutdownSetupDone
+			// Eagerly listen to cutover signal to exercise node shutdown during actual cutover.
+			rrd.setup.dst.sysSQL.Exec(t, `SET CLUSTER SETTING bulkio.stream_ingestion.cutover_signal_poll_interval='5s'`)
+
 			// While executing a node shutdown on either the src or destination
 			// cluster, ensure the destination cluster's stream ingestion job
 			// completes. If the stream producer job fails, no big deal-- in a real