Skip to content

Commit

Permalink
changefeedccl: fix initial scan checkpointing
Browse files Browse the repository at this point in the history
Previously, we forward the most up to date state for all spans in cockroachdb#102717 to reduce duplicates upon changefeed resumes. This can cause unexpected behaviour during initial scans since initial scan logic relies on all resolved timestamps being empty.
  • Loading branch information
wenyihu6 committed May 10, 2024
1 parent d472092 commit a4ea646
Show file tree
Hide file tree
Showing 3 changed files with 122 additions and 3 deletions.
1 change: 1 addition & 0 deletions pkg/ccl/changefeedccl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,7 @@ go_test(
"alter_changefeed_test.go",
"avro_test.go",
"changefeed_dist_test.go",
"changefeed_processors_test.go",
"changefeed_test.go",
"csv_test.go",
"encoder_json_test.go",
Expand Down
14 changes: 11 additions & 3 deletions pkg/ccl/changefeedccl/changefeed_processors.go
Original file line number Diff line number Diff line change
Expand Up @@ -525,11 +525,19 @@ func makeKVFeedMonitoringCfg(
func (ca *changeAggregator) setupSpansAndFrontier() (spans []roachpb.Span, err error) {
var initialHighWater hlc.Timestamp
spans = make([]roachpb.Span, 0, len(ca.spec.Watches))
for _, watch := range ca.spec.Watches {
if initialHighWater.IsEmpty() || watch.InitialResolved.Less(initialHighWater) {
for i, watch := range ca.spec.Watches {
spans = append(spans, watch.Span)
if i == 0 {
initialHighWater = watch.InitialResolved
continue
}
if watch.InitialResolved.Less(initialHighWater) {
// Keep initialHighWater as the minimum of all InitialResolved timestamps.
// If there are any zero InitialResolved timestamps, initial scan is
// ongoing. If there are no zero InitialResolved timestamps, initial scan
// is not required.
initialHighWater = watch.InitialResolved
}
spans = append(spans, watch.Span)
}

ca.frontier, err = makeSchemaChangeFrontier(initialHighWater, spans...)
Expand Down
110 changes: 110 additions & 0 deletions pkg/ccl/changefeedccl/changefeed_processors_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
// Copyright 2024 The Cockroach Authors.
//
// Licensed as a CockroachDB Enterprise file under the Cockroach Community
// License (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt

package changefeedccl

import (
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/stretchr/testify/require"
"testing"
)

// TestSetupSpansAndFrontier tests that the setupSpansAndFrontier function
// correctly sets up frontier for the changefeed aggregator frontier.
func TestSetupSpansAndFrontier(t *testing.T) {
for _, tc := range []struct {
name string
expectedFrontier hlc.Timestamp
watches []execinfrapb.ChangeAggregatorSpec_Watch
}{
{
name: "new initial scan",
expectedFrontier: hlc.Timestamp{},
watches: []execinfrapb.ChangeAggregatorSpec_Watch{
{
Span: roachpb.Span{Key: roachpb.Key("a"), EndKey: roachpb.Key("b")},
InitialResolved: hlc.Timestamp{},
},
{
Span: roachpb.Span{Key: roachpb.Key("b"), EndKey: roachpb.Key("c")},
InitialResolved: hlc.Timestamp{},
},
{
Span: roachpb.Span{Key: roachpb.Key("c"), EndKey: roachpb.Key("d")},
InitialResolved: hlc.Timestamp{},
},
},
},
{
name: "incomplete initial scan",
expectedFrontier: hlc.Timestamp{},
watches: []execinfrapb.ChangeAggregatorSpec_Watch{
{
Span: roachpb.Span{Key: roachpb.Key("a"), EndKey: roachpb.Key("b")},
InitialResolved: hlc.Timestamp{WallTime: 5},
},
{
Span: roachpb.Span{Key: roachpb.Key("b"), EndKey: roachpb.Key("c")},
InitialResolved: hlc.Timestamp{},
},
{
Span: roachpb.Span{Key: roachpb.Key("c"), EndKey: roachpb.Key("d")},
InitialResolved: hlc.Timestamp{WallTime: 20},
},
},
},
{
name: "incomplete initial scan",
expectedFrontier: hlc.Timestamp{},
watches: []execinfrapb.ChangeAggregatorSpec_Watch{
{
Span: roachpb.Span{Key: roachpb.Key("a"), EndKey: roachpb.Key("b")},
InitialResolved: hlc.Timestamp{WallTime: 10},
},
{
Span: roachpb.Span{Key: roachpb.Key("b"), EndKey: roachpb.Key("c")},
InitialResolved: hlc.Timestamp{WallTime: 20},
},
{
Span: roachpb.Span{Key: roachpb.Key("c"), EndKey: roachpb.Key("d")},
InitialResolved: hlc.Timestamp{},
},
},
},
{
name: "complete initial scan",
expectedFrontier: hlc.Timestamp{WallTime: 5},
watches: []execinfrapb.ChangeAggregatorSpec_Watch{
{
Span: roachpb.Span{Key: roachpb.Key("a"), EndKey: roachpb.Key("b")},
InitialResolved: hlc.Timestamp{WallTime: 10},
},
{
Span: roachpb.Span{Key: roachpb.Key("b"), EndKey: roachpb.Key("c")},
InitialResolved: hlc.Timestamp{WallTime: 20},
},
{
Span: roachpb.Span{Key: roachpb.Key("c"), EndKey: roachpb.Key("d")},
InitialResolved: hlc.Timestamp{WallTime: 5},
},
},
},
} {
t.Run(tc.name, func(t *testing.T) {
mockChangefeedAggregator := &changeAggregator{}
mockChangefeedAggregator.spec = execinfrapb.ChangeAggregatorSpec{
Watches: tc.watches,
}
_, err := mockChangefeedAggregator.setupSpansAndFrontier()
require.NoError(t, err)
require.Equal(t, tc.expectedFrontier, mockChangefeedAggregator.frontier.Frontier())
})
}
}

0 comments on commit a4ea646

Please sign in to comment.