Skip to content

Commit

Permalink
changefeedccl: fix initial scan checkpointing
Browse files Browse the repository at this point in the history
Initially, all span initial resolved timestamps are kept as zero upon resuming a
job since initial resolved timestamps are set as initial high water which
remains zero until initial scan is completed. However, since
0eda540,
we began reloading checkpoint timestamps instead of setting them all to zero at
the start. In PR #102717, we introduced a mechanism to reduce message duplicates
by re-loading job progress upon resuming which largely increased the likelihood
of this bug. These errors could lead to incorrect frontier and missing events
during initial scans. This patches changes how we initialize initial high water
and frontier by initializing it as zero if there are any zero initial high water
in initial resolved timestamps.

Fixes: #123371

Release note (enterprise change): Fixed a bug in v22.2+ where long running
initial scans may incorrectly restore checkpoint job progress and drop events
during node / changefeed restart. This issue was most likely to occur in
clusters with: 1) changefeed.shutdown_checkpoint.enabled (v23.2) is set 2)
Multiple table targets in a changefeed, or 3) Low
changefeed.frontier_checkpoint_frequency or low
changefeed.frontier_highwater_lag_checkpoint_threshold.
  • Loading branch information
wenyihu6 committed May 4, 2024
1 parent 6205244 commit 6ae9f81
Show file tree
Hide file tree
Showing 3 changed files with 148 additions and 3 deletions.
1 change: 1 addition & 0 deletions pkg/ccl/changefeedccl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,7 @@ go_test(
"alter_changefeed_test.go",
"avro_test.go",
"changefeed_dist_test.go",
"changefeed_processors_test.go",
"changefeed_test.go",
"csv_test.go",
"encoder_test.go",
Expand Down
15 changes: 12 additions & 3 deletions pkg/ccl/changefeedccl/changefeed_processors.go
Original file line number Diff line number Diff line change
Expand Up @@ -525,11 +525,20 @@ func makeKVFeedMonitoringCfg(
func (ca *changeAggregator) setupSpansAndFrontier() (spans []roachpb.Span, err error) {
var initialHighWater hlc.Timestamp
spans = make([]roachpb.Span, 0, len(ca.spec.Watches))
for _, watch := range ca.spec.Watches {
if initialHighWater.IsEmpty() || watch.InitialResolved.Less(initialHighWater) {

// Keep initialHighWater as the minimum of all InitialResolved timestamps.
// If there are any zero InitialResolved timestamps, initial scan is
// ongoing. If there are no zero InitialResolved timestamps, initial scan
// is not required.
for i, watch := range ca.spec.Watches {
spans = append(spans, watch.Span)
if i == 0 {
initialHighWater = watch.InitialResolved
continue
}
if watch.InitialResolved.Less(initialHighWater) {
initialHighWater = watch.InitialResolved
}
spans = append(spans, watch.Span)
}

ca.frontier, err = makeSchemaChangeFrontier(initialHighWater, spans...)
Expand Down
135 changes: 135 additions & 0 deletions pkg/ccl/changefeedccl/changefeed_processors_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
// Copyright 2024 The Cockroach Authors.
//
// Licensed as a CockroachDB Enterprise file under the Cockroach Community
// License (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt

package changefeedccl

import (
"testing"

"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/stretchr/testify/require"
)

// TestSetupSpansAndFrontier tests that the setupSpansAndFrontier function
// correctly sets up frontier for the changefeed aggregator frontier.
func TestSetupSpansAndFrontier(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

for _, tc := range []struct {
name string
expectedFrontier hlc.Timestamp
watches []execinfrapb.ChangeAggregatorSpec_Watch
}{
{
name: "new initial scan",
expectedFrontier: hlc.Timestamp{},
watches: []execinfrapb.ChangeAggregatorSpec_Watch{
{
Span: roachpb.Span{Key: roachpb.Key("a"), EndKey: roachpb.Key("b")},
InitialResolved: hlc.Timestamp{},
},
{
Span: roachpb.Span{Key: roachpb.Key("b"), EndKey: roachpb.Key("c")},
InitialResolved: hlc.Timestamp{},
},
{
Span: roachpb.Span{Key: roachpb.Key("c"), EndKey: roachpb.Key("d")},
InitialResolved: hlc.Timestamp{},
},
},
},
{
name: "incomplete initial scan with non-empty initial resolved in the middle",
expectedFrontier: hlc.Timestamp{},
watches: []execinfrapb.ChangeAggregatorSpec_Watch{
{
Span: roachpb.Span{Key: roachpb.Key("a"), EndKey: roachpb.Key("b")},
InitialResolved: hlc.Timestamp{WallTime: 5},
},
{
Span: roachpb.Span{Key: roachpb.Key("b"), EndKey: roachpb.Key("c")},
InitialResolved: hlc.Timestamp{},
},
{
Span: roachpb.Span{Key: roachpb.Key("c"), EndKey: roachpb.Key("d")},
InitialResolved: hlc.Timestamp{WallTime: 20},
},
},
},
{
name: "incomplete initial scan with non-empty initial resolved in the front",
expectedFrontier: hlc.Timestamp{},
watches: []execinfrapb.ChangeAggregatorSpec_Watch{
{
Span: roachpb.Span{Key: roachpb.Key("a"), EndKey: roachpb.Key("b")},
InitialResolved: hlc.Timestamp{},
},
{
Span: roachpb.Span{Key: roachpb.Key("b"), EndKey: roachpb.Key("c")},
InitialResolved: hlc.Timestamp{WallTime: 10},
},
{
Span: roachpb.Span{Key: roachpb.Key("c"), EndKey: roachpb.Key("d")},
InitialResolved: hlc.Timestamp{WallTime: 20},
},
},
},
{
name: "incomplete initial scan with empty initial resolved in the end",
expectedFrontier: hlc.Timestamp{},
watches: []execinfrapb.ChangeAggregatorSpec_Watch{
{
Span: roachpb.Span{Key: roachpb.Key("a"), EndKey: roachpb.Key("b")},
InitialResolved: hlc.Timestamp{WallTime: 10},
},
{
Span: roachpb.Span{Key: roachpb.Key("b"), EndKey: roachpb.Key("c")},
InitialResolved: hlc.Timestamp{WallTime: 20},
},
{
Span: roachpb.Span{Key: roachpb.Key("c"), EndKey: roachpb.Key("d")},
InitialResolved: hlc.Timestamp{},
},
},
},
{
name: "complete initial scan",
expectedFrontier: hlc.Timestamp{WallTime: 5},
watches: []execinfrapb.ChangeAggregatorSpec_Watch{
{
Span: roachpb.Span{Key: roachpb.Key("a"), EndKey: roachpb.Key("b")},
InitialResolved: hlc.Timestamp{WallTime: 10},
},
{
Span: roachpb.Span{Key: roachpb.Key("b"), EndKey: roachpb.Key("c")},
InitialResolved: hlc.Timestamp{WallTime: 20},
},
{
Span: roachpb.Span{Key: roachpb.Key("c"), EndKey: roachpb.Key("d")},
InitialResolved: hlc.Timestamp{WallTime: 5},
},
},
},
} {
t.Run(tc.name, func(t *testing.T) {
ca := &changeAggregator{
spec: execinfrapb.ChangeAggregatorSpec{
Watches: tc.watches,
},
}
_, err := ca.setupSpansAndFrontier()
require.NoError(t, err)
require.Equal(t, tc.expectedFrontier, ca.frontier.Frontier())
})
}
}

0 comments on commit 6ae9f81

Please sign in to comment.