Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

c2c: deflake c2c/shutdown roachtests #102382

Merged
merged 3 commits into from
May 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/ccl/streamingccl/streamingest/alter_replication_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,7 @@ func alterTenantJobCutover(
cutoverTime, record.Timestamp)
}
}
if err := completeStreamIngestion(ctx, jobRegistry, txn, tenInfo.TenantReplicationJobID, cutoverTime); err != nil {
if err := applyCutoverTime(ctx, jobRegistry, txn, tenInfo.TenantReplicationJobID, cutoverTime); err != nil {
return hlc.Timestamp{}, err
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/streamingccl/streamingest/stream_ingest_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ type streamIngestManagerImpl struct {
func (r *streamIngestManagerImpl) CompleteStreamIngestion(
ctx context.Context, ingestionJobID jobspb.JobID, cutoverTimestamp hlc.Timestamp,
) error {
return completeStreamIngestion(ctx, r.jobRegistry, r.txn, ingestionJobID, cutoverTimestamp)
return applyCutoverTime(ctx, r.jobRegistry, r.txn, ingestionJobID, cutoverTimestamp)
}

// GetStreamIngestionStats implements streaming.StreamIngestManager interface.
Expand Down
5 changes: 3 additions & 2 deletions pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,9 @@ import (
"github.com/cockroachdb/errors"
)

// completeStreamIngestion terminates the stream as of specified time.
func completeStreamIngestion(
// applyCutoverTime modifies the consumer job record with a cutover time and
// unpauses the job if necessary.
func applyCutoverTime(
ctx context.Context,
jobRegistry *jobs.Registry,
txn isql.Txn,
Expand Down
5 changes: 5 additions & 0 deletions pkg/cmd/roachtest/option/node_list_option.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,11 @@ func (n NodeListOption) RandNode() NodeListOption {
return NodeListOption{n[rand.Intn(len(n))]}
}

// SeededRandNode returns a random node from the NodeListOption using a seeded rand object.
func (n NodeListOption) SeededRandNode(rand *rand.Rand) NodeListOption {
return NodeListOption{n[rand.Intn(len(n))]}
}

// NodeIDsString returns the nodes in the NodeListOption, separated by spaces.
func (n NodeListOption) NodeIDsString() string {
result := ""
Expand Down
138 changes: 88 additions & 50 deletions pkg/cmd/roachtest/tests/cluster_to_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,13 @@ import (
"github.com/cockroachdb/cockroach/pkg/multitenant/mtinfopb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/roachprod/install"
"github.com/cockroachdb/cockroach/pkg/roachprod/logger"
"github.com/cockroachdb/cockroach/pkg/roachprod/prometheus"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/cockroachdb/cockroach/pkg/util/randutil"
"github.com/cockroachdb/cockroach/pkg/util/retry"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/errors"
Expand Down Expand Up @@ -228,33 +230,33 @@ type replicateKV struct {
readPercent int

// This field is merely used to debug the c2c framework for finite workloads.
debugRunDurationMinutes int
debugRunDuration time.Duration

// initDuration, if nonzero, will pre-populate the src cluster
initDurationMinutes int
// the number of rows inserted into the cluster before c2c begins
initRows int

// max size of raw data written during each insertion
maxBlockBytes int
}

func (kv replicateKV) sourceInitCmd(tenantName string, nodes option.NodeListOption) string {
if kv.initDurationMinutes == 0 {
return ""
}
return fmt.Sprintf(`./workload run kv --tolerate-errors --init --duration %dm --read-percent 0 {pgurl%s:%s}`,
kv.initDurationMinutes,
nodes,
tenantName)
cmd := roachtestutil.NewCommand(`./workload init kv`).
MaybeFlag(kv.initRows > 0, "insert-count", kv.initRows).
MaybeFlag(kv.initRows > 0, "max-block-bytes", kv.maxBlockBytes).
Flag("splits", 100).
Option("scatter").
Arg("{pgurl%s:%s}", nodes, tenantName)
return cmd.String()
}

func (kv replicateKV) sourceRunCmd(tenantName string, nodes option.NodeListOption) string {
debugDuration := ""
if kv.debugRunDurationMinutes != 0 {
debugDuration = fmt.Sprintf("--duration %dm", kv.debugRunDurationMinutes)
}
// added --tolerate-errors flags to prevent test from flaking due to a transaction retry error
return fmt.Sprintf(`./workload run kv --tolerate-errors --init %s --read-percent %d {pgurl%s:%s}`,
debugDuration,
kv.readPercent,
nodes,
tenantName)
cmd := roachtestutil.NewCommand(`./workload run kv`).
Option("tolerate-errors").
Flag("max-block-bytes", kv.maxBlockBytes).
Flag("read-percent", kv.readPercent).
MaybeFlag(kv.debugRunDuration > 0, "duration", kv.debugRunDuration).
Arg("{pgurl%s:%s}", nodes, tenantName)
return cmd.String()
}

func (kv replicateKV) runDriver(
Expand Down Expand Up @@ -338,13 +340,17 @@ type replicationDriver struct {
t test.Test
c cluster.Cluster
metrics *c2cMetrics
rng *rand.Rand
}

func makeReplicationDriver(t test.Test, c cluster.Cluster, rs replicationSpec) replicationDriver {
rng, seed := randutil.NewTestRand()
t.L().Printf(`Random Seed is %d`, seed)
return replicationDriver{
t: t,
c: c,
rs: rs,
t: t,
c: c,
rs: rs,
rng: rng,
}
}

Expand All @@ -367,8 +373,8 @@ func (rd *replicationDriver) setupC2C(ctx context.Context, t test.Test, c cluste
dstClusterSetting := install.MakeClusterSettings(install.SecureOption(true))
c.Start(ctx, t.L(), dstStartOps, dstClusterSetting, dstCluster)

srcNode := srcCluster.RandNode()
destNode := dstCluster.RandNode()
srcNode := srcCluster.SeededRandNode(rd.rng)
destNode := dstCluster.SeededRandNode(rd.rng)

addr, err := c.ExternalPGUrl(ctx, t.L(), srcNode, "")
require.NoError(t, err)
Expand Down Expand Up @@ -428,7 +434,12 @@ func (rd *replicationDriver) setupC2C(ctx context.Context, t test.Test, c cluste
WithNodeExporter(rd.setup.dst.nodes.InstallNodes()).
WithGrafanaDashboard("https://go.crdb.dev/p/changefeed-roachtest-grafana-dashboard")

require.NoError(rd.t, rd.c.StartGrafana(ctx, rd.t.L(), rd.setup.promCfg))
// StartGrafana clutters the test.log. Try logging setup to a separate file.
promLog, err := rd.t.L().ChildLogger("prom_setup", logger.QuietStderr, logger.QuietStdout)
if err != nil {
promLog = rd.t.L()
}
require.NoError(rd.t, rd.c.StartGrafana(ctx, promLog, rd.setup.promCfg))
rd.t.L().Printf("Prom has started")
}
}
Expand Down Expand Up @@ -527,13 +538,20 @@ func (rd *replicationDriver) getReplicationRetainedTime() time.Time {
return retainedTime
}

func (rd *replicationDriver) stopReplicationStream(ingestionJob int, cutoverTime time.Time) {
func (rd *replicationDriver) stopReplicationStream(
ctx context.Context, ingestionJob int, cutoverTime time.Time,
) {
rd.setup.dst.sysSQL.Exec(rd.t, `ALTER TENANT $1 COMPLETE REPLICATION TO SYSTEM TIME $2::string`, rd.setup.dst.name, cutoverTime)
err := retry.ForDuration(time.Minute*5, func() error {
var status string
var payloadBytes []byte
rd.setup.dst.sysSQL.QueryRow(rd.t, `SELECT status, payload FROM crdb_internal.system_jobs WHERE id = $1`,
ingestionJob).Scan(&status, &payloadBytes)
res := rd.setup.dst.sysSQL.DB.QueryRowContext(ctx, `SELECT status, payload FROM crdb_internal.system_jobs WHERE id = $1`, ingestionJob)
if res.Err() != nil {
// This query can fail if a node shuts down during the query execution;
// therefore, tolerate errors.
return res.Err()
}
require.NoError(rd.t, res.Scan(&status, &payloadBytes))
if jobs.Status(status) == jobs.StatusFailed {
payload := &jobspb.Payload{}
if err := protoutil.Unmarshal(payloadBytes, payload); err == nil {
Expand Down Expand Up @@ -593,6 +611,13 @@ func (rd *replicationDriver) main(ctx context.Context) {
metricSnapper := rd.startStatsCollection(ctx)
rd.preStreamingWorkload(ctx)

// Wait for initial workload to be properly replicated across the source cluster to increase
// the probability that the producer returns a topology with more than one node in it,
// else the node shutdown tests can flake.
if rd.rs.srcNodes >= 3 {
require.NoError(rd.t, WaitFor3XReplication(ctx, rd.t, rd.setup.src.db))
}

rd.t.L().Printf("begin workload on src cluster")
m := rd.newMonitor(ctx)
// The roachtest driver can use the workloadCtx to cancel the workload.
Expand All @@ -617,10 +642,12 @@ func (rd *replicationDriver) main(ctx context.Context) {
rd.t.Status("starting replication stream")
rd.metrics.initalScanStart = newMetricSnapshot(metricSnapper, timeutil.Now())
ingestionJobID := rd.startReplicationStream(ctx)

removeTenantRateLimiters(rd.t, rd.setup.dst.sysSQL, rd.setup.dst.name)

lv := makeLatencyVerifier("stream-ingestion", 0, 2*time.Minute, rd.t.L(), getStreamIngestionJobInfo, rd.t.Status, false)
// latency verifier queries may error during a node shutdown event; therefore
// tolerate errors if we anticipate node deaths.
lv := makeLatencyVerifier("stream-ingestion", 0, 2*time.Minute, rd.t.L(),
getStreamIngestionJobInfo, rd.t.Status, rd.rs.expectedNodeDeaths > 0)
defer lv.maybeLogLatencyHist()

m.Go(func(ctx context.Context) error {
Expand Down Expand Up @@ -655,7 +682,7 @@ func (rd *replicationDriver) main(ctx context.Context) {

rd.t.Status(fmt.Sprintf("waiting for replication stream to cutover to %s",
cutoverTime.String()))
rd.stopReplicationStream(ingestionJobID, cutoverTime)
rd.stopReplicationStream(ctx, ingestionJobID, cutoverTime)
rd.metrics.cutoverEnd = newMetricSnapshot(metricSnapper, timeutil.Now())

rd.metrics.export(rd.t, len(rd.setup.src.nodes))
Expand Down Expand Up @@ -704,7 +731,7 @@ func runAcceptanceClusterReplication(ctx context.Context, t test.Test, c cluster
dstNodes: 1,
// The timeout field ensures the c2c roachtest driver behaves properly.
timeout: 10 * time.Minute,
workload: replicateKV{readPercent: 0, debugRunDurationMinutes: 1},
workload: replicateKV{readPercent: 0, debugRunDuration: 1 * time.Minute, maxBlockBytes: 1},
additionalDuration: 0 * time.Minute,
cutover: 30 * time.Second,
}
Expand Down Expand Up @@ -751,19 +778,20 @@ func registerClusterToCluster(r registry.Registry) {
dstNodes: 3,
cpus: 8,
pdSize: 100,
workload: replicateKV{readPercent: 0},
workload: replicateKV{readPercent: 0, maxBlockBytes: 1024},
timeout: 1 * time.Hour,
additionalDuration: 10 * time.Minute,
cutover: 5 * time.Minute,
tags: registry.Tags("aws"),
},
{
name: "c2c/UnitTest",
srcNodes: 1,
dstNodes: 1,
cpus: 4,
pdSize: 10,
workload: replicateKV{readPercent: 0, debugRunDurationMinutes: 1},
name: "c2c/UnitTest",
srcNodes: 1,
dstNodes: 1,
cpus: 4,
pdSize: 10,
workload: replicateKV{readPercent: 0, debugRunDuration: 1 * time.Minute,
maxBlockBytes: 1024},
timeout: 5 * time.Minute,
additionalDuration: 0 * time.Minute,
cutover: 30 * time.Second,
Expand Down Expand Up @@ -868,7 +896,7 @@ func makeReplResilienceDriver(
rd := makeReplicationDriver(t, c, rsp.replicationSpec)
return replResilienceDriver{
replicationDriver: rd,
phase: c2cPhase(rand.Intn(int(phaseCutover) + 1)),
phase: c2cPhase(rd.rng.Intn(int(phaseCutover) + 1)),
rsp: rsp,
}
}
Expand Down Expand Up @@ -923,7 +951,7 @@ func (rrd *replResilienceDriver) getTargetAndWatcherNodes(ctx context.Context) {

findAnotherNode := func(notThisNode int) int {
for {
anotherNode := nodes.RandNode()[0]
anotherNode := nodes.SeededRandNode(rrd.rng)[0]
if notThisNode != anotherNode {
return anotherNode
}
Expand Down Expand Up @@ -960,19 +988,24 @@ func (rrd *replResilienceDriver) waitForTargetPhase() error {
case currentPhase < rrd.phase:
time.Sleep(5 * time.Second)
case currentPhase == rrd.phase:
// Every C2C phase should last at least 30 seconds, so introduce a little
// bit of random waiting before node shutdown to ensure the shutdown occurs
// once we're settled into the target phase.
randomSleep := time.Duration(rand.Intn(6))
rrd.t.L().Printf("In target phase! Take a %d second power nap", randomSleep)
time.Sleep(randomSleep * time.Second)
rrd.t.L().Printf("In target phase %s", currentPhase.String())
return nil
default:
return errors.New("c2c job past target phase")
}
}
}

func (rrd *replResilienceDriver) sleepBeforeResiliencyEvent() {
// Assuming every C2C phase lasts at least 10 seconds, introduce some waiting
// before a resiliency event (e.g. a node shutdown) to ensure the event occurs
// once we're fully settled into the target phase (e.g. the stream ingestion
// processors have observed the cutover signal).
randomSleep := time.Duration(5+rrd.rng.Intn(6)) * time.Second
rrd.t.L().Printf("Take a %s power nap", randomSleep)
time.Sleep(randomSleep)
}

func registerClusterReplicationResilience(r registry.Registry) {
for _, rsp := range []replResilienceSpec{
{
Expand All @@ -999,7 +1032,7 @@ func registerClusterReplicationResilience(r registry.Registry) {
srcNodes: 4,
dstNodes: 4,
cpus: 8,
workload: replicateKV{readPercent: 0, initDurationMinutes: 2},
workload: replicateKV{readPercent: 0, initRows: 1000000, maxBlockBytes: 1024},
timeout: 20 * time.Minute,
additionalDuration: 6 * time.Minute,
cutover: 3 * time.Minute,
Expand Down Expand Up @@ -1054,14 +1087,19 @@ func registerClusterReplicationResilience(r registry.Registry) {
// Don't begin shutdown process until c2c job is set up.
<-shutdownSetupDone

// Eagerly listen to cutover signal to exercise node shutdown during actual cutover.
rrd.setup.dst.sysSQL.Exec(t, `SET CLUSTER SETTING bulkio.stream_ingestion.cutover_signal_poll_interval='5s'`)

// While executing a node shutdown on either the src or destination
// cluster, ensure the destination cluster's stream ingestion job
// completes. If the stream producer job fails, no big deal-- in a real
// DR scenario the src cluster may have gone belly up during a
// successful c2c replication execution.
shutdownStarter := func() jobStarter {
return func(c cluster.Cluster, t test.Test) (string, error) {
return fmt.Sprintf("%d", rrd.dstJobID), rrd.waitForTargetPhase()
require.NoError(t, rrd.waitForTargetPhase())
rrd.sleepBeforeResiliencyEvent()
return fmt.Sprintf("%d", rrd.dstJobID), nil
}
}
destinationWatcherNode := rrd.watcherNode
Expand Down
2 changes: 2 additions & 0 deletions pkg/cmd/roachtest/tests/multitenant_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -338,6 +338,8 @@ func createInMemoryTenant(
sysSQL.Exec(t, "CREATE TENANT $1", tenantName)
sysSQL.Exec(t, "ALTER TENANT $1 START SERVICE SHARED", tenantName)
sysSQL.Exec(t, `ALTER TENANT $1 GRANT CAPABILITY can_view_node_info=true, can_admin_split=true,can_view_tsdb_metrics=true`, tenantName)
sysSQL.Exec(t, `ALTER TENANT $1 SET CLUSTER SETTING sql.split_at.allow_for_secondary_tenant.enabled=true`, tenantName)
sysSQL.Exec(t, `ALTER TENANT $1 SET CLUSTER SETTING sql.scatter.allow_for_secondary_tenant.enabled=true`, tenantName)

removeTenantRateLimiters(t, sysSQL, tenantName)

Expand Down
21 changes: 16 additions & 5 deletions pkg/workload/kv/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ type kv struct {
zipfian bool
sfuDelay time.Duration
splits int
scatter bool
secondaryIndex bool
shards int
targetCompressionRatio float64
Expand Down Expand Up @@ -127,6 +128,7 @@ var kvMeta = workload.Meta{
`span-limit`: {RuntimeOnly: true},
`del-percent`: {RuntimeOnly: true},
`splits`: {RuntimeOnly: true},
`scatter`: {RuntimeOnly: true},
`timeout`: {RuntimeOnly: true},
}
g.flags.IntVar(&g.batchSize, `batch`, 1,
Expand Down Expand Up @@ -157,6 +159,8 @@ var kvMeta = workload.Meta{
`previous --sequential run and R implies a previous random run.`)
g.flags.IntVar(&g.splits, `splits`, 0,
`Number of splits to perform before starting normal operations.`)
g.flags.BoolVar(&g.scatter, `scatter`, false,
`Scatter ranges before starting normal operations.`)
g.flags.BoolVar(&g.secondaryIndex, `secondary-index`, false,
`Add a secondary index to the schema.`)
g.flags.IntVar(&g.shards, `num-shards`, 0,
Expand Down Expand Up @@ -189,13 +193,20 @@ func (w *kv) Flags() workload.Flags { return w.flags }
func (w *kv) Hooks() workload.Hooks {
return workload.Hooks{
PostLoad: func(_ context.Context, db *gosql.DB) error {
if !w.enum {
return nil
}
_, err := db.Exec(`
if w.enum {
_, err := db.Exec(`
CREATE TYPE enum_type AS ENUM ('v');
ALTER TABLE kv ADD COLUMN e enum_type NOT NULL AS ('v') STORED;`)
return err
if err != nil {
return err
}
}
if w.scatter {
if _, err := db.Exec(`ALTER TABLE kv SCATTER`); err != nil {
return err
}
}
return nil
},
Validate: w.validateConfig,
}
Expand Down