Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

changefeedccl: fix initial scan checkpointing #123625

Merged
merged 1 commit into from
May 10, 2024

Conversation

wenyihu6
Copy link
Contributor

@wenyihu6 wenyihu6 commented May 4, 2024

Initially, all span initial resolved timestamps are kept as zero upon resuming a
job since initial resolved timestamps are set as initial high water which
remains zero until initial scan is completed. However, since
0eda540,
we began reloading checkpoint timestamps instead of setting them all to zero at
the start. In PR #102717, we introduced a mechanism to reduce message duplicates
by re-loading job progress upon resuming which largely increased the likelihood
of this bug. These errors could lead to incorrect frontier and missing events
during initial scans. This patches changes how we initialize initial high water
and frontier by initializing it as zero if there are any zero initial high water
in initial resolved timestamps.

Fixes: #123371

Release note (enterprise change): Fixed a bug in v22.2+ where long running
initial scans may incorrectly restore checkpoint job progress and drop events
during node / changefeed restart. This issue was most likely to occur in
clusters with: 1) changefeed.shutdown_checkpoint.enabled (v23.2) is set 2)
Multiple table targets in a changefeed, or 3) Low
changefeed.frontier_checkpoint_frequency or low
changefeed.frontier_highwater_lag_checkpoint_threshold.

Copy link

blathers-crl bot commented May 4, 2024

It looks like your PR touches production code but doesn't add or edit any test code. Did you consider adding tests to your PR?

🦉 Hoot! I am a Blathers, a bot for CockroachDB. My owner is dev-inf.

@cockroach-teamcity
Copy link
Member

This change is Reviewable

Copy link
Collaborator

@rharding6373 rharding6373 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @wenyihu6)


pkg/ccl/changefeedccl/changefeed_stmt.go line 1375 at r1 (raw file):

	// local state; instead, this checkpoint is applied to the reloaded job
	// progress, and the resulting progress record persisted back to the jobs
	// table.

Could you update the comment to explain why we should return early if the local state's hwm is empty?

Copy link
Collaborator

@rharding6373 rharding6373 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @wenyihu6)


pkg/ccl/changefeedccl/changefeed_stmt.go line 1351 at r1 (raw file):

// reconcileJobStateWithLocalState ensures that the job progress information
// is consistent with the state present in the local state.
func reconcileJobStateWithLocalState(

It would be really great if you were able to make a unit test for reconciliation that exercised your fix.

@andyyang890 andyyang890 requested a review from rharding6373 May 8, 2024 16:26
Copy link
Collaborator

@andyyang890 andyyang890 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've been doing some code reading and I agree with Steven's suggestion to move the fix to the for loop in setupSpansAndFrontier. As you've mentioned before, there is some non-deterministic behavior here depending on the ordering of the empty timestamps in the watched spans slice and I don't think that was intentional. It seems the intent of that for loop was that initialHighwater should be the minimum InitialResolved of all the watched spans.

Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @rharding6373 and @wenyihu6)

@wenyihu6
Copy link
Contributor Author

wenyihu6 commented May 8, 2024

I agree with you and Steven that this could be another possible fix as far as I can tell. But I still want to look into more to see how it plays along with other changefeeds and backfill since this is legacy code from

for _, watch := range spec.Watches {
if initialHighWater.IsEmpty() || watch.InitialResolved.Less(initialHighWater) {
initialHighWater = watch.InitialResolved
}
}
.

Copy link
Collaborator

@andyyang890 andyyang890 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To add to my previous comment, the effect of your current proposed change would make it so that we don't use the saved shutdown checkpoint data (controlled by changefeed.shutdown_checkpoint.enabled) during initial scans (i.e. not reduce duplicates during initial scan restarts), but I don't think we need to be that conservative. We just need to make sure that we don't set the aggregator frontier's initial highwater to a non-empty timestamp when it should be empty.

But I still want to look into more to see how it plays along with other changefeeds and backfill since this is legacy code from

legacy code can be wrong too 😛

Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @rharding6373 and @wenyihu6)

Copy link
Collaborator

@rharding6373 rharding6373 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @wenyihu6)


pkg/ccl/changefeedccl/changefeed_processors.go line 525 at r2 (raw file):

// used to filter out some previously emitted rows, and by the cloudStorageSink
// to name its output files in lexicographically monotonic fashion.
func (ca *changeAggregator) setupSpansAndFrontier() (spans []roachpb.Span, err error) {

It would still be really nice to have a unit test to exercise this new behavior in the aggregator setup.


pkg/ccl/changefeedccl/changefeed_processors.go line 531 at r2 (raw file):

	for _, watch := range ca.spec.Watches {
		spans = append(spans, watch.Span)
		if watch.InitialResolved.IsEmpty() {

optional nit: It may be clearer if we add a if isInitialScan { continue } first. It's a bit difficult to reason about whether we may set the initialHighWater to something else in future iterations.


pkg/ccl/changefeedccl/changefeed_processors.go line 533 at r2 (raw file):

		if watch.InitialResolved.IsEmpty() {
			// Keep initialHighWater as empty if there are any zero InitialResolved
			// timestamp to indicate that initial scan is required.

nit: s/required/ongoing

Copy link
Collaborator

@andyyang890 andyyang890 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @rharding6373 and @wenyihu6)


pkg/ccl/changefeedccl/changefeed_processors.go line 525 at r2 (raw file):

Previously, rharding6373 (Rachael Harding) wrote…

It would still be really nice to have a unit test to exercise this new behavior in the aggregator setup.

+1, might also be nice if we could modify the cdc/initial-scan-rolling-restart roachtest to work on cloud so it can run during nightly CI

Copy link
Contributor

@asg0451 asg0451 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @andyyang890, @rharding6373, and @wenyihu6)


pkg/ccl/changefeedccl/changefeed_processors.go line 527 at r3 (raw file):

func (ca *changeAggregator) setupSpansAndFrontier() (spans []roachpb.Span, err error) {
	var initialHighWater hlc.Timestamp
	isInitialScan := false

nit: this implementation is overcomplicated for the logic it tries to express. assuming i'm right that it's literally just "min", consider an implementation such as:

	spans = make([]roachpb.Span, 0, len(ca.spec.Watches))
	var initialHighWater hlc.Timestamp // initialHighWater is the minimum of all initial resolved timestamps, or zero if none exist.
	for i, watch := range ca.spec.Watches {
		spans = append(spans, watch.Span)
		if i == 0 {
			initialHighWater = watch.InitialResolved
			continue
		}
		if watch.InitialResolved.Less(initialHighWater) {
			initialHighWater = watch.InitialResolved
		}
	}

@wenyihu6 wenyihu6 force-pushed the tryagain branch 2 times, most recently from cece117 to 6c7c4c6 Compare May 9, 2024 19:53
Copy link
Contributor Author

@wenyihu6 wenyihu6 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @andyyang890, @asg0451, and @rharding6373)


pkg/ccl/changefeedccl/changefeed_processors.go line 525 at r2 (raw file):

Previously, andyyang890 (Andy Yang) wrote…

+1, might also be nice if we could modify the cdc/initial-scan-rolling-restart roachtest to work on cloud so it can run during nightly CI

Sounds good. Working on it.


pkg/ccl/changefeedccl/changefeed_processors.go line 531 at r2 (raw file):

Previously, rharding6373 (Rachael Harding) wrote…

optional nit: It may be clearer if we add a if isInitialScan { continue } first. It's a bit difficult to reason about whether we may set the initialHighWater to something else in future iterations.

Done.


pkg/ccl/changefeedccl/changefeed_processors.go line 533 at r2 (raw file):

Previously, rharding6373 (Rachael Harding) wrote…

nit: s/required/ongoing

Done.


pkg/ccl/changefeedccl/changefeed_processors.go line 527 at r3 (raw file):

Previously, asg0451 (Miles Frankel) wrote…

nit: this implementation is overcomplicated for the logic it tries to express. assuming i'm right that it's literally just "min", consider an implementation such as:

	spans = make([]roachpb.Span, 0, len(ca.spec.Watches))
	var initialHighWater hlc.Timestamp // initialHighWater is the minimum of all initial resolved timestamps, or zero if none exist.
	for i, watch := range ca.spec.Watches {
		spans = append(spans, watch.Span)
		if i == 0 {
			initialHighWater = watch.InitialResolved
			continue
		}
		if watch.InitialResolved.Less(initialHighWater) {
			initialHighWater = watch.InitialResolved
		}
	}

Nice. I would buy that.


pkg/ccl/changefeedccl/changefeed_stmt.go line 1351 at r1 (raw file):

Previously, rharding6373 (Rachael Harding) wrote…

It would be really great if you were able to make a unit test for reconciliation that exercised your fix.

Working on the aggregator unit test.


pkg/ccl/changefeedccl/changefeed_stmt.go line 1375 at r1 (raw file):

Previously, rharding6373 (Rachael Harding) wrote…

Could you update the comment to explain why we should return early if the local state's hwm is empty?

Done.

@wenyihu6
Copy link
Contributor Author

@rharding6373 Do we want the roachtest commit to be in a separate commit so that we can backport this fix more easily?

Copy link
Collaborator

@andyyang890 andyyang890 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we want the roachtest commit to be in a separate commit so that we can backport this fix more easily?

I would be in favor of that given we're adding a unit test in this PR. I made some updates to the roachtest in #123924.

Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @asg0451, @rharding6373, and @wenyihu6)


-- commits line 4 at r9:
I think we might want to update this commit message to reflect that this bug was actually introduced in afb95b1 as a result of using the checkpoint to set InitialResolved.


pkg/ccl/changefeedccl/changefeed_processors.go line 535 at r9 (raw file):

		}
		if watch.InitialResolved.Less(initialHighWater) {
			// Keep initialHighWater as the minimum of all InitialResolved timestamps.

nit: move this comment to above the for loop


pkg/ccl/changefeedccl/changefeed_processors_test.go line 21 at r9 (raw file):

// TestSetupSpansAndFrontier tests that the setupSpansAndFrontier function
// correctly sets up frontier for the changefeed aggregator frontier.
func TestSetupSpansAndFrontier(t *testing.T) {

nice!


pkg/ccl/changefeedccl/changefeed_processors_test.go line 33 at r9 (raw file):

				{
					Span:            roachpb.Span{Key: roachpb.Key("a"), EndKey: roachpb.Key("b")},
					InitialResolved: hlc.Timestamp{},

nit: there's no need to include fields that are the type's zero value


pkg/ccl/changefeedccl/changefeed_processors_test.go line 64 at r9 (raw file):

		},
		{
			name:             "incomplete initial scan",

nit: subtest names should be unique


pkg/ccl/changefeedccl/changefeed_processors_test.go line 101 at r9 (raw file):

	} {
		t.Run(tc.name, func(t *testing.T) {
			mockChangefeedAggregator := &changeAggregator{}

nit: s/mockChangefeedAggregator/changeAggregator

This isn't a mock since you're testing the production struct

@wenyihu6 wenyihu6 force-pushed the tryagain branch 2 times, most recently from 0b1a0f3 to df496e5 Compare May 10, 2024 12:54
@rharding6373 rharding6373 requested a review from andyyang890 May 10, 2024 13:52
Copy link
Collaborator

@rharding6373 rharding6373 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

:lgtm: Yes, agree that the roachtest update can be merged separately. Nice work on this, love the new test coverage!

Reviewable status: :shipit: complete! 1 of 0 LGTMs obtained (waiting on @andyyang890, @asg0451, and @wenyihu6)


-- commits line 4 at r11:
Reopened this issue, so we can add Fixes: #[123371](https://github.com/cockroachdb/cockroach/issues/123371)


-- commits line 4 at r11:
Please also add a release note.


pkg/ccl/changefeedccl/changefeed_processors_test.go line 21 at r9 (raw file):

Previously, andyyang890 (Andy Yang) wrote…

nice!

+1 nice test Wenyi!


pkg/ccl/changefeedccl/changefeed_processors_test.go line 46 at r11 (raw file):

		},
		{
			name:             "incomplete initial scan with non-empty initial resolved in the end",

test suggestion: add one more test with only one empty initial resolved timestamp in the first position.

@wenyihu6 wenyihu6 marked this pull request as ready for review May 10, 2024 14:12
@wenyihu6 wenyihu6 requested a review from a team as a code owner May 10, 2024 14:12
Copy link
Collaborator

@rharding6373 rharding6373 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

:lgtm:

Reviewable status: :shipit: complete! 1 of 0 LGTMs obtained (and 1 stale) (waiting on @asg0451)

Copy link
Collaborator

@andyyang890 andyyang890 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (and 2 stale) (waiting on @asg0451, @rharding6373, and @wenyihu6)


-- commits line 21 at r15:
Thanks for adding more details!

nit: the numbered list might be readable if you do 1) changefeed.shutdown_checkpoint.enabled (v23.2) is set, 2) Multiple table targets in a changefeed, or 3) Low changefeed.frontier_checkpoint_frequency or low changefeed.frontier_highwater_lag_checkpoint_threshold.

@wenyihu6 wenyihu6 force-pushed the tryagain branch 3 times, most recently from 5ac778d to 6278275 Compare May 10, 2024 17:42
@wenyihu6
Copy link
Contributor Author

-- commits line 21 at r15:

Previously, andyyang890 (Andy Yang) wrote…

Thanks for adding more details!

nit: the numbered list might be readable if you do 1) changefeed.shutdown_checkpoint.enabled (v23.2) is set, 2) Multiple table targets in a changefeed, or 3) Low changefeed.frontier_checkpoint_frequency or low changefeed.frontier_highwater_lag_checkpoint_threshold.

Done.

Copy link
Collaborator

@rharding6373 rharding6373 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (and 2 stale) (waiting on @andyyang890, @asg0451, and @wenyihu6)


pkg/ccl/changefeedccl/changefeed_processors_test.go line 52 at r16 (raw file):

		},
		{
			name:             "incomplete initial scan with non-empty initial resolved in the end",

nit: s/end/middle

Initially, all span initial resolved timestamps are kept as zero upon resuming a
job since initial resolved timestamps are set as initial high water which
remains zero until initial scan is completed. However, since
cockroachdb@0eda540,
we began reloading checkpoint timestamps instead of setting them all to zero at
the start. In PR cockroachdb#102717, we introduced a mechanism to reduce message duplicates
by re-loading job progress upon resuming which largely increased the likelihood
of this bug. These errors could lead to incorrect frontier and missing events
during initial scans. This patches changes how we initialize initial high water
and frontier by initializing it as zero if there are any zero initial high water
in initial resolved timestamps.

Fixes: cockroachdb#123371

Release note (enterprise change): Fixed a bug in v22.2+ where long running
initial scans may incorrectly restore checkpoint job progress and drop events
during node / changefeed restart. This issue was most likely to occur in
clusters with: 1) changefeed.shutdown_checkpoint.enabled (v23.2) is set 2)
Multiple table targets in a changefeed, or 3) Low
changefeed.frontier_checkpoint_frequency or low
changefeed.frontier_highwater_lag_checkpoint_threshold.
Copy link
Collaborator

@andyyang890 andyyang890 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

:lgtm:

Reviewable status: :shipit: complete! 1 of 0 LGTMs obtained (and 1 stale) (waiting on @asg0451, @rharding6373, and @wenyihu6)

@wenyihu6
Copy link
Contributor Author

Thanks for the reviews! Nice teamwork 👏

bors r=andyyang890, rharding6373

Copy link

blathers-crl bot commented May 10, 2024

Encountered an error creating backports. Some common things that can go wrong:

  1. The backport branch might have already existed.
  2. There was a merge conflict.
  3. The backport branch contained merge commits.

You might need to create your backport manually using the backport tool.


error creating merge commit from 07845ec to blathers/backport-release-23.1-123625: POST https://api.github.com/repos/cockroachdb/cockroach/merges: 409 Merge conflict []

you may need to manually resolve merge conflicts with the backport tool.

Backport to branch 23.1.x failed. See errors above.


🦉 Hoot! I am a Blathers, a bot for CockroachDB. My owner is dev-inf.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
backport-23.1.x Flags PRs that need to be backported to 23.1 backport-23.2.x Flags PRs that need to be backported to 23.2. backport-24.1.x Flags PRs that need to be backported to 24.1.
Projects
None yet
Development

Successfully merging this pull request may close these issues.

cdc: potentially dropping events during initial scan
5 participants