From 6ae9f8174e58878454dbdb617d6efad079300347 Mon Sep 17 00:00:00 2001 From: Wenyi Hu Date: Sat, 4 May 2024 13:38:30 +0000 Subject: [PATCH] changefeedccl: fix initial scan checkpointing Initially, all span initial resolved timestamps are kept as zero upon resuming a job since initial resolved timestamps are set as initial high water which remains zero until initial scan is completed. However, since https://github.com/cockroachdb/cockroach/commit/0eda54018b9676f855efcd90bfdd0c486c97bfdd, we began reloading checkpoint timestamps instead of setting them all to zero at the start. In PR #102717, we introduced a mechanism to reduce message duplicates by re-loading job progress upon resuming which largely increased the likelihood of this bug. These errors could lead to incorrect frontier and missing events during initial scans. This patches changes how we initialize initial high water and frontier by initializing it as zero if there are any zero initial high water in initial resolved timestamps. Fixes: #123371 Release note (enterprise change): Fixed a bug in v22.2+ where long running initial scans may incorrectly restore checkpoint job progress and drop events during node / changefeed restart. This issue was most likely to occur in clusters with: 1) changefeed.shutdown_checkpoint.enabled (v23.2) is set 2) Multiple table targets in a changefeed, or 3) Low changefeed.frontier_checkpoint_frequency or low changefeed.frontier_highwater_lag_checkpoint_threshold. --- pkg/ccl/changefeedccl/BUILD.bazel | 1 + .../changefeedccl/changefeed_processors.go | 15 +- .../changefeed_processors_test.go | 135 ++++++++++++++++++ 3 files changed, 148 insertions(+), 3 deletions(-) create mode 100644 pkg/ccl/changefeedccl/changefeed_processors_test.go diff --git a/pkg/ccl/changefeedccl/BUILD.bazel b/pkg/ccl/changefeedccl/BUILD.bazel index a55df56c57b8..a87b763151d8 100644 --- a/pkg/ccl/changefeedccl/BUILD.bazel +++ b/pkg/ccl/changefeedccl/BUILD.bazel @@ -190,6 +190,7 @@ go_test( "alter_changefeed_test.go", "avro_test.go", "changefeed_dist_test.go", + "changefeed_processors_test.go", "changefeed_test.go", "csv_test.go", "encoder_test.go", diff --git a/pkg/ccl/changefeedccl/changefeed_processors.go b/pkg/ccl/changefeedccl/changefeed_processors.go index 9b8cf0a9ec60..b61cabc0da14 100644 --- a/pkg/ccl/changefeedccl/changefeed_processors.go +++ b/pkg/ccl/changefeedccl/changefeed_processors.go @@ -525,11 +525,20 @@ func makeKVFeedMonitoringCfg( func (ca *changeAggregator) setupSpansAndFrontier() (spans []roachpb.Span, err error) { var initialHighWater hlc.Timestamp spans = make([]roachpb.Span, 0, len(ca.spec.Watches)) - for _, watch := range ca.spec.Watches { - if initialHighWater.IsEmpty() || watch.InitialResolved.Less(initialHighWater) { + + // Keep initialHighWater as the minimum of all InitialResolved timestamps. + // If there are any zero InitialResolved timestamps, initial scan is + // ongoing. If there are no zero InitialResolved timestamps, initial scan + // is not required. + for i, watch := range ca.spec.Watches { + spans = append(spans, watch.Span) + if i == 0 { + initialHighWater = watch.InitialResolved + continue + } + if watch.InitialResolved.Less(initialHighWater) { initialHighWater = watch.InitialResolved } - spans = append(spans, watch.Span) } ca.frontier, err = makeSchemaChangeFrontier(initialHighWater, spans...) diff --git a/pkg/ccl/changefeedccl/changefeed_processors_test.go b/pkg/ccl/changefeedccl/changefeed_processors_test.go new file mode 100644 index 000000000000..804717917ea3 --- /dev/null +++ b/pkg/ccl/changefeedccl/changefeed_processors_test.go @@ -0,0 +1,135 @@ +// Copyright 2024 The Cockroach Authors. +// +// Licensed as a CockroachDB Enterprise file under the Cockroach Community +// License (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt + +package changefeedccl + +import ( + "testing" + + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" + "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/stretchr/testify/require" +) + +// TestSetupSpansAndFrontier tests that the setupSpansAndFrontier function +// correctly sets up frontier for the changefeed aggregator frontier. +func TestSetupSpansAndFrontier(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + for _, tc := range []struct { + name string + expectedFrontier hlc.Timestamp + watches []execinfrapb.ChangeAggregatorSpec_Watch + }{ + { + name: "new initial scan", + expectedFrontier: hlc.Timestamp{}, + watches: []execinfrapb.ChangeAggregatorSpec_Watch{ + { + Span: roachpb.Span{Key: roachpb.Key("a"), EndKey: roachpb.Key("b")}, + InitialResolved: hlc.Timestamp{}, + }, + { + Span: roachpb.Span{Key: roachpb.Key("b"), EndKey: roachpb.Key("c")}, + InitialResolved: hlc.Timestamp{}, + }, + { + Span: roachpb.Span{Key: roachpb.Key("c"), EndKey: roachpb.Key("d")}, + InitialResolved: hlc.Timestamp{}, + }, + }, + }, + { + name: "incomplete initial scan with non-empty initial resolved in the middle", + expectedFrontier: hlc.Timestamp{}, + watches: []execinfrapb.ChangeAggregatorSpec_Watch{ + { + Span: roachpb.Span{Key: roachpb.Key("a"), EndKey: roachpb.Key("b")}, + InitialResolved: hlc.Timestamp{WallTime: 5}, + }, + { + Span: roachpb.Span{Key: roachpb.Key("b"), EndKey: roachpb.Key("c")}, + InitialResolved: hlc.Timestamp{}, + }, + { + Span: roachpb.Span{Key: roachpb.Key("c"), EndKey: roachpb.Key("d")}, + InitialResolved: hlc.Timestamp{WallTime: 20}, + }, + }, + }, + { + name: "incomplete initial scan with non-empty initial resolved in the front", + expectedFrontier: hlc.Timestamp{}, + watches: []execinfrapb.ChangeAggregatorSpec_Watch{ + { + Span: roachpb.Span{Key: roachpb.Key("a"), EndKey: roachpb.Key("b")}, + InitialResolved: hlc.Timestamp{}, + }, + { + Span: roachpb.Span{Key: roachpb.Key("b"), EndKey: roachpb.Key("c")}, + InitialResolved: hlc.Timestamp{WallTime: 10}, + }, + { + Span: roachpb.Span{Key: roachpb.Key("c"), EndKey: roachpb.Key("d")}, + InitialResolved: hlc.Timestamp{WallTime: 20}, + }, + }, + }, + { + name: "incomplete initial scan with empty initial resolved in the end", + expectedFrontier: hlc.Timestamp{}, + watches: []execinfrapb.ChangeAggregatorSpec_Watch{ + { + Span: roachpb.Span{Key: roachpb.Key("a"), EndKey: roachpb.Key("b")}, + InitialResolved: hlc.Timestamp{WallTime: 10}, + }, + { + Span: roachpb.Span{Key: roachpb.Key("b"), EndKey: roachpb.Key("c")}, + InitialResolved: hlc.Timestamp{WallTime: 20}, + }, + { + Span: roachpb.Span{Key: roachpb.Key("c"), EndKey: roachpb.Key("d")}, + InitialResolved: hlc.Timestamp{}, + }, + }, + }, + { + name: "complete initial scan", + expectedFrontier: hlc.Timestamp{WallTime: 5}, + watches: []execinfrapb.ChangeAggregatorSpec_Watch{ + { + Span: roachpb.Span{Key: roachpb.Key("a"), EndKey: roachpb.Key("b")}, + InitialResolved: hlc.Timestamp{WallTime: 10}, + }, + { + Span: roachpb.Span{Key: roachpb.Key("b"), EndKey: roachpb.Key("c")}, + InitialResolved: hlc.Timestamp{WallTime: 20}, + }, + { + Span: roachpb.Span{Key: roachpb.Key("c"), EndKey: roachpb.Key("d")}, + InitialResolved: hlc.Timestamp{WallTime: 5}, + }, + }, + }, + } { + t.Run(tc.name, func(t *testing.T) { + ca := &changeAggregator{ + spec: execinfrapb.ChangeAggregatorSpec{ + Watches: tc.watches, + }, + } + _, err := ca.setupSpansAndFrontier() + require.NoError(t, err) + require.Equal(t, tc.expectedFrontier, ca.frontier.Frontier()) + }) + } +}