Skip to content

Commit

Permalink
c2c: deflake c2c/shutdown roachtests
Browse files Browse the repository at this point in the history
This patch addresses to roachtest failure modes:
- Prevents roachtest failure if a query fails during a node shutdown.

- Prevents the src cluster from returning a single node topology, which could
  cause the stream ingestion job to hang if the participating src node gets
shut down. Longer term, automatic replanning will prevent this. Specifically,
this patch changes the kv workload to split and scatter the kv table across the
cluster before the c2c job begins.

Fixes #101898
Fixes #102111

This patch also makes it easier to reproduce c2c roachtest failures by plumbing
a random seed to several components of the roachtest driver.

Release note: None
  • Loading branch information
msbutler committed May 11, 2023
1 parent 91d8022 commit 093e2dd
Show file tree
Hide file tree
Showing 3 changed files with 95 additions and 50 deletions.
5 changes: 5 additions & 0 deletions pkg/cmd/roachtest/option/node_list_option.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,11 @@ func (n NodeListOption) RandNode() NodeListOption {
return NodeListOption{n[rand.Intn(len(n))]}
}

// SeededRandNode returns a random node from the NodeListOption using a seeded rand object.
func (n NodeListOption) SeededRandNode(rand *rand.Rand) NodeListOption {
return NodeListOption{n[rand.Intn(len(n))]}
}

// NodeIDsString returns the nodes in the NodeListOption, separated by spaces.
func (n NodeListOption) NodeIDsString() string {
result := ""
Expand Down
138 changes: 88 additions & 50 deletions pkg/cmd/roachtest/tests/cluster_to_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,13 @@ import (
"github.com/cockroachdb/cockroach/pkg/multitenant/mtinfopb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/roachprod/install"
"github.com/cockroachdb/cockroach/pkg/roachprod/logger"
"github.com/cockroachdb/cockroach/pkg/roachprod/prometheus"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/cockroachdb/cockroach/pkg/util/randutil"
"github.com/cockroachdb/cockroach/pkg/util/retry"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/errors"
Expand Down Expand Up @@ -228,33 +230,33 @@ type replicateKV struct {
readPercent int

// This field is merely used to debug the c2c framework for finite workloads.
debugRunDurationMinutes int
debugRunDuration time.Duration

// initDuration, if nonzero, will pre-populate the src cluster
initDurationMinutes int
// the number of rows inserted into the cluster before c2c begins
initRows int

// max size of raw data written during each insertion
maxBlockBytes int
}

func (kv replicateKV) sourceInitCmd(tenantName string, nodes option.NodeListOption) string {
if kv.initDurationMinutes == 0 {
return ""
}
return fmt.Sprintf(`./workload run kv --tolerate-errors --init --duration %dm --read-percent 0 {pgurl%s:%s}`,
kv.initDurationMinutes,
nodes,
tenantName)
cmd := roachtestutil.NewCommand(`./workload init kv`).
MaybeFlag(kv.initRows > 0, "insert-count", kv.initRows).
MaybeFlag(kv.initRows > 0, "max-block-bytes", kv.maxBlockBytes).
Flag("splits", 100).
Option("scatter").
Arg("{pgurl%s:%s}", nodes, tenantName)
return cmd.String()
}

func (kv replicateKV) sourceRunCmd(tenantName string, nodes option.NodeListOption) string {
debugDuration := ""
if kv.debugRunDurationMinutes != 0 {
debugDuration = fmt.Sprintf("--duration %dm", kv.debugRunDurationMinutes)
}
// added --tolerate-errors flags to prevent test from flaking due to a transaction retry error
return fmt.Sprintf(`./workload run kv --tolerate-errors --init %s --read-percent %d {pgurl%s:%s}`,
debugDuration,
kv.readPercent,
nodes,
tenantName)
cmd := roachtestutil.NewCommand(`./workload run kv`).
Option("tolerate-errors").
Flag("max-block-bytes", kv.maxBlockBytes).
Flag("read-percent", kv.readPercent).
MaybeFlag(kv.debugRunDuration > 0, "duration", kv.debugRunDuration).
Arg("{pgurl%s:%s}", nodes, tenantName)
return cmd.String()
}

func (kv replicateKV) runDriver(
Expand Down Expand Up @@ -338,13 +340,17 @@ type replicationDriver struct {
t test.Test
c cluster.Cluster
metrics *c2cMetrics
rng *rand.Rand
}

func makeReplicationDriver(t test.Test, c cluster.Cluster, rs replicationSpec) replicationDriver {
rng, seed := randutil.NewTestRand()
t.L().Printf(`Random Seed is %d`, seed)
return replicationDriver{
t: t,
c: c,
rs: rs,
t: t,
c: c,
rs: rs,
rng: rng,
}
}

Expand All @@ -367,8 +373,8 @@ func (rd *replicationDriver) setupC2C(ctx context.Context, t test.Test, c cluste
dstClusterSetting := install.MakeClusterSettings(install.SecureOption(true))
c.Start(ctx, t.L(), dstStartOps, dstClusterSetting, dstCluster)

srcNode := srcCluster.RandNode()
destNode := dstCluster.RandNode()
srcNode := srcCluster.SeededRandNode(rd.rng)
destNode := dstCluster.SeededRandNode(rd.rng)

addr, err := c.ExternalPGUrl(ctx, t.L(), srcNode, "")
require.NoError(t, err)
Expand Down Expand Up @@ -428,7 +434,12 @@ func (rd *replicationDriver) setupC2C(ctx context.Context, t test.Test, c cluste
WithNodeExporter(rd.setup.dst.nodes.InstallNodes()).
WithGrafanaDashboard("https://go.crdb.dev/p/changefeed-roachtest-grafana-dashboard")

require.NoError(rd.t, rd.c.StartGrafana(ctx, rd.t.L(), rd.setup.promCfg))
// StartGrafana clutters the test.log. Try logging setup to a separate file.
promLog, err := rd.t.L().ChildLogger("prom_setup", logger.QuietStderr, logger.QuietStdout)
if err != nil {
promLog = rd.t.L()
}
require.NoError(rd.t, rd.c.StartGrafana(ctx, promLog, rd.setup.promCfg))
rd.t.L().Printf("Prom has started")
}
}
Expand Down Expand Up @@ -527,13 +538,20 @@ func (rd *replicationDriver) getReplicationRetainedTime() time.Time {
return retainedTime
}

func (rd *replicationDriver) stopReplicationStream(ingestionJob int, cutoverTime time.Time) {
func (rd *replicationDriver) stopReplicationStream(
ctx context.Context, ingestionJob int, cutoverTime time.Time,
) {
rd.setup.dst.sysSQL.Exec(rd.t, `ALTER TENANT $1 COMPLETE REPLICATION TO SYSTEM TIME $2::string`, rd.setup.dst.name, cutoverTime)
err := retry.ForDuration(time.Minute*5, func() error {
var status string
var payloadBytes []byte
rd.setup.dst.sysSQL.QueryRow(rd.t, `SELECT status, payload FROM crdb_internal.system_jobs WHERE id = $1`,
ingestionJob).Scan(&status, &payloadBytes)
res := rd.setup.dst.sysSQL.DB.QueryRowContext(ctx, `SELECT status, payload FROM crdb_internal.system_jobs WHERE id = $1`, ingestionJob)
if res.Err() != nil {
// This query can fail if a node shuts down during the query execution;
// therefore, tolerate errors.
return res.Err()
}
require.NoError(rd.t, res.Scan(&status, &payloadBytes))
if jobs.Status(status) == jobs.StatusFailed {
payload := &jobspb.Payload{}
if err := protoutil.Unmarshal(payloadBytes, payload); err == nil {
Expand Down Expand Up @@ -593,6 +611,13 @@ func (rd *replicationDriver) main(ctx context.Context) {
metricSnapper := rd.startStatsCollection(ctx)
rd.preStreamingWorkload(ctx)

// Wait for initial workload to be properly replicated across the source cluster to increase
// the probability that the producer returns a topology with more than one node in it,
// else the node shutdown tests can flake.
if rd.rs.srcNodes >= 3 {
require.NoError(rd.t, WaitFor3XReplication(ctx, rd.t, rd.setup.src.db))
}

rd.t.L().Printf("begin workload on src cluster")
m := rd.newMonitor(ctx)
// The roachtest driver can use the workloadCtx to cancel the workload.
Expand All @@ -617,10 +642,12 @@ func (rd *replicationDriver) main(ctx context.Context) {
rd.t.Status("starting replication stream")
rd.metrics.initalScanStart = newMetricSnapshot(metricSnapper, timeutil.Now())
ingestionJobID := rd.startReplicationStream(ctx)

removeTenantRateLimiters(rd.t, rd.setup.dst.sysSQL, rd.setup.dst.name)

lv := makeLatencyVerifier("stream-ingestion", 0, 2*time.Minute, rd.t.L(), getStreamIngestionJobInfo, rd.t.Status, false)
// latency verifier queries may error during a node shutdown event; therefore
// tolerate errors if we anticipate node deaths.
lv := makeLatencyVerifier("stream-ingestion", 0, 2*time.Minute, rd.t.L(),
getStreamIngestionJobInfo, rd.t.Status, rd.rs.expectedNodeDeaths > 0)
defer lv.maybeLogLatencyHist()

m.Go(func(ctx context.Context) error {
Expand Down Expand Up @@ -655,7 +682,7 @@ func (rd *replicationDriver) main(ctx context.Context) {

rd.t.Status(fmt.Sprintf("waiting for replication stream to cutover to %s",
cutoverTime.String()))
rd.stopReplicationStream(ingestionJobID, cutoverTime)
rd.stopReplicationStream(ctx, ingestionJobID, cutoverTime)
rd.metrics.cutoverEnd = newMetricSnapshot(metricSnapper, timeutil.Now())

rd.metrics.export(rd.t, len(rd.setup.src.nodes))
Expand Down Expand Up @@ -704,7 +731,7 @@ func runAcceptanceClusterReplication(ctx context.Context, t test.Test, c cluster
dstNodes: 1,
// The timeout field ensures the c2c roachtest driver behaves properly.
timeout: 10 * time.Minute,
workload: replicateKV{readPercent: 0, debugRunDurationMinutes: 1},
workload: replicateKV{readPercent: 0, debugRunDuration: 1 * time.Minute, maxBlockBytes: 1},
additionalDuration: 0 * time.Minute,
cutover: 30 * time.Second,
}
Expand Down Expand Up @@ -751,19 +778,20 @@ func registerClusterToCluster(r registry.Registry) {
dstNodes: 3,
cpus: 8,
pdSize: 100,
workload: replicateKV{readPercent: 0},
workload: replicateKV{readPercent: 0, maxBlockBytes: 1024},
timeout: 1 * time.Hour,
additionalDuration: 10 * time.Minute,
cutover: 5 * time.Minute,
tags: registry.Tags("aws"),
},
{
name: "c2c/UnitTest",
srcNodes: 1,
dstNodes: 1,
cpus: 4,
pdSize: 10,
workload: replicateKV{readPercent: 0, debugRunDurationMinutes: 1},
name: "c2c/UnitTest",
srcNodes: 1,
dstNodes: 1,
cpus: 4,
pdSize: 10,
workload: replicateKV{readPercent: 0, debugRunDuration: 1 * time.Minute,
maxBlockBytes: 1024},
timeout: 5 * time.Minute,
additionalDuration: 0 * time.Minute,
cutover: 30 * time.Second,
Expand Down Expand Up @@ -868,7 +896,7 @@ func makeReplResilienceDriver(
rd := makeReplicationDriver(t, c, rsp.replicationSpec)
return replResilienceDriver{
replicationDriver: rd,
phase: c2cPhase(rand.Intn(int(phaseCutover) + 1)),
phase: c2cPhase(rd.rng.Intn(int(phaseCutover) + 1)),
rsp: rsp,
}
}
Expand Down Expand Up @@ -923,7 +951,7 @@ func (rrd *replResilienceDriver) getTargetAndWatcherNodes(ctx context.Context) {

findAnotherNode := func(notThisNode int) int {
for {
anotherNode := nodes.RandNode()[0]
anotherNode := nodes.SeededRandNode(rrd.rng)[0]
if notThisNode != anotherNode {
return anotherNode
}
Expand Down Expand Up @@ -960,19 +988,24 @@ func (rrd *replResilienceDriver) waitForTargetPhase() error {
case currentPhase < rrd.phase:
time.Sleep(5 * time.Second)
case currentPhase == rrd.phase:
// Every C2C phase should last at least 30 seconds, so introduce a little
// bit of random waiting before node shutdown to ensure the shutdown occurs
// once we're settled into the target phase.
randomSleep := time.Duration(rand.Intn(6))
rrd.t.L().Printf("In target phase! Take a %d second power nap", randomSleep)
time.Sleep(randomSleep * time.Second)
rrd.t.L().Printf("In target phase %s", currentPhase.String())
return nil
default:
return errors.New("c2c job past target phase")
}
}
}

func (rrd *replResilienceDriver) sleepBeforeResiliencyEvent() {
// Assuming every C2C phase lasts at least 10 seconds, introduce some waiting
// before a resiliency event (e.g. a node shutdown) to ensure the event occurs
// once we're fully settled into the target phase (e.g. the stream ingestion
// processors have observed the cutover signal).
randomSleep := time.Duration(5+rrd.rng.Intn(6)) * time.Second
rrd.t.L().Printf("Take a %s power nap", randomSleep)
time.Sleep(randomSleep)
}

func registerClusterReplicationResilience(r registry.Registry) {
for _, rsp := range []replResilienceSpec{
{
Expand All @@ -999,7 +1032,7 @@ func registerClusterReplicationResilience(r registry.Registry) {
srcNodes: 4,
dstNodes: 4,
cpus: 8,
workload: replicateKV{readPercent: 0, initDurationMinutes: 2},
workload: replicateKV{readPercent: 0, initRows: 1000000, maxBlockBytes: 1024},
timeout: 20 * time.Minute,
additionalDuration: 6 * time.Minute,
cutover: 3 * time.Minute,
Expand Down Expand Up @@ -1054,14 +1087,19 @@ func registerClusterReplicationResilience(r registry.Registry) {
// Don't begin shutdown process until c2c job is set up.
<-shutdownSetupDone

// Eagerly listen to cutover signal to exercise node shutdown during actual cutover.
rrd.setup.dst.sysSQL.Exec(t, `SET CLUSTER SETTING bulkio.stream_ingestion.cutover_signal_poll_interval='5s'`)

// While executing a node shutdown on either the src or destination
// cluster, ensure the destination cluster's stream ingestion job
// completes. If the stream producer job fails, no big deal-- in a real
// DR scenario the src cluster may have gone belly up during a
// successful c2c replication execution.
shutdownStarter := func() jobStarter {
return func(c cluster.Cluster, t test.Test) (string, error) {
return fmt.Sprintf("%d", rrd.dstJobID), rrd.waitForTargetPhase()
require.NoError(t, rrd.waitForTargetPhase())
rrd.sleepBeforeResiliencyEvent()
return fmt.Sprintf("%d", rrd.dstJobID), nil
}
}
destinationWatcherNode := rrd.watcherNode
Expand Down
2 changes: 2 additions & 0 deletions pkg/cmd/roachtest/tests/multitenant_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -338,6 +338,8 @@ func createInMemoryTenant(
sysSQL.Exec(t, "CREATE TENANT $1", tenantName)
sysSQL.Exec(t, "ALTER TENANT $1 START SERVICE SHARED", tenantName)
sysSQL.Exec(t, `ALTER TENANT $1 GRANT CAPABILITY can_view_node_info=true, can_admin_split=true,can_view_tsdb_metrics=true`, tenantName)
sysSQL.Exec(t, `ALTER TENANT $1 SET CLUSTER SETTING sql.split_at.allow_for_secondary_tenant.enabled=true`, tenantName)
sysSQL.Exec(t, `ALTER TENANT $1 SET CLUSTER SETTING sql.scatter.allow_for_secondary_tenant.enabled=true`, tenantName)

removeTenantRateLimiters(t, sysSQL, tenantName)

Expand Down

0 comments on commit 093e2dd

Please sign in to comment.