changefeedccl: Re-enable 100k range catchup/rangefeed benchmark
Re-enable the regular rangefeed catchup benchmark over 100k ranges.
Adjust the cdc bench configuration to ensure the benchmark completes
in a reasonable amount of time.

Fixes cockroachdb#108157

Release note: None
Yevgeniy Miretskiy committed Sep 8, 2023
1 parent ca4683b commit 526100a
Showing 1 changed file with 48 additions and 3 deletions.
51 changes: 48 additions & 3 deletions pkg/cmd/roachtest/tests/cdc_bench.go
@@ -100,9 +100,6 @@ func registerCDCBench(r registry.Registry) {
RequiresLicense: true,
Timeout: 2 * time.Hour, // catchup scans with 100k ranges can take >1 hour
Run: func(ctx context.Context, t test.Test, c cluster.Cluster) {
if ranges == 100000 && scanType == cdcBenchCatchupScan {
t.Skip("fails to complete, see https://github.com/cockroachdb/cockroach/issues/108157")
}
runCDCBenchScan(ctx, t, c, scanType, rows, ranges, protocol, format)
},
})
@@ -173,6 +170,37 @@ func makeCDCBenchOptions() (option.StartOpts, install.ClusterSettings) {
// ranges aren't able to keep up.
settings.ClusterSettings["kv.rangefeed.range_stuck_threshold"] = "0"

// Checkpoint frequently. Some of the larger benchmarks might overload the
// cluster. Producing frequent span-level checkpoints helps with recovery.
settings.ClusterSettings["changefeed.frontier_checkpoint_frequency"] = "60s"
settings.ClusterSettings["changefeed.frontier_highwater_lag_checkpoint_threshold"] = "30s"

// Bump up the number of allowed catchup scans. Doing catchup for 100k ranges with the default
// configuration (8 client side, 16 per store) takes a while (~1500-2000 ranges per minute,
// i.e. roughly an hour for 100k ranges).
settings.ClusterSettings["kv.rangefeed.catchup_scan_concurrency"] = "16"
settings.ClusterSettings["kv.rangefeed.concurrent_catchup_iterators"] = "16"

// Give the changefeed more memory and slow down rangefeed checkpoints.
// When running large catchup scan benchmarks (100k ranges), more and more
// ranges generate checkpoint events as the benchmark nears completion. When
// the rate of checkpoints is high (the default used to be 200ms), the changefeed
// begins to block on memory acquisition since the fan-in factor (~20k
// ranges/node) greatly exceeds the speed of the processing loop (1 goroutine).
// The current pipeline looks like this:
// rangefeed ->
// 1 goroutine physicalKVFeed (acquire memory) ->
// 1 goroutine copyFromSourceToDestination (filter events) ->
// 1 goroutine changeAggregator.Next ->
// N goroutines rest of the pipeline (encode and emit)
// Memory for the checkpoint events (even ones after end_time) must be allocated
// first; then these events are thrown away (there are many inefficiencies here, but
// it's the only option without adding "end time" support to the rangefeed library).
// The rate of incoming events greatly exceeds the rate at which we consume them
// (and release allocations), resulting in a significant drop in completed-range throughput.
// The current default is 3s; if needed, increase this interval:
// settings.ClusterSettings["kv.rangefeed.closed_timestamp_refresh_interval"] = "5s"
settings.ClusterSettings["changefeed.memory.per_changefeed_limit"] = "4G"

// Scheduled backups may interfere with performance; disable them.
opts.RoachprodOpts.ScheduleBackups = false

@@ -181,6 +209,13 @@ func makeCDCBenchOptions() (option.StartOpts, install.ClusterSettings) {
// catchup scans.
settings.Env = append(settings.Env, "COCKROACH_RANGEFEED_SEND_TIMEOUT=0")

// If this benchmark experiences periodic changefeed restarts due to rpc errors
// (grpc context canceled), consider increasing the network timeout.
// Under significant load (due to rangefeed), the timeout could easily be triggered
// by elevated goroutine scheduling latency.
// The current default is 4s, which should be sufficient.
// settings.Env = append(settings.Env, "COCKROACH_NETWORK_TIMEOUT=6s")

return opts, settings
}

@@ -286,6 +321,11 @@ func runCDCBenchScan(
default:
t.Fatalf("unknown scan type %q", scanType)
}

// Lock the schema so that the changefeed schema feed runs under the fast path.
_, err := conn.ExecContext(ctx, "ALTER TABLE kv.kv SET (schema_locked = true);")
require.NoError(t, err)

var jobID int
require.NoError(t, conn.QueryRowContext(ctx,
fmt.Sprintf(`CREATE CHANGEFEED FOR kv.kv INTO '%s' WITH %s`, sink, with)).
@@ -430,6 +470,11 @@ func runCDCBenchWorkload(
var done atomic.Value // time.Time
if cdcEnabled {
t.L().Printf("starting changefeed")

// Lock the schema so that the changefeed schema feed runs under the fast path.
_, err := conn.ExecContext(ctx, "ALTER TABLE kv.kv SET (schema_locked = true);")
require.NoError(t, err)

require.NoError(t, conn.QueryRowContext(ctx, fmt.Sprintf(
`CREATE CHANGEFEED FOR kv.kv INTO '%s' WITH format = '%s', initial_scan = 'no'`,
sink, format)).
