diff --git a/pkg/ccl/changefeedccl/BUILD.bazel b/pkg/ccl/changefeedccl/BUILD.bazel index b9026bd0fc9f..ccca0966ee11 100644 --- a/pkg/ccl/changefeedccl/BUILD.bazel +++ b/pkg/ccl/changefeedccl/BUILD.bazel @@ -191,6 +191,7 @@ go_test( "alter_changefeed_test.go", "avro_test.go", "changefeed_dist_test.go", + "changefeed_processors_test.go", "changefeed_test.go", "csv_test.go", "encoder_json_test.go", diff --git a/pkg/ccl/changefeedccl/changefeed_processors.go b/pkg/ccl/changefeedccl/changefeed_processors.go index 5606a813416a..72f2668ebe66 100644 --- a/pkg/ccl/changefeedccl/changefeed_processors.go +++ b/pkg/ccl/changefeedccl/changefeed_processors.go @@ -525,11 +525,19 @@ func makeKVFeedMonitoringCfg( func (ca *changeAggregator) setupSpansAndFrontier() (spans []roachpb.Span, err error) { var initialHighWater hlc.Timestamp spans = make([]roachpb.Span, 0, len(ca.spec.Watches)) - for _, watch := range ca.spec.Watches { - if initialHighWater.IsEmpty() || watch.InitialResolved.Less(initialHighWater) { + for i, watch := range ca.spec.Watches { + spans = append(spans, watch.Span) + if i == 0 { + initialHighWater = watch.InitialResolved + continue + } + if watch.InitialResolved.Less(initialHighWater) { + // Keep initialHighWater as the minimum of all InitialResolved timestamps. + // If there are any zero InitialResolved timestamps, initial scan is + // ongoing. If there are no zero InitialResolved timestamps, initial scan + // is not required. initialHighWater = watch.InitialResolved } - spans = append(spans, watch.Span) } ca.frontier, err = makeSchemaChangeFrontier(initialHighWater, spans...) diff --git a/pkg/ccl/changefeedccl/changefeed_processors_test.go b/pkg/ccl/changefeedccl/changefeed_processors_test.go new file mode 100644 index 000000000000..c2531dff7bf5 --- /dev/null +++ b/pkg/ccl/changefeedccl/changefeed_processors_test.go @@ -0,0 +1,110 @@ +// Copyright 2024 The Cockroach Authors. +// +// Licensed as a CockroachDB Enterprise file under the Cockroach Community +// License (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt + +package changefeedccl + +import ( + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" + "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/stretchr/testify/require" + "testing" +) + +// TestSetupSpansAndFrontier tests that the setupSpansAndFrontier function +// correctly sets up frontier for the changefeed aggregator frontier. +func TestSetupSpansAndFrontier(t *testing.T) { + for _, tc := range []struct { + name string + expectedFrontier hlc.Timestamp + watches []execinfrapb.ChangeAggregatorSpec_Watch + }{ + { + name: "new initial scan", + expectedFrontier: hlc.Timestamp{}, + watches: []execinfrapb.ChangeAggregatorSpec_Watch{ + { + Span: roachpb.Span{Key: roachpb.Key("a"), EndKey: roachpb.Key("b")}, + InitialResolved: hlc.Timestamp{}, + }, + { + Span: roachpb.Span{Key: roachpb.Key("b"), EndKey: roachpb.Key("c")}, + InitialResolved: hlc.Timestamp{}, + }, + { + Span: roachpb.Span{Key: roachpb.Key("c"), EndKey: roachpb.Key("d")}, + InitialResolved: hlc.Timestamp{}, + }, + }, + }, + { + name: "incomplete initial scan", + expectedFrontier: hlc.Timestamp{}, + watches: []execinfrapb.ChangeAggregatorSpec_Watch{ + { + Span: roachpb.Span{Key: roachpb.Key("a"), EndKey: roachpb.Key("b")}, + InitialResolved: hlc.Timestamp{WallTime: 5}, + }, + { + Span: roachpb.Span{Key: roachpb.Key("b"), EndKey: roachpb.Key("c")}, + InitialResolved: hlc.Timestamp{}, + }, + { + Span: roachpb.Span{Key: roachpb.Key("c"), EndKey: roachpb.Key("d")}, + InitialResolved: hlc.Timestamp{WallTime: 20}, + }, + }, + }, + { + name: "incomplete initial scan", + expectedFrontier: hlc.Timestamp{}, + watches: []execinfrapb.ChangeAggregatorSpec_Watch{ + { + Span: roachpb.Span{Key: roachpb.Key("a"), EndKey: roachpb.Key("b")}, + InitialResolved: hlc.Timestamp{WallTime: 10}, + }, + { + Span: roachpb.Span{Key: roachpb.Key("b"), EndKey: roachpb.Key("c")}, + InitialResolved: hlc.Timestamp{WallTime: 20}, + }, + { + Span: roachpb.Span{Key: roachpb.Key("c"), EndKey: roachpb.Key("d")}, + InitialResolved: hlc.Timestamp{}, + }, + }, + }, + { + name: "complete initial scan", + expectedFrontier: hlc.Timestamp{WallTime: 5}, + watches: []execinfrapb.ChangeAggregatorSpec_Watch{ + { + Span: roachpb.Span{Key: roachpb.Key("a"), EndKey: roachpb.Key("b")}, + InitialResolved: hlc.Timestamp{WallTime: 10}, + }, + { + Span: roachpb.Span{Key: roachpb.Key("b"), EndKey: roachpb.Key("c")}, + InitialResolved: hlc.Timestamp{WallTime: 20}, + }, + { + Span: roachpb.Span{Key: roachpb.Key("c"), EndKey: roachpb.Key("d")}, + InitialResolved: hlc.Timestamp{WallTime: 5}, + }, + }, + }, + } { + t.Run(tc.name, func(t *testing.T) { + mockChangefeedAggregator := &changeAggregator{} + mockChangefeedAggregator.spec = execinfrapb.ChangeAggregatorSpec{ + Watches: tc.watches, + } + _, err := mockChangefeedAggregator.setupSpansAndFrontier() + require.NoError(t, err) + require.Equal(t, tc.expectedFrontier, mockChangefeedAggregator.frontier.Frontier()) + }) + } +}