diff --git a/pkg/ccl/changefeedccl/cdcutils/throttle.go b/pkg/ccl/changefeedccl/cdcutils/throttle.go index 4a5a012d09e8..03f6185f2ef1 100644 --- a/pkg/ccl/changefeedccl/cdcutils/throttle.go +++ b/pkg/ccl/changefeedccl/cdcutils/throttle.go @@ -12,7 +12,6 @@ import ( "context" "encoding/json" "fmt" - "math" "sync" "time" @@ -69,7 +68,7 @@ func (t *Throttler) AcquireFlushQuota(ctx context.Context) error { func (t *Throttler) updateConfig(config changefeedbase.SinkThrottleConfig) { setLimits := func(rl *quotapool.RateLimiter, rate, burst float64) { // set rateBudget to unlimited if rate is 0. - rateBudget := quotapool.Limit(math.MaxInt64) + rateBudget := quotapool.Inf() if rate > 0 { rateBudget = quotapool.Limit(rate) } diff --git a/pkg/ccl/changefeedccl/changefeed_test.go b/pkg/ccl/changefeedccl/changefeed_test.go index c2e76e74961f..fe0550816c92 100644 --- a/pkg/ccl/changefeedccl/changefeed_test.go +++ b/pkg/ccl/changefeedccl/changefeed_test.go @@ -85,6 +85,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/protoutil" "github.com/cockroachdb/cockroach/pkg/util/randident" "github.com/cockroachdb/cockroach/pkg/util/randutil" + "github.com/cockroachdb/cockroach/pkg/util/span" "github.com/cockroachdb/cockroach/pkg/util/syncutil" "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/cockroach/pkg/util/uuid" @@ -7142,47 +7143,65 @@ func TestChangefeedEndTimeWithCursor(t *testing.T) { defer log.Scope(t).Close(t) testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) { - knobs := s.TestingKnobs. - DistSQL.(*execinfra.TestingKnobs). - Changefeed.(*TestingKnobs) - endTimeReached := make(chan struct{}) - knobs.FeedKnobs.EndTimeReached = func() bool { - select { - case <-endTimeReached: - return true - default: - return false - } - } - sqlDB := sqlutils.MakeSQLRunner(s.DB) sqlDB.Exec(t, "CREATE TABLE foo (a INT PRIMARY KEY)") - sqlDB.Exec(t, "INSERT INTO foo VALUES (1), (2), (3)") var tsCursor string sqlDB.QueryRow(t, "SELECT (cluster_logical_timestamp())").Scan(&tsCursor) - sqlDB.Exec(t, "INSERT INTO foo VALUES (4), (5), (6)") - fakeEndTime := s.Server.Clock().Now().Add(int64(time.Hour), 0).AsOfSystemTime() - feed := feed(t, f, "CREATE CHANGEFEED FOR foo WITH cursor = $1, end_time = $2, no_initial_scan", tsCursor, fakeEndTime) - defer closeFeed(t, feed) + // Insert 1k rows -- using separate statements to get different MVCC timestamps. + for i := 0; i < 1024; i++ { + sqlDB.Exec(t, "INSERT INTO foo VALUES ($1)", i) + } - assertPayloads(t, feed, []string{ - `foo: [4]->{"after": {"a": 4}}`, - `foo: [5]->{"after": {"a": 5}}`, - `foo: [6]->{"after": {"a": 6}}`, - }) - close(endTimeReached) + // Split table into multiple ranges to make things more interesting. + sqlDB.Exec(t, "ALTER TABLE foo SPLIT AT VALUES (100), (200), (400), (800)") + + knobs := s.TestingKnobs. + DistSQL.(*execinfra.TestingKnobs). + Changefeed.(*TestingKnobs) + fooSpan := func() roachpb.Span { + fooDesc := desctestutils.TestingGetPublicTableDescriptor( + s.Server.DB(), s.Codec, "d", "foo") + return fooDesc.PrimaryIndexSpan(s.Codec) + }() + // Capture resolved events emitted during changefeed. We expect + // every range to emit resolved event with end_time timestamp. 
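+ // The frontier is forwarded from the FilterSpanWithMutation knob below so that, + // once the feed completes, we can verify every span reached end_time.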
+ frontier, err := span.MakeFrontier(fooSpan) + require.NoError(t, err) + knobs.FilterSpanWithMutation = func(rs *jobspb.ResolvedSpan) (bool, error) { + _, err := frontier.Forward(rs.Span, rs.Timestamp) + return false, err + } + + // endTime must be after creation time (5 seconds should be enough + // to reach create changefeed statement and process it). + endTime := s.Server.Clock().Now().AddDuration(5 * time.Second) + feed := feed(t, f, "CREATE CHANGEFEED FOR foo WITH cursor = $1, end_time = $2, no_initial_scan", + tsCursor, eval.TimestampToDecimalDatum(endTime).String()) + defer closeFeed(t, feed) + + // Don't care much about the values emitted (tested elsewhere) -- all + // we want to make sure is that the feed terminates. testFeed := feed.(cdctest.EnterpriseTestFeed) require.NoError(t, testFeed.WaitForStatus(func(s jobs.Status) bool { return s == jobs.StatusSucceeded })) + + // After changefeed completes, verify we have seen all ranges emit resolved + // event with end_time timestamp. That is: verify frontier.Frontier() is at end_time. + expectedFrontier := endTime.Prev() + testutils.SucceedsWithin(t, func() error { + if expectedFrontier.EqOrdering(frontier.Frontier()) { + return nil + } + return errors.Newf("still waiting for frontier to reach %s, current %s", + expectedFrontier, frontier.Frontier()) + }, 5*time.Second) } - // TODO: Fix sinkless feeds not providing pre-close events if Next is called - // after the feed was closed cdcTest(t, testFn, feedTestEnterpriseSinks) } diff --git a/pkg/ccl/changefeedccl/kvevent/BUILD.bazel b/pkg/ccl/changefeedccl/kvevent/BUILD.bazel index 12e69fd00e45..6ddb3f73044f 100644 --- a/pkg/ccl/changefeedccl/kvevent/BUILD.bazel +++ b/pkg/ccl/changefeedccl/kvevent/BUILD.bazel @@ -52,6 +52,7 @@ go_test( "//pkg/sql/rowenc/keyside", "//pkg/sql/sem/tree", "//pkg/sql/types", + "//pkg/testutils", "//pkg/util", "//pkg/util/ctxgroup", "//pkg/util/encoding", diff --git a/pkg/ccl/changefeedccl/kvevent/blocking_buffer.go b/pkg/ccl/changefeedccl/kvevent/blocking_buffer.go index 706a677c9909..d17017d64806 100644 --- a/pkg/ccl/changefeedccl/kvevent/blocking_buffer.go +++ b/pkg/ccl/changefeedccl/kvevent/blocking_buffer.go @@ -105,6 +105,7 @@ func newMemBuffer( b.qp = allocPool{ AbstractPool: quotapool.New("changefeed", quota, opts...), + sv: sv, metrics: metrics, } @@ -218,6 +219,7 @@ func (b *blockingBuffer) enqueue(ctx context.Context, e Event) (err error) { } b.metrics.BufferEntriesIn.Inc(1) + b.metrics.BufferEntriesByType[e.et.Index()].Inc(1) b.mu.queue.enqueue(e) select { @@ -246,6 +248,7 @@ func (b *blockingBuffer) AcquireMemory(ctx context.Context, n int64) (alloc Allo return alloc, err } b.metrics.BufferEntriesMemAcquired.Inc(n) + b.metrics.AllocatedMem.Inc(n) return alloc, nil } @@ -324,6 +327,7 @@ func (b *blockingBuffer) CloseWithReason(ctx context.Context, reason error) erro quota := r.(*memQuota) quota.closed = true quota.acc.Close(ctx) + b.metrics.AllocatedMem.Dec(quota.allocated) return false }) @@ -442,9 +446,14 @@ func (r *memRequest) ShouldWait() bool { type allocPool struct { *quotapool.AbstractPool metrics *Metrics + sv *settings.Values } func (ap allocPool) Release(ctx context.Context, bytes, entries int64) { + if bytes < 0 { + logcrash.ReportOrPanic(ctx, ap.sv, "attempt to release negative bytes (%d) into pool", bytes) + } + ap.AbstractPool.Update(func(r quotapool.Resource) (shouldNotify bool) { quota := r.(*memQuota) if quota.closed { @@ -452,6 +461,7 @@ func (ap allocPool) Release(ctx context.Context, bytes, entries int64) { } 
quota.acc.Shrink(ctx, bytes) quota.allocated -= bytes + ap.metrics.AllocatedMem.Dec(bytes) ap.metrics.BufferEntriesMemReleased.Inc(bytes) ap.metrics.BufferEntriesReleased.Inc(entries) return true diff --git a/pkg/ccl/changefeedccl/kvevent/blocking_buffer_test.go b/pkg/ccl/changefeedccl/kvevent/blocking_buffer_test.go index 612fd866f59c..e42163fcce66 100644 --- a/pkg/ccl/changefeedccl/kvevent/blocking_buffer_test.go +++ b/pkg/ccl/changefeedccl/kvevent/blocking_buffer_test.go @@ -15,6 +15,7 @@ import ( "time" "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/kvevent" + "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv/kvpb" "github.com/cockroachdb/cockroach/pkg/roachpb" @@ -22,6 +23,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/randgen" "github.com/cockroachdb/cockroach/pkg/sql/rowenc/keyside" "github.com/cockroachdb/cockroach/pkg/sql/types" + "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/util" "github.com/cockroachdb/cockroach/pkg/util/ctxgroup" "github.com/cockroachdb/cockroach/pkg/util/encoding" @@ -32,6 +34,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/quotapool" "github.com/cockroachdb/cockroach/pkg/util/randutil" "github.com/cockroachdb/cockroach/pkg/util/timeutil" + "github.com/cockroachdb/errors" "github.com/stretchr/testify/require" ) @@ -94,9 +97,6 @@ func TestBlockingBuffer(t *testing.T) { } st := cluster.MakeTestingClusterSettings() buf := kvevent.TestingNewMemBuffer(ba, &st.SV, &metrics, notifyWait) - defer func() { - require.NoError(t, buf.CloseWithReason(context.Background(), nil)) - }() producerCtx, stopProducers := context.WithCancel(context.Background()) wg := ctxgroup.WithContext(producerCtx) @@ -105,26 +105,79 @@ func TestBlockingBuffer(t *testing.T) { }() // Start adding KVs to the buffer until we block. + var numResolvedEvents, numKVEvents int wg.GoCtx(func(ctx context.Context) error { rnd, _ := randutil.NewTestRand() for { - err := buf.Add(ctx, kvevent.MakeKVEvent(makeRangeFeedEvent(rnd, 256, 0))) - if err != nil { - return err + if rnd.Int()%20 == 0 { + prefix := keys.SystemSQLCodec.TablePrefix(42) + sp := roachpb.Span{Key: prefix, EndKey: prefix.Next()} + if err := buf.Add(ctx, kvevent.NewBackfillResolvedEvent(sp, hlc.Timestamp{}, jobspb.ResolvedSpan_BACKFILL)); err != nil { + return err + } + numResolvedEvents++ + } else { + if err := buf.Add(ctx, kvevent.MakeKVEvent(makeRangeFeedEvent(rnd, 256, 0))); err != nil { + return err + } + numKVEvents++ } } }) - <-waitCh + require.NoError(t, timeutil.RunWithTimeout( + context.Background(), "wait", 10*time.Second, func(ctx context.Context) error { + select { + case <-ctx.Done(): + return ctx.Err() + case <-waitCh: + return nil + } + })) // Keep consuming events until we get pushback metrics updated. + var numPopped, numFlush int for metrics.BufferPushbackNanos.Count() == 0 { e, err := buf.Get(context.Background()) require.NoError(t, err) a := e.DetachAlloc() a.Release(context.Background()) + numPopped++ + if e.Type() == kvevent.TypeFlush { + numFlush++ + } } + + // Allocated memory gauge should be non-zero once we buffer some events. 
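+ // (Poll for this since the producer goroutines acquire memory asynchronously.)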
+ testutils.SucceedsWithin(t, func() error { + if metrics.AllocatedMem.Value() > 0 { + return nil + } + return errors.New("waiting for allocated mem > 0") + }, 5*time.Second) + stopProducers() + require.ErrorIs(t, wg.Wait(), context.Canceled) + + require.EqualValues(t, numKVEvents+numResolvedEvents, metrics.BufferEntriesIn.Count()) + require.EqualValues(t, numPopped, metrics.BufferEntriesOut.Count()) + require.Greater(t, metrics.BufferEntriesMemReleased.Count(), int64(0)) + + // Flush events are special in that they are ephemeral events that don't get + // counted when releasing (a flush is a 0-entry, 0-byte event). + require.EqualValues(t, numPopped-numFlush, metrics.BufferEntriesReleased.Count()) + + require.EqualValues(t, numKVEvents, metrics.BufferEntriesByType[kvevent.TypeKV].Count()) + require.EqualValues(t, numResolvedEvents, metrics.BufferEntriesByType[kvevent.TypeResolved].Count()) + + // We might have seen numFlush events, but they are synthetic, and only explicitly enqueued + // flush events are counted. + require.EqualValues(t, 0, metrics.BufferEntriesByType[kvevent.TypeFlush].Count()) + + // After the buffer is closed, resources are released and metrics are adjusted to reflect that. + require.NoError(t, buf.CloseWithReason(context.Background(), context.Canceled)) + + require.EqualValues(t, 0, metrics.AllocatedMem.Value()) } func TestBlockingBufferNotifiesConsumerWhenOutOfMemory(t *testing.T) { diff --git a/pkg/ccl/changefeedccl/kvevent/event.go b/pkg/ccl/changefeedccl/kvevent/event.go index 1815e412636a..0e2c3d731940 100644 --- a/pkg/ccl/changefeedccl/kvevent/event.go +++ b/pkg/ccl/changefeedccl/kvevent/event.go @@ -77,13 +77,10 @@ type MemAllocator interface { type Type uint8 const ( - // TypeUnknown indicates the event could not be parsed. Will fail the feed. - TypeUnknown Type = iota - // TypeFlush indicates a request to flush buffered data. // This request type is emitted by blocking buffer when it's blocked, waiting // for more memory. - TypeFlush + TypeFlush Type = iota // TypeKV indicates that the KV, PrevKeyValue, and BackfillTimestamp methods // on the Event meaningful. @@ -98,6 +95,9 @@ const ( // TypeResolved indicates that the Resolved method on the Event will be // meaningful. TypeResolved = resolvedNone + + // number of event types. + numEventTypes = TypeResolved + 1 ) // Event represents an event emitted by a kvfeed. It is either a KV or a @@ -120,6 +120,27 @@ func (e *Event) Type() Type { } } +// Index returns a numerical/ordinal type index suitable for indexing into arrays. +func (t Type) Index() int { + switch t { + case TypeFlush: + return int(TypeFlush) + case TypeKV: + return int(TypeKV) + case TypeResolved, resolvedBackfill, resolvedRestart, resolvedExit: + return int(TypeResolved) + default: + log.Warningf(context.TODO(), + "returning TypeFlush boundary type for unknown event type %d", t) + return int(TypeFlush) + } +} + +// Raw returns the underlying RangeFeedEvent. +func (e *Event) Raw() *kvpb.RangeFeedEvent { + return e.ev +} + // ApproximateSize returns events approximate size in bytes.
func (e *Event) ApproximateSize() int { if e.et == TypeFlush { diff --git a/pkg/ccl/changefeedccl/kvevent/metrics.go b/pkg/ccl/changefeedccl/kvevent/metrics.go index 8e07f2a176f5..1212a5b278ef 100644 --- a/pkg/ccl/changefeedccl/kvevent/metrics.go +++ b/pkg/ccl/changefeedccl/kvevent/metrics.go @@ -9,6 +9,7 @@ package kvevent import ( + "fmt" "time" "github.com/cockroachdb/cockroach/pkg/util/metric" @@ -51,6 +52,12 @@ var ( Measurement: "Nanoseconds", Unit: metric.Unit_NANOSECONDS, } + metaChangefeedAllocatedMemory = metric.Metadata{ + Name: "changefeed.buffer_entries.allocated_mem", + Help: "Current quota pool memory allocation", + Measurement: "Bytes", + Unit: metric.Unit_BYTES, + } ) // Metrics is a metric.Struct for kvfeed metrics. @@ -61,10 +68,30 @@ type Metrics struct { BufferPushbackNanos *metric.Counter BufferEntriesMemAcquired *metric.Counter BufferEntriesMemReleased *metric.Counter + AllocatedMem *metric.Gauge + BufferEntriesByType [numEventTypes]*metric.Counter } // MakeMetrics constructs a Metrics struct with the provided histogram window. func MakeMetrics(histogramWindow time.Duration) Metrics { + eventTypeMeta := func(et Type) metric.Metadata { + eventTypeName := func() string { + switch et { + case TypeFlush: + return "flush" + case TypeKV: + return "kv" + default: + return "resolved" + } + }() + return metric.Metadata{ + Name: fmt.Sprintf("changefeed.buffer_entries.%s", eventTypeName), + Help: fmt.Sprintf("Number of %s elements added to the buffer", eventTypeName), + Measurement: "Events", + Unit: metric.Unit_COUNT, + } + } return Metrics{ BufferEntriesIn: metric.NewCounter(metaChangefeedBufferEntriesIn), BufferEntriesOut: metric.NewCounter(metaChangefeedBufferEntriesOut), @@ -72,6 +99,12 @@ func MakeMetrics(histogramWindow time.Duration) Metrics { BufferEntriesMemAcquired: metric.NewCounter(metaChangefeedBufferMemAcquired), BufferEntriesMemReleased: metric.NewCounter(metaChangefeedBufferMemReleased), BufferPushbackNanos: metric.NewCounter(metaChangefeedBufferPushbackNanos), + AllocatedMem: metric.NewGauge(metaChangefeedAllocatedMemory), + BufferEntriesByType: [numEventTypes]*metric.Counter{ + metric.NewCounter(eventTypeMeta(TypeFlush)), + metric.NewCounter(eventTypeMeta(TypeKV)), + metric.NewCounter(eventTypeMeta(TypeResolved)), + }, } } diff --git a/pkg/ccl/changefeedccl/kvfeed/kv_feed.go b/pkg/ccl/changefeedccl/kvfeed/kv_feed.go index 4e8d12ea4f95..77205a2cd8f0 100644 --- a/pkg/ccl/changefeedccl/kvfeed/kv_feed.go +++ b/pkg/ccl/changefeedccl/kvfeed/kv_feed.go @@ -691,6 +691,17 @@ func copyFromSourceToDestUntilTableEvent( return nil } + // spanFrontier returns frontier timestamp for the specified span. + spanFrontier = func(sp roachpb.Span) (sf hlc.Timestamp) { + frontier.SpanEntries(sp, func(_ roachpb.Span, ts hlc.Timestamp) (done span.OpResult) { + if sf.IsEmpty() || ts.Less(sf) { + sf = ts + } + return span.ContinueMatch + }) + return sf + } + // applyScanBoundary apply the boundary that we set above. // In most cases, a boundary isn't reached, and thus we do nothing. // If a boundary is reached but event `e` happens before that boundary, @@ -717,10 +728,29 @@ func copyFromSourceToDestUntilTableEvent( if resolved.Timestamp.LessEq(boundaryResolvedTimestamp) { return false, false, nil } + + // At this point, we know event is after boundaryResolvedTimestamp. + skipEvent = true + + if _, ok := scanBoundary.(*errEndTimeReached); ok { + // We know we have end time boundary. 
In this case, we do not want to + // skip this event because we want to make sure we emit checkpoint at + // exactly boundaryResolvedTimestamp. This checkpoint can be used to + // produce span based changefeed checkpoints if needed. + // We only want to emit this checkpoint once, and then we can skip + // subsequent checkpoints for this span until entire frontier reaches + // boundary timestamp. + if boundaryResolvedTimestamp.Compare(spanFrontier(resolved.Span)) > 0 { + e.Raw().Checkpoint.ResolvedTS = boundaryResolvedTimestamp + skipEvent = false + } + } + if _, err := frontier.Forward(resolved.Span, boundaryResolvedTimestamp); err != nil { - return false, false, err + return true, false, err } - return true, frontier.Frontier().EqOrdering(boundaryResolvedTimestamp), nil + + return skipEvent, frontier.Frontier().EqOrdering(boundaryResolvedTimestamp), nil case kvevent.TypeFlush: // TypeFlush events have a timestamp of zero and should have already // been processed by the timestamp check above. We include this here @@ -778,8 +808,13 @@ func copyFromSourceToDestUntilTableEvent( if scanBoundaryReached { // All component rangefeeds are now at the boundary. // Break out of the ctxgroup by returning the sentinel error. + // (We don't care if skipEntry is false -- scan boundary can only be + // returned for resolved event, and we don't care if we emit this event + // since exiting with scan boundary error will cause appropriate + // boundary type (EXIT) to be emitted for the entire frontier) return scanBoundary } + if skipEntry { return nil } diff --git a/pkg/cmd/roachtest/tests/backup.go b/pkg/cmd/roachtest/tests/backup.go index 7be623759b78..1f707f37d24f 100644 --- a/pkg/cmd/roachtest/tests/backup.go +++ b/pkg/cmd/roachtest/tests/backup.go @@ -224,8 +224,8 @@ func registerBackupNodeShutdown(r registry.Registry) { nodeToShutdown := 3 dest := loadBackupData(ctx, t, c) backupQuery := `BACKUP bank.bank TO 'nodelocal://1/` + dest + `' WITH DETACHED` - startBackup := func(c cluster.Cluster, t test.Test) (jobID jobspb.JobID, err error) { - gatewayDB := c.Conn(ctx, t.L(), gatewayNode) + startBackup := func(c cluster.Cluster, l *logger.Logger) (jobID jobspb.JobID, err error) { + gatewayDB := c.Conn(ctx, l, gatewayNode) defer gatewayDB.Close() err = gatewayDB.QueryRowContext(ctx, backupQuery).Scan(&jobID) @@ -246,8 +246,8 @@ func registerBackupNodeShutdown(r registry.Registry) { nodeToShutdown := 2 dest := loadBackupData(ctx, t, c) backupQuery := `BACKUP bank.bank TO 'nodelocal://1/` + dest + `' WITH DETACHED` - startBackup := func(c cluster.Cluster, t test.Test) (jobID jobspb.JobID, err error) { - gatewayDB := c.Conn(ctx, t.L(), gatewayNode) + startBackup := func(c cluster.Cluster, l *logger.Logger) (jobID jobspb.JobID, err error) { + gatewayDB := c.Conn(ctx, l, gatewayNode) defer gatewayDB.Close() err = gatewayDB.QueryRowContext(ctx, backupQuery).Scan(&jobID) diff --git a/pkg/cmd/roachtest/tests/cdc_bench.go b/pkg/cmd/roachtest/tests/cdc_bench.go index 84f2b6f88f44..8b2f98ed795e 100644 --- a/pkg/cmd/roachtest/tests/cdc_bench.go +++ b/pkg/cmd/roachtest/tests/cdc_bench.go @@ -100,9 +100,6 @@ func registerCDCBench(r registry.Registry) { RequiresLicense: true, Timeout: 2 * time.Hour, // catchup scans with 100k ranges can take >1 hour Run: func(ctx context.Context, t test.Test, c cluster.Cluster) { - if ranges == 100000 && scanType == cdcBenchCatchupScan { - t.Skip("fails to complete, see https://github.com/cockroachdb/cockroach/issues/108157") - } runCDCBenchScan(ctx, t, c, scanType, rows, ranges, 
protocol, format) }, }) @@ -173,6 +170,37 @@ func makeCDCBenchOptions() (option.StartOpts, install.ClusterSettings) { // ranges aren't able to keep up. settings.ClusterSettings["kv.rangefeed.range_stuck_threshold"] = "0" + // Checkpoint frequently. Some of the larger benchmarks might overload the + // cluster. Producing frequent span-level checkpoints helps with recovery. + settings.ClusterSettings["changefeed.frontier_checkpoint_frequency"] = "60s" + settings.ClusterSettings["changefeed.frontier_highwater_lag_checkpoint_threshold"] = "30s" + + // Bump up the number of allowed catchup scans. Doing catchup for 100k ranges with the default + // configuration (8 client side, 16 per store) takes a while (~1500-2000 ranges per minute). + settings.ClusterSettings["kv.rangefeed.catchup_scan_concurrency"] = "16" + settings.ClusterSettings["kv.rangefeed.concurrent_catchup_iterators"] = "16" + + // Give the changefeed more memory and slow down rangefeed checkpoints. + // When running large catchup scan benchmarks (100k ranges), as the benchmark + // nears completion, more and more ranges generate checkpoint events. When + // the rate of checkpoints is high (the default used to be 200ms), the changefeed + // begins to block on memory acquisition since the fan-in factor (~20k + // ranges/node) greatly exceeds the processing loop speed (1 goroutine). + // The current pipeline looks like this: + // rangefeed -> + // 1 goroutine physicalKVFeed (acquire Memory) -> + // 1 goroutine copyFromSourceToDestination (filter events) -> + // 1 goroutine changeAggregator.Next -> + // N goroutines rest of the pipeline (encode and emit) + // The memory for the checkpoint events (even ones after end_time) must be allocated + // first; then these events are thrown away (many inefficiencies here -- but + // it's the only thing we can do w/out having to add "end time" support to the rangefeed library). + // The rate of incoming events greatly exceeds the rate with which we consume these events + // (and release allocations), resulting in a significant drop in completed-ranges throughput. + // The current default is 3s, but if needed, increase this timeout: + // settings.ClusterSettings["kv.rangefeed.closed_timestamp_refresh_interval"] = "5s" + settings.ClusterSettings["changefeed.memory.per_changefeed_limit"] = "4G" + // Scheduled backups may interfere with performance, disable them. opts.RoachprodOpts.ScheduleBackups = false @@ -181,6 +209,13 @@ func makeCDCBenchOptions() (option.StartOpts, install.ClusterSettings) { // catchup scans. settings.Env = append(settings.Env, "COCKROACH_RANGEFEED_SEND_TIMEOUT=0") + // If this benchmark experiences periodic changefeed restarts due to rpc errors + // (grpc context canceled), consider increasing the network timeout. + // Under significant load (due to rangefeed), the timeout could easily be triggered + // due to elevated goroutine scheduling latency. + // The current default is 4s, which should be sufficient. + // settings.Env = append(settings.Env, "COCKROACH_NETWORK_TIMEOUT=6s") + return opts, settings } @@ -286,6 +321,11 @@ func runCDCBenchScan( default: t.Fatalf("unknown scan type %q", scanType) } + + // Lock schema so that changefeed schema feed runs under fast path. + _, err := conn.ExecContext(ctx, "ALTER TABLE kv.kv SET (schema_locked = true);") + require.NoError(t, err) + var jobID int require.NoError(t, conn.QueryRowContext(ctx, fmt.Sprintf(`CREATE CHANGEFEED FOR kv.kv INTO '%s' WITH %s`, sink, with)).
@@ -440,6 +480,11 @@ func runCDCBenchWorkload( var done atomic.Value // time.Time if cdcEnabled { t.L().Printf("starting changefeed") + + // Lock schema so that changefeed schema feed runs under fast path. + _, err := conn.ExecContext(ctx, "ALTER TABLE kv.kv SET (schema_locked = true);") + require.NoError(t, err) + require.NoError(t, conn.QueryRowContext(ctx, fmt.Sprintf( `CREATE CHANGEFEED FOR kv.kv INTO '%s' WITH format = '%s', initial_scan = 'no'`, sink, format)). diff --git a/pkg/cmd/roachtest/tests/cluster_to_cluster.go b/pkg/cmd/roachtest/tests/cluster_to_cluster.go index 72c88a8b3fb0..66c056a96bdf 100644 --- a/pkg/cmd/roachtest/tests/cluster_to_cluster.go +++ b/pkg/cmd/roachtest/tests/cluster_to_cluster.go @@ -1322,12 +1322,14 @@ func registerClusterReplicationResilience(r registry.Registry) { t.L().Printf(`%s configured: Shutdown Node %d; Watcher node %d; Gateway nodes %s`, rrd.rsp.name(), rrd.shutdownNode, rrd.watcherNode, rrd.setup.gatewayNodes) } - m := rrd.newMonitor(ctx) - m.Go(func(ctx context.Context) error { + mainDriverCtx, cancelMain := context.WithCancel(ctx) + mainMonitor := rrd.newMonitor(mainDriverCtx) + mainMonitor.Go(func(ctx context.Context) error { rrd.main(ctx) return nil }) - defer m.Wait() + defer cancelMain() + defer mainMonitor.Wait() // Don't begin shutdown process until c2c job is set up. <-shutdownSetupDone @@ -1341,8 +1343,10 @@ func registerClusterReplicationResilience(r registry.Registry) { // DR scenario the src cluster may have gone belly up during a // successful c2c replication execution. shutdownStarter := func() jobStarter { - return func(c cluster.Cluster, t test.Test) (jobspb.JobID, error) { - require.NoError(t, waitForTargetPhase(ctx, rrd.replicationDriver, rrd.dstJobID, rrd.phase)) + return func(c cluster.Cluster, l *logger.Logger) (jobspb.JobID, error) { + if err := waitForTargetPhase(ctx, rrd.replicationDriver, rrd.dstJobID, rrd.phase); err != nil { + return jobspb.JobID(0), err + } sleepBeforeResiliencyEvent(rrd.replicationDriver, rrd.phase) return rrd.dstJobID, nil } @@ -1356,8 +1360,12 @@ func registerClusterReplicationResilience(r registry.Registry) { watcherNode: destinationWatcherNode, crdbNodes: rrd.crdbNodes(), restartSettings: []install.ClusterSettingOption{install.SecureOption(true)}, + rng: rrd.rng, + } + if err := executeNodeShutdown(ctx, t, c, shutdownCfg, shutdownStarter()); err != nil { + cancelMain() + t.Fatalf("shutdown execution failed: %s", err) } - executeNodeShutdown(ctx, t, c, shutdownCfg, shutdownStarter()) }, ) } diff --git a/pkg/cmd/roachtest/tests/import.go b/pkg/cmd/roachtest/tests/import.go index bf7bb5082b43..f0f3d9c16ad3 100644 --- a/pkg/cmd/roachtest/tests/import.go +++ b/pkg/cmd/roachtest/tests/import.go @@ -27,6 +27,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/test" "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" "github.com/cockroachdb/cockroach/pkg/roachprod/install" + "github.com/cockroachdb/cockroach/pkg/roachprod/logger" "github.com/cockroachdb/cockroach/pkg/testutils/release" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/retry" @@ -44,7 +45,7 @@ func readCreateTableFromFixture(fixtureURI string, gatewayDB *gosql.DB) (string, func registerImportNodeShutdown(r registry.Registry) { getImportRunner := func(ctx context.Context, t test.Test, gatewayNode int) jobStarter { - startImport := func(c cluster.Cluster, t test.Test) (jobspb.JobID, error) { + startImport := func(c 
cluster.Cluster, l *logger.Logger) (jobspb.JobID, error) { var jobID jobspb.JobID // partsupp is 11.2 GiB. tableName := "partsupp" diff --git a/pkg/cmd/roachtest/tests/jobs.go b/pkg/cmd/roachtest/tests/jobs.go index 51c9cf9b5e91..b9daa751c4c1 100644 --- a/pkg/cmd/roachtest/tests/jobs.go +++ b/pkg/cmd/roachtest/tests/jobs.go @@ -14,6 +14,7 @@ import ( "context" gosql "database/sql" "fmt" + "math/rand" "time" "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/cluster" @@ -22,6 +23,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/jobs" "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" "github.com/cockroachdb/cockroach/pkg/roachprod/install" + "github.com/cockroachdb/cockroach/pkg/roachprod/logger" "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" "github.com/cockroachdb/cockroach/pkg/util/protoutil" @@ -30,7 +32,7 @@ import ( "github.com/stretchr/testify/require" ) -type jobStarter func(c cluster.Cluster, t test.Test) (jobspb.JobID, error) +type jobStarter func(c cluster.Cluster, l *logger.Logger) (jobspb.JobID, error) // jobSurvivesNodeShutdown is a helper that tests that a given job, // running on the specified gatewayNode will still complete successfully @@ -55,7 +57,7 @@ func jobSurvivesNodeShutdown( waitFor3XReplication: true, sleepBeforeShutdown: 30 * time.Second, } - executeNodeShutdown(ctx, t, c, cfg, startJob) + require.NoError(t, executeNodeShutdown(ctx, t, c, cfg, startJob)) } type nodeShutdownConfig struct { @@ -65,11 +67,17 @@ type nodeShutdownConfig struct { restartSettings []install.ClusterSettingOption waitFor3XReplication bool sleepBeforeShutdown time.Duration + rng *rand.Rand } +// executeNodeShutdown executes a node shutdown and returns all errors back to the caller. +// +// TODO(msbutler): ideally, t.L() is only passed to this function instead of t, +// but WaitFor3xReplication requires t. Once this function only has a logger, we +// can guarantee that all errors return to the caller. func executeNodeShutdown( ctx context.Context, t test.Test, c cluster.Cluster, cfg nodeShutdownConfig, startJob jobStarter, -) { +) error { target := c.Node(cfg.shutdownNode) t.L().Printf("test has chosen shutdown target node %d, and watcher node %d", cfg.shutdownNode, cfg.watcherNode) @@ -83,14 +91,20 @@ func executeNodeShutdown( // nodes down. 
t.Status("waiting for cluster to be 3x replicated") err := WaitFor3XReplication(ctx, t, watcherDB) - require.NoError(t, err) + if err != nil { + return err + } } t.Status("running job") - jobID, err := startJob(c, t) - require.NoError(t, err) + jobID, err := startJob(c, t.L()) + if err != nil { + return err + } t.L().Printf("started running job with ID %s", jobID) - WaitForRunning(t, watcherDB, jobID, time.Minute) + if err := WaitForRunning(ctx, watcherDB, jobID, time.Minute); err != nil { + return err + } m := c.NewMonitor(ctx, cfg.crdbNodes) m.ExpectDeath() @@ -122,18 +136,27 @@ func executeNodeShutdown( } }) time.Sleep(cfg.sleepBeforeShutdown) - rng, _ := randutil.NewTestRand() - shouldUseSigKill := rng.Float64() > 0.5 + if cfg.rng == nil { + rng, _ := randutil.NewTestRand() + cfg.rng = rng + } + shouldUseSigKill := cfg.rng.Float64() > 0.5 if shouldUseSigKill { t.L().Printf(`stopping node (using SIGKILL) %s`, target) - require.NoError(t, c.StopE(ctx, t.L(), option.DefaultStopOpts(), target), "could not stop node %s", target) + if err := c.StopE(ctx, t.L(), option.DefaultStopOpts(), target); err != nil { + return errors.Wrapf(err, "could not stop node %s", target) + } } else { t.L().Printf(`stopping node gracefully %s`, target) - require.NoError(t, c.StopCockroachGracefullyOnNode(ctx, t.L(), cfg.shutdownNode), "could not stop node %s", target) + if err := c.StopCockroachGracefullyOnNode(ctx, t.L(), cfg.shutdownNode); err != nil { + return errors.Wrapf(err, "could not stop node %s", target) + } } t.L().Printf("stopped node %s", target) - m.Wait() + if err := m.WaitE(); err != nil { + return err + } // NB: the roachtest harness checks that at the end of the test, all nodes // that have data also have a running process. t.Status(fmt.Sprintf("restarting %s (node restart test is done)\n", target)) @@ -141,15 +164,19 @@ func executeNodeShutdown( // set or disallowed the automatic backup schedule. 
if err := c.StartE(ctx, t.L(), option.DefaultStartOptsNoBackups(), install.MakeClusterSettings(cfg.restartSettings...), target); err != nil { - t.Fatal(errors.Wrapf(err, "could not restart node %s", target)) + return errors.Wrapf(err, "could not restart node %s", target) } + return nil } -func WaitForRunning(t test.Test, db *gosql.DB, jobID jobspb.JobID, maxWait time.Duration) { - sqlDB := sqlutils.MakeSQLRunner(db) - testutils.SucceedsWithin(t, func() error { +func WaitForRunning( + ctx context.Context, db *gosql.DB, jobID jobspb.JobID, maxWait time.Duration, +) error { + return testutils.SucceedsWithinError(func() error { var status jobs.Status - sqlDB.QueryRow(t, "SELECT status FROM crdb_internal.system_jobs WHERE id = $1", jobID).Scan(&status) + if err := db.QueryRowContext(ctx, "SELECT status FROM crdb_internal.system_jobs WHERE id = $1", jobID).Scan(&status); err != nil { + return err + } switch status { case jobs.StatusPending: case jobs.StatusRunning: diff --git a/pkg/cmd/roachtest/tests/restore.go b/pkg/cmd/roachtest/tests/restore.go index 49606576991a..5572bcee2994 100644 --- a/pkg/cmd/roachtest/tests/restore.go +++ b/pkg/cmd/roachtest/tests/restore.go @@ -32,6 +32,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/roachprod/install" + "github.com/cockroachdb/cockroach/pkg/roachprod/logger" "github.com/cockroachdb/cockroach/pkg/roachprod/vm" "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" @@ -57,7 +58,7 @@ func registerRestoreNodeShutdown(r registry.Registry) { makeRestoreStarter := func(ctx context.Context, t test.Test, c cluster.Cluster, gatewayNode int, rd restoreDriver) jobStarter { - return func(c cluster.Cluster, t test.Test) (jobspb.JobID, error) { + return func(c cluster.Cluster, l *logger.Logger) (jobspb.JobID, error) { return rd.runDetached(ctx, "DATABASE tpce", gatewayNode) } } diff --git a/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go b/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go index bb86159eecd6..a144694e59ca 100644 --- a/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go +++ b/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go @@ -375,9 +375,7 @@ func (r *rangeFeedRegistry) ForEachPartialRangefeed(fn ActiveRangeFeedIterFn) (i partialRangeFeed := func(active *activeRangeFeed) PartialRangeFeed { active.Lock() defer active.Unlock() - p := active.PartialRangeFeed - p.InCatchup = active.catchupRes != nil - return p + return active.PartialRangeFeed } r.ranges.Range(func(k, v interface{}) bool { active := k.(*activeRangeFeed) @@ -520,6 +518,7 @@ func newActiveRangeFeed( Span: span, StartAfter: startAfter, CreatedTime: timeutil.Now(), + InCatchup: true, }, } @@ -541,6 +540,9 @@ func (a *activeRangeFeed) releaseCatchupScan() { if a.catchupRes != nil { a.catchupRes.Release() a.catchupRes = nil + a.Lock() + a.InCatchup = false + a.Unlock() } } diff --git a/pkg/kv/kvserver/rangefeed/scheduled_processor.go b/pkg/kv/kvserver/rangefeed/scheduled_processor.go index 30c22616bc7e..b97485832c4b 100644 --- a/pkg/kv/kvserver/rangefeed/scheduled_processor.go +++ b/pkg/kv/kvserver/rangefeed/scheduled_processor.go @@ -44,6 +44,9 @@ type ScheduledProcessor struct { reg registry rts resolvedTimestamp + // processCtx is the annotated background context used for process(). It is + // stored here to avoid reconstructing it on every call. 
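+ // It is initialized once in NewScheduledProcessor from the AmbientContext.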
+ processCtx context.Context requestQueue chan request eventC chan *event // If true, processor is not processing data anymore and waiting for registrations @@ -65,10 +68,11 @@ func NewScheduledProcessor(cfg Config) *ScheduledProcessor { cfg.SetDefaults() cfg.AmbientContext.AddLogTag("rangefeed", nil) p := &ScheduledProcessor{ - Config: cfg, - scheduler: NewClientScheduler(cfg.Scheduler), - reg: makeRegistry(cfg.Metrics), - rts: makeResolvedTimestamp(), + Config: cfg, + scheduler: NewClientScheduler(cfg.Scheduler), + reg: makeRegistry(cfg.Metrics), + rts: makeResolvedTimestamp(), + processCtx: cfg.AmbientContext.AnnotateCtx(context.Background()), requestQueue: make(chan request, 20), eventC: make(chan *event, cfg.EventChanCap), @@ -121,7 +125,7 @@ func (p *ScheduledProcessor) Start( // process is a scheduler callback that is processing scheduled events and // requests. func (p *ScheduledProcessor) process(e processorEventType) processorEventType { - ctx := p.Config.AmbientContext.AnnotateCtx(context.Background()) + ctx := p.processCtx if e&RequestQueued != 0 { p.processRequests(ctx) }