diff --git a/pkg/ccl/changefeedccl/BUILD.bazel b/pkg/ccl/changefeedccl/BUILD.bazel index a55df56c57b8..a87b763151d8 100644 --- a/pkg/ccl/changefeedccl/BUILD.bazel +++ b/pkg/ccl/changefeedccl/BUILD.bazel @@ -190,6 +190,7 @@ go_test( "alter_changefeed_test.go", "avro_test.go", "changefeed_dist_test.go", + "changefeed_processors_test.go", "changefeed_test.go", "csv_test.go", "encoder_test.go", diff --git a/pkg/ccl/changefeedccl/changefeed_processors.go b/pkg/ccl/changefeedccl/changefeed_processors.go index 9b8cf0a9ec60..b61cabc0da14 100644 --- a/pkg/ccl/changefeedccl/changefeed_processors.go +++ b/pkg/ccl/changefeedccl/changefeed_processors.go @@ -525,11 +525,20 @@ func makeKVFeedMonitoringCfg( func (ca *changeAggregator) setupSpansAndFrontier() (spans []roachpb.Span, err error) { var initialHighWater hlc.Timestamp spans = make([]roachpb.Span, 0, len(ca.spec.Watches)) - for _, watch := range ca.spec.Watches { - if initialHighWater.IsEmpty() || watch.InitialResolved.Less(initialHighWater) { + + // Keep initialHighWater as the minimum of all InitialResolved timestamps. + // If there are any zero InitialResolved timestamps, initial scan is + // ongoing. If there are no zero InitialResolved timestamps, initial scan + // is not required. + for i, watch := range ca.spec.Watches { + spans = append(spans, watch.Span) + if i == 0 { + initialHighWater = watch.InitialResolved + continue + } + if watch.InitialResolved.Less(initialHighWater) { initialHighWater = watch.InitialResolved } - spans = append(spans, watch.Span) } ca.frontier, err = makeSchemaChangeFrontier(initialHighWater, spans...) diff --git a/pkg/ccl/changefeedccl/changefeed_processors_test.go b/pkg/ccl/changefeedccl/changefeed_processors_test.go new file mode 100644 index 000000000000..804717917ea3 --- /dev/null +++ b/pkg/ccl/changefeedccl/changefeed_processors_test.go @@ -0,0 +1,135 @@ +// Copyright 2024 The Cockroach Authors. +// +// Licensed as a CockroachDB Enterprise file under the Cockroach Community +// License (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt + +package changefeedccl + +import ( + "testing" + + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" + "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/stretchr/testify/require" +) + +// TestSetupSpansAndFrontier tests that the setupSpansAndFrontier function +// correctly sets up frontier for the changefeed aggregator frontier. +func TestSetupSpansAndFrontier(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + for _, tc := range []struct { + name string + expectedFrontier hlc.Timestamp + watches []execinfrapb.ChangeAggregatorSpec_Watch + }{ + { + name: "new initial scan", + expectedFrontier: hlc.Timestamp{}, + watches: []execinfrapb.ChangeAggregatorSpec_Watch{ + { + Span: roachpb.Span{Key: roachpb.Key("a"), EndKey: roachpb.Key("b")}, + InitialResolved: hlc.Timestamp{}, + }, + { + Span: roachpb.Span{Key: roachpb.Key("b"), EndKey: roachpb.Key("c")}, + InitialResolved: hlc.Timestamp{}, + }, + { + Span: roachpb.Span{Key: roachpb.Key("c"), EndKey: roachpb.Key("d")}, + InitialResolved: hlc.Timestamp{}, + }, + }, + }, + { + name: "incomplete initial scan with non-empty initial resolved in the middle", + expectedFrontier: hlc.Timestamp{}, + watches: []execinfrapb.ChangeAggregatorSpec_Watch{ + { + Span: roachpb.Span{Key: roachpb.Key("a"), EndKey: roachpb.Key("b")}, + InitialResolved: hlc.Timestamp{WallTime: 5}, + }, + { + Span: roachpb.Span{Key: roachpb.Key("b"), EndKey: roachpb.Key("c")}, + InitialResolved: hlc.Timestamp{}, + }, + { + Span: roachpb.Span{Key: roachpb.Key("c"), EndKey: roachpb.Key("d")}, + InitialResolved: hlc.Timestamp{WallTime: 20}, + }, + }, + }, + { + name: "incomplete initial scan with non-empty initial resolved in the front", + expectedFrontier: hlc.Timestamp{}, + watches: []execinfrapb.ChangeAggregatorSpec_Watch{ + { + Span: roachpb.Span{Key: roachpb.Key("a"), EndKey: roachpb.Key("b")}, + InitialResolved: hlc.Timestamp{}, + }, + { + Span: roachpb.Span{Key: roachpb.Key("b"), EndKey: roachpb.Key("c")}, + InitialResolved: hlc.Timestamp{WallTime: 10}, + }, + { + Span: roachpb.Span{Key: roachpb.Key("c"), EndKey: roachpb.Key("d")}, + InitialResolved: hlc.Timestamp{WallTime: 20}, + }, + }, + }, + { + name: "incomplete initial scan with empty initial resolved in the end", + expectedFrontier: hlc.Timestamp{}, + watches: []execinfrapb.ChangeAggregatorSpec_Watch{ + { + Span: roachpb.Span{Key: roachpb.Key("a"), EndKey: roachpb.Key("b")}, + InitialResolved: hlc.Timestamp{WallTime: 10}, + }, + { + Span: roachpb.Span{Key: roachpb.Key("b"), EndKey: roachpb.Key("c")}, + InitialResolved: hlc.Timestamp{WallTime: 20}, + }, + { + Span: roachpb.Span{Key: roachpb.Key("c"), EndKey: roachpb.Key("d")}, + InitialResolved: hlc.Timestamp{}, + }, + }, + }, + { + name: "complete initial scan", + expectedFrontier: hlc.Timestamp{WallTime: 5}, + watches: []execinfrapb.ChangeAggregatorSpec_Watch{ + { + Span: roachpb.Span{Key: roachpb.Key("a"), EndKey: roachpb.Key("b")}, + InitialResolved: hlc.Timestamp{WallTime: 10}, + }, + { + Span: roachpb.Span{Key: roachpb.Key("b"), EndKey: roachpb.Key("c")}, + InitialResolved: hlc.Timestamp{WallTime: 20}, + }, + { + Span: roachpb.Span{Key: roachpb.Key("c"), EndKey: roachpb.Key("d")}, + InitialResolved: hlc.Timestamp{WallTime: 5}, + }, + }, + }, + } { + t.Run(tc.name, func(t *testing.T) { + ca := &changeAggregator{ + spec: execinfrapb.ChangeAggregatorSpec{ + Watches: tc.watches, + }, + } + _, err := ca.setupSpansAndFrontier() + require.NoError(t, err) + require.Equal(t, tc.expectedFrontier, ca.frontier.Frontier()) + }) + } +}