changefeedccl: add retries to sinkless changefeeds #85458
Note: my 2nd commit will be removed prior to merging. It's just there to force sinkless tests in CI.
This is beautiful. Only comment is that we need to robustly test my claim that restarting a core changefeed from the schema change boundary that caused the error is always safe (never skips a message). TestChangefeedNemeses (in changefeedccl/nemeses_test.go) does almost that already--I think all you'd need to do is run it on sinkless a few hundred times, and either remove the disableDeclarativeSchemaChangesForTest or find some other way of ensuring that schema changes cause a retryable error.
This is very nice.
telemetry.Count(`changefeed.core.error`)

var err error
for r := retry.StartWithCtx(ctx, changefeedRetryOptions); r.Next(); {
Let's move this logic to a separate function.. coreChangefeedWithRetry or some such.
@@ -1038,7 +1038,7 @@ func (cf *changeFrontier) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMetad
 // Detect whether this boundary should be used to kill or restart the
 // changefeed.
 if cf.frontier.boundaryType == jobspb.ResolvedSpan_RESTART {
-    err = changefeedbase.MarkRetryableError(err)
+    err = changefeedbase.MarkRetryableErrorWithTimestamp(err, cf.frontier.boundaryTime)
I think this is great.. and very elegant.. I always have to question myself if we need ts.Next() or not.. I don't think we do.
@@ -1182,7 +1182,7 @@ func TestChangefeedColumnDropsOnTheSameTableWithMultipleFamilies(t *testing.T) {
 })
 }

-cdcTest(t, testFn, feedTestOmitSinks("sinkless"))
+cdcTest(t, testFn, feedTestForceSink("sinkless"))
I don't think we need to forceSink.. We want to run this test against all available sinks.
I think the logic is correct; I would recommend running this and other tests under stress...
./dev test pkg/ccl/changefeedccl --filter=TestChangefeedBackfillCheckpoint --stress
(Extra bonus if you decide to set up a GCE worker, so that you can build and run roachprod stress -- that's probably overkill as long as you run at least a few hundred iterations and verify that you've run sinkless a sufficient number of times).
@HonoreDB I ran the nemeses 200 times on my gce worker and it had no failure. This is the script I used:
import subprocess

# Run the nemeses test 200 times, stopping at the first failing run.
for i in range(200):
    res = subprocess.run('./dev test pkg/ccl/changefeedccl --filter=TestChangefeedNemeses --stream-output -- --nocache_test_results', shell=True)
    assert res.returncode == 0
The latest commit shows the nemeses code I ran, which basically forces sinkless feeds and makes them restart when schema changes happen.
cc @miretskiy I believe this addresses your comment regarding stress testing below because the nemeses test is rigorous.
Update: Going to run it 10k times overnight to verify things work.
Approved (pending those test runs and removing the two temp commits).
Previously, core changefeeds would stop entirely due to transient errors or certain schema changes. This change adds a retry loop to the core changefeed distributed SQL workflow. This change updates related tests which omitted sinkless feeds since they could not handle schema changes.

Fixes cockroachdb#85008

Release note (general change): Changefeeds without a specified sink will no longer terminate when schema changes occur.
bors r+
Build succeeded.
This change updates core/sinkless changefeeds to run in a retry loop, allowing for changefeed restarts in case of transient errors or declarative schema changes.
See commit notes for more details.
Fixes #85008