c2c: deflake c2c/shutdown roachtests
This patch addresses two roachtest failure modes:
- Prevents roachtest failure if a query fails during a node shutdown.
- Prevents the src cluster from returning a single-node topology, which could
  cause the stream ingestion job to hang if the participating src node gets
  shut down. Longer term, automatic replanning will prevent this.

Fixes cockroachdb#101898
Fixes cockroachdb#102111

Release note: None
msbutler committed Apr 28, 2023
1 parent a10740b commit ad9c15d
Showing 1 changed file with 33 additions and 8 deletions.
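
For context on the first fix, here is a minimal, self-contained sketch of the
error-tolerant polling pattern the patch adopts: a transient query error (say,
from a node that is mid-shutdown) is treated as retryable rather than failing
the test outright. The pollJobStatus helper and its arguments are illustrative
assumptions; the actual test uses retry.ForDuration from pkg/util/retry with
the roachtest SQL helpers shown in the diff below.

package c2cexample

import (
	"context"
	"database/sql"
	"fmt"
	"time"
)

// pollJobStatus re-issues the status query until it succeeds or the deadline
// passes, tolerating errors caused by nodes shutting down mid-query.
func pollJobStatus(ctx context.Context, db *sql.DB, jobID int) (string, error) {
	deadline := time.Now().Add(5 * time.Minute)
	for {
		var status string
		err := db.QueryRowContext(ctx,
			`SELECT status FROM crdb_internal.system_jobs WHERE id = $1`, jobID,
		).Scan(&status)
		if err == nil {
			return status, nil
		}
		if time.Now().After(deadline) {
			return "", fmt.Errorf("job %d status never resolved: %w", jobID, err)
		}
		// A node may have gone down during execution; wait and retry.
		time.Sleep(time.Second)
	}
}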
pkg/cmd/roachtest/tests/cluster_to_cluster.go
@@ -428,7 +428,12 @@ func (rd *replicationDriver) setupC2C(ctx context.Context, t test.Test, c cluste
WithNodeExporter(rd.setup.dst.nodes.InstallNodes()).
WithGrafanaDashboard("https://go.crdb.dev/p/changefeed-roachtest-grafana-dashboard")

- require.NoError(rd.t, rd.c.StartGrafana(ctx, rd.t.L(), rd.setup.promCfg))
+ // StartGrafana clutters the test.log. Try logging setup to a separate file.
+ promLog, err := rd.t.L().ChildLogger("prom_setup")
+ if err != nil {
+ 	promLog = rd.t.L()
+ }
+ require.NoError(rd.t, rd.c.StartGrafana(ctx, promLog, rd.setup.promCfg))
rd.t.L().Printf("Prom has started")
}
}
@@ -527,13 +532,20 @@ func (rd *replicationDriver) getReplicationRetainedTime() time.Time {
return retainedTime
}

- func (rd *replicationDriver) stopReplicationStream(ingestionJob int, cutoverTime time.Time) {
+ func (rd *replicationDriver) stopReplicationStream(
+ 	ctx context.Context, ingestionJob int, cutoverTime time.Time,
+ ) {
rd.setup.dst.sysSQL.Exec(rd.t, `ALTER TENANT $1 COMPLETE REPLICATION TO SYSTEM TIME $2::string`, rd.setup.dst.name, cutoverTime)
err := retry.ForDuration(time.Minute*5, func() error {
var status string
var payloadBytes []byte
- rd.setup.dst.sysSQL.QueryRow(rd.t, `SELECT status, payload FROM crdb_internal.system_jobs WHERE id = $1`,
- 	ingestionJob).Scan(&status, &payloadBytes)
+ res := rd.setup.dst.sysSQL.DB.QueryRowContext(ctx, `SELECT status, payload FROM crdb_internal.system_jobs WHERE id = $1`, ingestionJob)
+ if res.Err() != nil {
+ 	// This query can fail if a node shuts down during the query execution;
+ 	// therefore, tolerate errors.
+ 	return res.Err()
+ }
+ require.NoError(rd.t, res.Scan(&status, &payloadBytes))
if jobs.Status(status) == jobs.StatusFailed {
payload := &jobspb.Payload{}
if err := protoutil.Unmarshal(payloadBytes, payload); err == nil {
@@ -593,6 +605,14 @@ func (rd *replicationDriver) main(ctx context.Context) {
metricSnapper := rd.startStatsCollection(ctx)
rd.preStreamingWorkload(ctx)

+ // Wait for initial workload to be properly replicated across the source cluster to increase
+ // the probability that the producer returns a topology with more than one node in it,
+ // else the node shutdown tests can flake.
+ // See https://github.com/cockroachdb/cockroach/issues/101898.
+ if rd.rs.srcNodes >= 3 {
+ 	require.NoError(rd.t, WaitFor3XReplication(ctx, rd.t, rd.setup.src.db))
+ }
+
rd.t.L().Printf("begin workload on src cluster")
m := rd.newMonitor(ctx)
// The roachtest driver can use the workloadCtx to cancel the workload.
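
The WaitFor3XReplication call added above gates the node-shutdown variants on
the source cluster actually having replicated its ranges three ways, so the
producer is unlikely to hand back a single-node topology. As a rough sketch of
what such a wait amounts to (the query below is an illustrative approximation,
not the helper's exact implementation):

package c2cexample

import (
	"context"
	"database/sql"
	"time"
)

// waitForFullReplication polls until no range reports fewer than three
// replicas, so the producer topology should name more than one node.
func waitForFullReplication(ctx context.Context, db *sql.DB) error {
	for {
		var underReplicated int
		if err := db.QueryRowContext(ctx,
			`SELECT count(*) FROM crdb_internal.ranges WHERE array_length(replicas, 1) < 3`,
		).Scan(&underReplicated); err != nil {
			return err
		}
		if underReplicated == 0 {
			return nil
		}
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(time.Second):
		}
	}
}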
@@ -617,10 +637,9 @@ func (rd *replicationDriver) main(ctx context.Context) {
rd.t.Status("starting replication stream")
rd.metrics.initalScanStart = newMetricSnapshot(metricSnapper, timeutil.Now())
ingestionJobID := rd.startReplicationStream(ctx)

removeTenantRateLimiters(rd.t, rd.setup.dst.sysSQL, rd.setup.dst.name)

- lv := makeLatencyVerifier("stream-ingestion", 0, 2*time.Minute, rd.t.L(), getStreamIngestionJobInfo, rd.t.Status, false)
+ lv := makeLatencyVerifier("stream-ingestion", 0, 2*time.Minute, rd.t.L(), getStreamIngestionJobInfo, rd.t.Status, true)
defer lv.maybeLogLatencyHist()

m.Go(func(ctx context.Context) error {
@@ -655,7 +674,7 @@ func (rd *replicationDriver) main(ctx context.Context) {

rd.t.Status(fmt.Sprintf("waiting for replication stream to cutover to %s",
cutoverTime.String()))
- rd.stopReplicationStream(ingestionJobID, cutoverTime)
+ rd.stopReplicationStream(ctx, ingestionJobID, cutoverTime)
rd.metrics.cutoverEnd = newMetricSnapshot(metricSnapper, timeutil.Now())

rd.metrics.export(rd.t, len(rd.setup.src.nodes))
@@ -963,7 +982,10 @@ func (rrd *replResilienceDriver) waitForTargetPhase() error {
// Every C2C phase should last at least 30 seconds, so introduce a little
// bit of random waiting before node shutdown to ensure the shutdown occurs
// once we're settled into the target phase.
- randomSleep := time.Duration(rand.Intn(6))
+ //
+ // Sleep for a minimum of 5 seconds to ensure the stream processors have
+ // observed the cutover signal.
+ randomSleep := time.Duration(5 + rand.Intn(6))
rrd.t.L().Printf("In target phase! Take a %d second power nap", randomSleep)
time.Sleep(randomSleep * time.Second)
return nil
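
A quick check of the sleep window above: rand.Intn(6) returns 0 through 5, so
the nap lasts 5 to 10 seconds, guaranteeing the 5-second minimum for the
cutover-signal poll while staying well inside a phase of at least 30 seconds.
A runnable illustration:

package main

import (
	"fmt"
	"math/rand"
	"time"
)

func main() {
	for i := 0; i < 3; i++ {
		// Matches the patched expression: always in [5s, 10s].
		nap := time.Duration(5+rand.Intn(6)) * time.Second
		fmt.Println("would nap for", nap)
	}
}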
@@ -1054,6 +1076,9 @@ func registerClusterReplicationResilience(r registry.Registry) {
// Don't begin shutdown process until c2c job is set up.
<-shutdownSetupDone

+ // Eagerly listen to cutover signal to exercise node shutdown during actual cutover.
+ rrd.setup.dst.sysSQL.Exec(t, `SET CLUSTER SETTING bulkio.stream_ingestion.cutover_signal_poll_interval='5s'`)
+
// While executing a node shutdown on either the src or destination
// cluster, ensure the destination cluster's stream ingestion job
// completes. If the stream producer job fails, no big deal-- in a real
