From d16c0114baf1cf5dc3481ae8a536bdd1f3742bfd Mon Sep 17 00:00:00 2001
From: Michael Butler
Date: Tue, 9 May 2023 16:54:02 -0400
Subject: [PATCH 1/3] workload: add --scatter flag to kv workload

The user can now run `./workload init kv --scatter ....` which scatters the
kv table across the cluster after the initial data load. This flag is best
used with `--splits`, `--max-block-bytes`, and `--insert-count`.

Epic: none

Release note: none
---
 pkg/workload/kv/kv.go | 21 ++++++++++++++++-----
 1 file changed, 16 insertions(+), 5 deletions(-)

diff --git a/pkg/workload/kv/kv.go b/pkg/workload/kv/kv.go
index 84007fba2733..3eec400c98c0 100644
--- a/pkg/workload/kv/kv.go
+++ b/pkg/workload/kv/kv.go
@@ -88,6 +88,7 @@ type kv struct {
 	zipfian                bool
 	sfuDelay               time.Duration
 	splits                 int
+	scatter                bool
 	secondaryIndex         bool
 	shards                 int
 	targetCompressionRatio float64
@@ -127,6 +128,7 @@ var kvMeta = workload.Meta{
 		`span-limit`:  {RuntimeOnly: true},
 		`del-percent`: {RuntimeOnly: true},
 		`splits`:      {RuntimeOnly: true},
+		`scatter`:     {RuntimeOnly: true},
 		`timeout`:     {RuntimeOnly: true},
 	}
 	g.flags.IntVar(&g.batchSize, `batch`, 1,
@@ -157,6 +159,8 @@ var kvMeta = workload.Meta{
 		`previous --sequential run and R implies a previous random run.`)
 	g.flags.IntVar(&g.splits, `splits`, 0,
 		`Number of splits to perform before starting normal operations.`)
+	g.flags.BoolVar(&g.scatter, `scatter`, false,
+		`Scatter ranges before starting normal operations.`)
 	g.flags.BoolVar(&g.secondaryIndex, `secondary-index`, false,
 		`Add a secondary index to the schema.`)
 	g.flags.IntVar(&g.shards, `num-shards`, 0,
@@ -189,13 +193,20 @@ func (w *kv) Flags() workload.Flags { return w.flags }
 func (w *kv) Hooks() workload.Hooks {
 	return workload.Hooks{
 		PostLoad: func(_ context.Context, db *gosql.DB) error {
-			if !w.enum {
-				return nil
-			}
-			_, err := db.Exec(`
+			if w.enum {
+				_, err := db.Exec(`
 CREATE TYPE enum_type AS ENUM ('v');
 ALTER TABLE kv ADD COLUMN e enum_type NOT NULL AS ('v') STORED;`)
-			return err
+				if err != nil {
+					return err
+				}
+			}
+			if w.scatter {
+				if _, err := db.Exec(`ALTER TABLE kv SCATTER`); err != nil {
+					return err
+				}
+			}
+			return nil
 		},
 		Validate: w.validateConfig,
 	}

From 91d80223468f7f62f3144034b523aa25086bb29f Mon Sep 17 00:00:00 2001
From: Michael Butler
Date: Mon, 8 May 2023 17:15:12 -0400
Subject: [PATCH 2/3] c2c: rename completeStreamIngestion to applyCutoverTime

Release note: none
---
 pkg/ccl/streamingccl/streamingest/alter_replication_job.go | 2 +-
 pkg/ccl/streamingccl/streamingest/stream_ingest_manager.go | 2 +-
 pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go  | 5 +++--
 3 files changed, 5 insertions(+), 4 deletions(-)

diff --git a/pkg/ccl/streamingccl/streamingest/alter_replication_job.go b/pkg/ccl/streamingccl/streamingest/alter_replication_job.go
index dbb3d46cedf0..7879ce03f36d 100644
--- a/pkg/ccl/streamingccl/streamingest/alter_replication_job.go
+++ b/pkg/ccl/streamingccl/streamingest/alter_replication_job.go
@@ -264,7 +264,7 @@ func alterTenantJobCutover(
 				cutoverTime, record.Timestamp)
 		}
 	}
-	if err := completeStreamIngestion(ctx, jobRegistry, txn, tenInfo.TenantReplicationJobID, cutoverTime); err != nil {
+	if err := applyCutoverTime(ctx, jobRegistry, txn, tenInfo.TenantReplicationJobID, cutoverTime); err != nil {
 		return hlc.Timestamp{}, err
 	}
 
diff --git a/pkg/ccl/streamingccl/streamingest/stream_ingest_manager.go b/pkg/ccl/streamingccl/streamingest/stream_ingest_manager.go
index 4755b67c79af..dccdd0d10932 100644
--- a/pkg/ccl/streamingccl/streamingest/stream_ingest_manager.go
+++ b/pkg/ccl/streamingccl/streamingest/stream_ingest_manager.go
@@ -37,7 +37,7 @@ type streamIngestManagerImpl struct {
 func (r *streamIngestManagerImpl) CompleteStreamIngestion(
 	ctx context.Context, ingestionJobID jobspb.JobID, cutoverTimestamp hlc.Timestamp,
 ) error {
-	return completeStreamIngestion(ctx, r.jobRegistry, r.txn, ingestionJobID, cutoverTimestamp)
+	return applyCutoverTime(ctx, r.jobRegistry, r.txn, ingestionJobID, cutoverTimestamp)
 }
 
 // GetStreamIngestionStats implements streaming.StreamIngestManager interface.
diff --git a/pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go b/pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go
index 22d85985ecf2..11417b83ccc5 100644
--- a/pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go
+++ b/pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go
@@ -38,8 +38,9 @@ import (
 	"github.com/cockroachdb/errors"
 )
 
-// completeStreamIngestion terminates the stream as of specified time.
-func completeStreamIngestion(
+// applyCutoverTime modifies the consumer job record with a cutover time and
+// unpauses the job if necessary.
+func applyCutoverTime(
 	ctx context.Context,
 	jobRegistry *jobs.Registry,
 	txn isql.Txn,

From 093e2ddee34bac6492d7435e6e721c664c0d1785 Mon Sep 17 00:00:00 2001
From: Michael Butler
Date: Wed, 26 Apr 2023 17:57:08 -0400
Subject: [PATCH 3/3] c2c: deflake c2c/shutdown roachtests

This patch addresses two roachtest failure modes:
- Prevents roachtest failure if a query fails during a node shutdown.
- Prevents the src cluster from returning a single node topology, which
  could cause the stream ingestion job to hang if the participating src
  node gets shut down. Longer term, automatic replanning will prevent
  this. Specifically, this patch changes the kv workload to split and
  scatter the kv table across the cluster before the c2c job begins.

Fixes #101898
Fixes #102111

This patch also makes it easier to reproduce c2c roachtest failures by
plumbing a random seed to several components of the roachtest driver.

Release note: None
---
 pkg/cmd/roachtest/option/node_list_option.go  |   5 +
 pkg/cmd/roachtest/tests/cluster_to_cluster.go | 138 +++++++++++-------
 pkg/cmd/roachtest/tests/multitenant_utils.go  |   2 +
 3 files changed, 95 insertions(+), 50 deletions(-)

diff --git a/pkg/cmd/roachtest/option/node_list_option.go b/pkg/cmd/roachtest/option/node_list_option.go
index 475d78d2c956..5535a7eea2dd 100644
--- a/pkg/cmd/roachtest/option/node_list_option.go
+++ b/pkg/cmd/roachtest/option/node_list_option.go
@@ -73,6 +73,11 @@ func (n NodeListOption) RandNode() NodeListOption {
 	return NodeListOption{n[rand.Intn(len(n))]}
 }
 
+// SeededRandNode returns a random node from the NodeListOption using a seeded rand object.
+func (n NodeListOption) SeededRandNode(rand *rand.Rand) NodeListOption {
+	return NodeListOption{n[rand.Intn(len(n))]}
+}
+
 // NodeIDsString returns the nodes in the NodeListOption, separated by spaces.
 func (n NodeListOption) NodeIDsString() string {
 	result := ""
diff --git a/pkg/cmd/roachtest/tests/cluster_to_cluster.go b/pkg/cmd/roachtest/tests/cluster_to_cluster.go
index f8eea8294209..e87a73bbdf45 100644
--- a/pkg/cmd/roachtest/tests/cluster_to_cluster.go
+++ b/pkg/cmd/roachtest/tests/cluster_to_cluster.go
@@ -34,11 +34,13 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/multitenant/mtinfopb"
 	"github.com/cockroachdb/cockroach/pkg/roachpb"
 	"github.com/cockroachdb/cockroach/pkg/roachprod/install"
+	"github.com/cockroachdb/cockroach/pkg/roachprod/logger"
 	"github.com/cockroachdb/cockroach/pkg/roachprod/prometheus"
 	"github.com/cockroachdb/cockroach/pkg/testutils"
 	"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
 	"github.com/cockroachdb/cockroach/pkg/util/hlc"
 	"github.com/cockroachdb/cockroach/pkg/util/protoutil"
+	"github.com/cockroachdb/cockroach/pkg/util/randutil"
 	"github.com/cockroachdb/cockroach/pkg/util/retry"
 	"github.com/cockroachdb/cockroach/pkg/util/timeutil"
 	"github.com/cockroachdb/errors"
@@ -228,33 +230,33 @@ type replicateKV struct {
 	readPercent int
 
 	// This field is merely used to debug the c2c framework for finite workloads.
-	debugRunDurationMinutes int
+	debugRunDuration time.Duration
 
-	// initDuration, if nonzero, will pre-populate the src cluster
-	initDurationMinutes int
+	// the number of rows inserted into the cluster before c2c begins
+	initRows int
+
+	// max size of raw data written during each insertion
+	maxBlockBytes int
 }
 
 func (kv replicateKV) sourceInitCmd(tenantName string, nodes option.NodeListOption) string {
-	if kv.initDurationMinutes == 0 {
-		return ""
-	}
-	return fmt.Sprintf(`./workload run kv --tolerate-errors --init --duration %dm --read-percent 0 {pgurl%s:%s}`,
-		kv.initDurationMinutes,
-		nodes,
-		tenantName)
+	cmd := roachtestutil.NewCommand(`./workload init kv`).
+		MaybeFlag(kv.initRows > 0, "insert-count", kv.initRows).
+		MaybeFlag(kv.initRows > 0, "max-block-bytes", kv.maxBlockBytes).
+		Flag("splits", 100).
+		Option("scatter").
+		Arg("{pgurl%s:%s}", nodes, tenantName)
+	return cmd.String()
 }
 
 func (kv replicateKV) sourceRunCmd(tenantName string, nodes option.NodeListOption) string {
-	debugDuration := ""
-	if kv.debugRunDurationMinutes != 0 {
-		debugDuration = fmt.Sprintf("--duration %dm", kv.debugRunDurationMinutes)
-	}
-	// added --tolerate-errors flags to prevent test from flaking due to a transaction retry error
-	return fmt.Sprintf(`./workload run kv --tolerate-errors --init %s --read-percent %d {pgurl%s:%s}`,
-		debugDuration,
-		kv.readPercent,
-		nodes,
-		tenantName)
+	cmd := roachtestutil.NewCommand(`./workload run kv`).
+		Option("tolerate-errors").
+		Flag("max-block-bytes", kv.maxBlockBytes).
+		Flag("read-percent", kv.readPercent).
+		MaybeFlag(kv.debugRunDuration > 0, "duration", kv.debugRunDuration).
+ Arg("{pgurl%s:%s}", nodes, tenantName) + return cmd.String() } func (kv replicateKV) runDriver( @@ -338,13 +340,17 @@ type replicationDriver struct { t test.Test c cluster.Cluster metrics *c2cMetrics + rng *rand.Rand } func makeReplicationDriver(t test.Test, c cluster.Cluster, rs replicationSpec) replicationDriver { + rng, seed := randutil.NewTestRand() + t.L().Printf(`Random Seed is %d`, seed) return replicationDriver{ - t: t, - c: c, - rs: rs, + t: t, + c: c, + rs: rs, + rng: rng, } } @@ -367,8 +373,8 @@ func (rd *replicationDriver) setupC2C(ctx context.Context, t test.Test, c cluste dstClusterSetting := install.MakeClusterSettings(install.SecureOption(true)) c.Start(ctx, t.L(), dstStartOps, dstClusterSetting, dstCluster) - srcNode := srcCluster.RandNode() - destNode := dstCluster.RandNode() + srcNode := srcCluster.SeededRandNode(rd.rng) + destNode := dstCluster.SeededRandNode(rd.rng) addr, err := c.ExternalPGUrl(ctx, t.L(), srcNode, "") require.NoError(t, err) @@ -428,7 +434,12 @@ func (rd *replicationDriver) setupC2C(ctx context.Context, t test.Test, c cluste WithNodeExporter(rd.setup.dst.nodes.InstallNodes()). WithGrafanaDashboard("https://go.crdb.dev/p/changefeed-roachtest-grafana-dashboard") - require.NoError(rd.t, rd.c.StartGrafana(ctx, rd.t.L(), rd.setup.promCfg)) + // StartGrafana clutters the test.log. Try logging setup to a separate file. + promLog, err := rd.t.L().ChildLogger("prom_setup", logger.QuietStderr, logger.QuietStdout) + if err != nil { + promLog = rd.t.L() + } + require.NoError(rd.t, rd.c.StartGrafana(ctx, promLog, rd.setup.promCfg)) rd.t.L().Printf("Prom has started") } } @@ -527,13 +538,20 @@ func (rd *replicationDriver) getReplicationRetainedTime() time.Time { return retainedTime } -func (rd *replicationDriver) stopReplicationStream(ingestionJob int, cutoverTime time.Time) { +func (rd *replicationDriver) stopReplicationStream( + ctx context.Context, ingestionJob int, cutoverTime time.Time, +) { rd.setup.dst.sysSQL.Exec(rd.t, `ALTER TENANT $1 COMPLETE REPLICATION TO SYSTEM TIME $2::string`, rd.setup.dst.name, cutoverTime) err := retry.ForDuration(time.Minute*5, func() error { var status string var payloadBytes []byte - rd.setup.dst.sysSQL.QueryRow(rd.t, `SELECT status, payload FROM crdb_internal.system_jobs WHERE id = $1`, - ingestionJob).Scan(&status, &payloadBytes) + res := rd.setup.dst.sysSQL.DB.QueryRowContext(ctx, `SELECT status, payload FROM crdb_internal.system_jobs WHERE id = $1`, ingestionJob) + if res.Err() != nil { + // This query can fail if a node shuts down during the query execution; + // therefore, tolerate errors. + return res.Err() + } + require.NoError(rd.t, res.Scan(&status, &payloadBytes)) if jobs.Status(status) == jobs.StatusFailed { payload := &jobspb.Payload{} if err := protoutil.Unmarshal(payloadBytes, payload); err == nil { @@ -593,6 +611,13 @@ func (rd *replicationDriver) main(ctx context.Context) { metricSnapper := rd.startStatsCollection(ctx) rd.preStreamingWorkload(ctx) + // Wait for initial workload to be properly replicated across the source cluster to increase + // the probability that the producer returns a topology with more than one node in it, + // else the node shutdown tests can flake. + if rd.rs.srcNodes >= 3 { + require.NoError(rd.t, WaitFor3XReplication(ctx, rd.t, rd.setup.src.db)) + } + rd.t.L().Printf("begin workload on src cluster") m := rd.newMonitor(ctx) // The roachtest driver can use the workloadCtx to cancel the workload. 
@@ -617,10 +642,12 @@
 	rd.t.Status("starting replication stream")
 	rd.metrics.initalScanStart = newMetricSnapshot(metricSnapper, timeutil.Now())
 	ingestionJobID := rd.startReplicationStream(ctx)
-
 	removeTenantRateLimiters(rd.t, rd.setup.dst.sysSQL, rd.setup.dst.name)
 
-	lv := makeLatencyVerifier("stream-ingestion", 0, 2*time.Minute, rd.t.L(), getStreamIngestionJobInfo, rd.t.Status, false)
+	// latency verifier queries may error during a node shutdown event; therefore
+	// tolerate errors if we anticipate node deaths.
+	lv := makeLatencyVerifier("stream-ingestion", 0, 2*time.Minute, rd.t.L(),
+		getStreamIngestionJobInfo, rd.t.Status, rd.rs.expectedNodeDeaths > 0)
 	defer lv.maybeLogLatencyHist()
 
 	m.Go(func(ctx context.Context) error {
@@ -655,7 +682,7 @@
 	rd.t.Status(fmt.Sprintf("waiting for replication stream to cutover to %s",
 		cutoverTime.String()))
-	rd.stopReplicationStream(ingestionJobID, cutoverTime)
+	rd.stopReplicationStream(ctx, ingestionJobID, cutoverTime)
 	rd.metrics.cutoverEnd = newMetricSnapshot(metricSnapper, timeutil.Now())
 
 	rd.metrics.export(rd.t, len(rd.setup.src.nodes))
@@ -704,7 +731,7 @@ func runAcceptanceClusterReplication(ctx context.Context, t test.Test, c cluster
 		dstNodes: 1,
 		// The timeout field ensures the c2c roachtest driver behaves properly.
 		timeout:            10 * time.Minute,
-		workload:           replicateKV{readPercent: 0, debugRunDurationMinutes: 1},
+		workload:           replicateKV{readPercent: 0, debugRunDuration: 1 * time.Minute, maxBlockBytes: 1},
 		additionalDuration: 0 * time.Minute,
 		cutover:            30 * time.Second,
 	}
@@ -751,19 +778,20 @@ func registerClusterToCluster(r registry.Registry) {
 			dstNodes:           3,
 			cpus:               8,
 			pdSize:             100,
-			workload:           replicateKV{readPercent: 0},
+			workload:           replicateKV{readPercent: 0, maxBlockBytes: 1024},
 			timeout:            1 * time.Hour,
 			additionalDuration: 10 * time.Minute,
 			cutover:            5 * time.Minute,
 			tags:               registry.Tags("aws"),
 		},
 		{
-			name:     "c2c/UnitTest",
-			srcNodes: 1,
-			dstNodes: 1,
-			cpus:     4,
-			pdSize:   10,
-			workload: replicateKV{readPercent: 0, debugRunDurationMinutes: 1},
+			name:     "c2c/UnitTest",
+			srcNodes: 1,
+			dstNodes: 1,
+			cpus:     4,
+			pdSize:   10,
+			workload: replicateKV{readPercent: 0, debugRunDuration: 1 * time.Minute,
+				maxBlockBytes: 1024},
 			timeout:            5 * time.Minute,
 			additionalDuration: 0 * time.Minute,
 			cutover:            30 * time.Second,
@@ -868,7 +896,7 @@ func makeReplResilienceDriver(
 	rd := makeReplicationDriver(t, c, rsp.replicationSpec)
 	return replResilienceDriver{
 		replicationDriver: rd,
-		phase:             c2cPhase(rand.Intn(int(phaseCutover) + 1)),
+		phase:             c2cPhase(rd.rng.Intn(int(phaseCutover) + 1)),
 		rsp:               rsp,
 	}
 }
@@ -923,7 +951,7 @@ func (rrd *replResilienceDriver) getTargetAndWatcherNodes(ctx context.Context) {
 	findAnotherNode := func(notThisNode int) int {
 		for {
-			anotherNode := nodes.RandNode()[0]
+			anotherNode := nodes.SeededRandNode(rrd.rng)[0]
 			if notThisNode != anotherNode {
 				return anotherNode
 			}
 		}
@@ -960,12 +988,7 @@ func (rrd *replResilienceDriver) waitForTargetPhase() error {
 		case currentPhase < rrd.phase:
 			time.Sleep(5 * time.Second)
 		case currentPhase == rrd.phase:
-			// Every C2C phase should last at least 30 seconds, so introduce a little
-			// bit of random waiting before node shutdown to ensure the shutdown occurs
-			// once we're settled into the target phase.
-			randomSleep := time.Duration(rand.Intn(6))
-			rrd.t.L().Printf("In target phase! Take a %d second power nap", randomSleep)
-			time.Sleep(randomSleep * time.Second)
+			rrd.t.L().Printf("In target phase %s", currentPhase.String())
 			return nil
 		default:
 			return errors.New("c2c job past target phase")
 		}
 	}
 }
 
+func (rrd *replResilienceDriver) sleepBeforeResiliencyEvent() {
+	// Assuming every C2C phase lasts at least 10 seconds, introduce some waiting
+	// before a resiliency event (e.g. a node shutdown) to ensure the event occurs
+	// once we're fully settled into the target phase (e.g. the stream ingestion
+	// processors have observed the cutover signal).
+	randomSleep := time.Duration(5+rrd.rng.Intn(6)) * time.Second
+	rrd.t.L().Printf("Take a %s power nap", randomSleep)
+	time.Sleep(randomSleep)
+}
+
 func registerClusterReplicationResilience(r registry.Registry) {
 	for _, rsp := range []replResilienceSpec{
 		{
@@ -999,7 +1032,7 @@
 			srcNodes: 4,
 			dstNodes: 4,
 			cpus:     8,
-			workload: replicateKV{readPercent: 0, initDurationMinutes: 2},
+			workload: replicateKV{readPercent: 0, initRows: 1000000, maxBlockBytes: 1024},
 			timeout:  20 * time.Minute,
 			additionalDuration: 6 * time.Minute,
 			cutover:            3 * time.Minute,
@@ -1054,6 +1087,9 @@
 			// Don't begin shutdown process until c2c job is set up.
 			<-shutdownSetupDone
 
+			// Eagerly listen to cutover signal to exercise node shutdown during actual cutover.
+			rrd.setup.dst.sysSQL.Exec(t, `SET CLUSTER SETTING bulkio.stream_ingestion.cutover_signal_poll_interval='5s'`)
+
 			// While executing a node shutdown on either the src or destination
 			// cluster, ensure the destination cluster's stream ingestion job
 			// completes. If the stream producer job fails, no big deal-- in a real
 			// disaster recovery scenario, this is a non-issue. To ensure a
@@ -1061,7 +1097,9 @@
 			// successful c2c replication execution.
 			shutdownStarter := func() jobStarter {
 				return func(c cluster.Cluster, t test.Test) (string, error) {
-					return fmt.Sprintf("%d", rrd.dstJobID), rrd.waitForTargetPhase()
+					require.NoError(t, rrd.waitForTargetPhase())
+					rrd.sleepBeforeResiliencyEvent()
+					return fmt.Sprintf("%d", rrd.dstJobID), nil
 				}
 			}
 			destinationWatcherNode := rrd.watcherNode
diff --git a/pkg/cmd/roachtest/tests/multitenant_utils.go b/pkg/cmd/roachtest/tests/multitenant_utils.go
index 2ca12644bcbe..7c5df2dc16a3 100644
--- a/pkg/cmd/roachtest/tests/multitenant_utils.go
+++ b/pkg/cmd/roachtest/tests/multitenant_utils.go
@@ -338,6 +338,8 @@ func createInMemoryTenant(
 	sysSQL.Exec(t, "CREATE TENANT $1", tenantName)
 	sysSQL.Exec(t, "ALTER TENANT $1 START SERVICE SHARED", tenantName)
 	sysSQL.Exec(t, `ALTER TENANT $1 GRANT CAPABILITY can_view_node_info=true, can_admin_split=true,can_view_tsdb_metrics=true`, tenantName)
+	sysSQL.Exec(t, `ALTER TENANT $1 SET CLUSTER SETTING sql.split_at.allow_for_secondary_tenant.enabled=true`, tenantName)
+	sysSQL.Exec(t, `ALTER TENANT $1 SET CLUSTER SETTING sql.scatter.allow_for_secondary_tenant.enabled=true`, tenantName)
 	removeTenantRateLimiters(t, sysSQL, tenantName)
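
Editor's illustration (not part of the patch series above): with --scatter wired into sourceInitCmd as in PATCH 3/3, the init command built by the c2c roachtests would expand to something roughly like the line below, using the values that appear in the diff (insert-count=1000000, max-block-bytes=1024, splits=100). The exact flag rendering produced by roachtestutil.NewCommand and the node range/tenant name inside the pgurl placeholder are assumptions, not output copied from a test run:

    ./workload init kv --insert-count 1000000 --max-block-bytes 1024 --splits 100 --scatter {pgurl:1-3:<src-tenant>}

This matches the advice in the PATCH 1/3 commit message: --scatter is most useful together with --splits, --max-block-bytes, and --insert-count, so the pre-loaded kv table is both split and spread across the source cluster before the replication stream starts.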