changefeedccl: Reduce message duplicates during node restart #102717
Conversation
Need to run roachprod tests, but would love to get a first look...
Coordinator, in turn, sends signal to the remaining aggregators to exit
The remaining aggregators on the same node? Or the whole cluster? It's probably not optimal to drain the whole changefeed whenever an individual node restarts, vs. reassigning its spans. And even in a cluster-wide restart, we don't want a quadratic number of node-level drains.
Reviewable status: complete! 0 of 0 LGTMs obtained (waiting on @miretskiy, @rhu713, and @yuzefovich)
pkg/ccl/changefeedccl/changefeed_dist.go
line 412 at r1 (raw file):
}
var checkpoint jobspb.ChangefeedProgress_Checkpoint
nit: iirc this is potentially a large struct, suggest making this a pointer rather than copying.
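A minimal, self-contained sketch of the point behind this nit (the struct and field names below are stand-ins, not the actual jobspb type): taking a pointer shares the loaded progress rather than copying a potentially large struct value.

```go
package main

import "fmt"

// Illustrative stand-in for a large progress struct such as
// jobspb.ChangefeedProgress_Checkpoint; the fields are hypothetical.
type checkpoint struct {
	Spans      []string
	Timestamps []int64
}

func main() {
	loaded := &checkpoint{Spans: []string{"/t/1", "/t/2"}}

	byValue := *loaded  // copies the whole struct value
	byPointer := loaded // shares the same value; no copy

	byPointer.Spans = append(byPointer.Spans, "/t/3")
	fmt.Println(len(byValue.Spans), len(byPointer.Spans)) // 2 3
}
```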
pkg/ccl/changefeedccl/changefeed_processors.go
line 1231 at r1 (raw file):
// that the aggregator exited due to node shutdown. Transition to
// draining so that the remaining aggregators will shut down and
// transmit their up-to-date frontier.
Okay, so this does read like the intended flow is "any aggregator node gets the draining warning and checkpoints -> coordinator frontier gets the draining warning and forwards it to all nodes -> all nodes checkpoint -> changefeed restarts". That seems like it'd mean that during a rolling restart of an n-node cluster, we restart the changefeed O(n) times, depending on whether the changefeed is able to replan faster than the cadence of the restarts? Doesn't each node that's being restarted get its own signal anyway?
Instead, I'd think we could send the span partitions and updated checkpoints from just this node to a new changefeed aggregator processor, placed using distSQL planning or maybe on the "youngest" node we know about (i.e. the one with the most recent drain time that has heartbeated afterwards)?
Coordinator shuts down all aggregators. It may not be optimal, but that's exactly what happens when a node shuts down -- the whole flow is torn down. I'm not sure where you see quadratic behavior -- there isn't; when the node drains, the changefeed restarts, and it avoids the draining node when replanning.
That's correct, we restart the changefeed N times, every time making forward progress. This depends, as you say above, on the rate of node drains. This rate is controllable by the customer -- and the chosen default (10 seconds) is probably enough.
I disagree. First, there is no mechanism right now to reassign partitions to existing processors (as you suggest above). Second, there is no mechanism that I know of to add another processor to an existing flow. Perhaps that can be implemented -- I don't know. It would almost certainly be pretty difficult to get right with respect to synchronization in the dist flow infrastructure. Perhaps this can be accomplished by modifying changefeed-specific code somehow so that a changefeed consists of one or more "mini" flows -- so that you can create new flow/processor placements.
I was not trying to rewrite the entire event management model. Quite the opposite, actually. By leveraging metadata, I think I've taken a good, balanced approach: improve the situation (I suspect significantly) without introducing significant additional complexity.
Also note: "optimal solution" is subjective. I certainly do not claim that duplicates are eliminated -- which would be optimal. All I want is for the behavior during a rolling restart to be similar to how checkpoints work: duplicates are reduced, significantly, and the changefeed can make forward progress.
I'm not sure where you see quadratic behavior
In the worst case, if there are N nodes with an aggregator on each node, each aggregator restarts (drains, gets torn down, gets recreated in a new flow) N times, so that's the N^2 I'm talking about--the cluster-wide total CPU used to repeatedly start up processors would still be quadratic in the size of the cluster.
If we're very lucky, the mini flow approach wouldn't be that complicated if we could reuse the physicalplanner.MergePlans function that gets used for UNION ALL queries. But it sounds like a future investigation.
Reviewable status: complete! 0 of 0 LGTMs obtained (waiting on @miretskiy, @rhu713, and @yuzefovich)
Not quite -- if you restart all nodes, you've crashed your cluster, and I'm not too worried about that. So... is it possible that when you restart N times you get the changefeed restarted more than once? Yes, it's possible. Does that make sense?
Reviewed all commit messages.
Reviewable status: complete! 0 of 0 LGTMs obtained (waiting on @miretskiy, @rhu713, and @yuzefovich)
a discussion (no related file):
👋 Left some small comments. Do we need version gates since you added a new proto message?
-- commits
line 31 at r2:
typos: transmitting, up-to-date
-- commits
line 34 at r2:
typo: persisted
pkg/ccl/changefeedccl/changefeed_dist.go
line 512 at r2 (raw file):
}
if len(filtered) == 0 {
	return partitions // Maybe panic.
Maybe pass *testing.T and make whatever assertions are required here.
pkg/ccl/changefeedccl/changefeed_dist.go
line 554 at r2 (raw file):
w.err = err
switch {
case errors.Is(err, changefeedbase.ErrNodeDraining):
This looks a bit scary.
pkg/ccl/changefeedccl/changefeed_processors.go
line 556 at r2 (raw file):
for ca.State == execinfra.StateRunning {
	if !ca.changedRowBuf.IsEmpty() {
		ca.lastPush = timeutil.Now()
Can you defer this?
pkg/jobs/registry.go
line 152 at r2 (raw file):
// jobs onto a draining node. At the moment, jobs can
// access this to make per-job decisions about what to
// do.
I will assume that this is already done.
pkg/jobs/registry.go
line 156 at r2 (raw file):
drainJobs      chan struct{}
drainRequested chan struct{}
nit: a small comment would be nice. We have 3 synchronization primitives right here that have to do with node shutdown with no commentary.
pkg/jobs/registry.go
line 1990 at r2 (raw file):
// by requesting all currently running jobs, as well as various job registry
// processes terminate.
// WaitForRegistryShutdown can then be used to wait for those tasks to complete.
Stray comment line.
pkg/sql/flowinfra/outbox.go
line 138 at r2 (raw file):
ctx context.Context, row rowenc.EncDatumRow, meta *execinfrapb.ProducerMetadata,
) error {
	log.Infof(ctx, "addRow: meta=%v row=%v", meta, row)
This is called often right? Might want to make this a V log.
pkg/server/admin_test.go
line 3026 at r2 (raw file):
defer tc.Stopper().Stop(ctx)
serverutils.SetClusterSetting(t, tc, "server.shutdown.jobs_wait", 0)
We should default to 0s in tests. Kind of like jobs short intervals.
Reviewable status: complete! 0 of 0 LGTMs obtained (waiting on @HonoreDB, @jayshrivastava, @rhu713, and @yuzefovich)
a discussion (no related file):
Previously, jayshrivastava (Jayant) wrote…
👋 Left some small comments. Do we need version gates since you added a new proto message?
Good question. In my assessment, you don't need it.
This metadata is never persisted -- so that's good.
We always specified a callback function that received metadata -- we just never used it.
So, a few cases to consider:
- Coordinator running old code -- it doesn't matter what aggregators do during shutdown; they will shut down with ErrNodeRestart, the coordinator ignores the metadata and restarts the changefeed because all errors are retriable -- no different from today. During this time, we might restart the changefeed multiple times (because as soon as we restart, we could restart again since the node is still draining) -- but that's also fine because of backoffs. So, no issues here.
- Coordinator running new code:
a. The aggregator being drained runs old code -- same as the first case above.
b. Some aggregators run new code. Those aggregators emit metadata, and the coordinator knows how to handle it. Note: not receiving metadata from all aggregators is okay (and that's exactly what happens in a mixed-version state) -- the checkpoint logic is additive. The checkpoint is applied on top of the current checkpoint, and if some aggregators didn't transmit theirs (because they run the old version), those aggregators' spans will restart from the current checkpoint. (A rough sketch of this additive merge follows below.)
I hope I didn't miss any cases.
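To make the additive-merge argument concrete, here is a minimal, self-contained sketch using hypothetical map-based types (the real code works with span frontiers and protos, not maps of strings):

```go
package main

import "fmt"

// mergeCheckpoints illustrates the "additive" behavior described above:
// a span's restart timestamp only moves forward when an aggregator reported
// something newer, and spans whose aggregator sent no metadata (e.g.
// old-version nodes) simply keep the current job-level checkpoint.
func mergeCheckpoints(current, reported map[string]int64) map[string]int64 {
	merged := make(map[string]int64, len(current))
	for span, ts := range current {
		merged[span] = ts
	}
	for span, ts := range reported {
		if ts > merged[span] {
			merged[span] = ts
		}
	}
	return merged
}

func main() {
	current := map[string]int64{"/t/1": 100, "/t/2": 100, "/t/3": 100}
	// Only new-version aggregators reported; /t/3's aggregator ran old code.
	reported := map[string]int64{"/t/1": 180, "/t/2": 150}
	fmt.Println(mergeCheckpoints(current, reported)) // map[/t/1:180 /t/2:150 /t/3:100]
}
```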
Previously, jayshrivastava (Jayant) wrote…
typos: transmitting, up-to-date
Done.
pkg/ccl/changefeedccl/changefeed_dist.go
line 512 at r2 (raw file):
Previously, jayshrivastava (Jayant) wrote…
Maybe pass *testing.T and make whatever assertions are required here.
I'd rather not pass testing.T to non-testing code.
pkg/ccl/changefeedccl/changefeed_dist.go
line 554 at r2 (raw file):
Previously, jayshrivastava (Jayant) wrote…
This looks a bit scary.
I think it's fine. If we got this error, it means that we propagated this error to aggregators -- that's part of what this PR does. This propagation is a way to shut things down -- and we propagate it to every aggregator.
Cancellation, on the other hand, is the way an error propagates when one of the aggregators experiences a localized error.
pkg/ccl/changefeedccl/changefeed_processors.go
line 1231 at r1 (raw file):
Previously, HonoreDB (Aaron Zinger) wrote…
Okay, so this does read like the intended flow is "any aggregator node gets the draining warning and checkpoints -> coordinator frontier gets the draining warning and forwards it to all nodes -> all nodes checkpoint -> changefeed restarts". That seems like it'd mean that during a rolling restart of an n-node cluster, we restart the changefeed O(n) times, depending on whether the changefeed is able to replan faster than the cadence of the restarts? Doesn't each node that's being restarted get its own signal anyway?
Instead, I'd think we could send the span partitions and updated checkpoints from just this node to a new changefeed aggregator processor, placed using distSQL planning or maybe on the "youngest" node we know about (i.e. the one with the most recent drain time that has heartbeated afterwards)?
Done.
pkg/ccl/changefeedccl/changefeed_processors.go
line 556 at r2 (raw file):
Previously, jayshrivastava (Jayant) wrote…
Can you defer this?
Defers in a for loop are not great (and the linter complains to that effect too).
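For context on why the defer is avoided here, a small self-contained illustration (the loop shape mirrors the quoted code; variable names are illustrative):

```go
package main

import (
	"fmt"
	"time"
)

// A defer placed inside a loop does not run at the end of each iteration --
// it runs only when the surrounding function returns -- so per-iteration
// bookkeeping like ca.lastPush is better updated inline.
func main() {
	var lastPush time.Time
	for i := 0; i < 3; i++ {
		// defer func() { lastPush = time.Now() }() // would all fire only at function return
		lastPush = time.Now() // inline update happens once per iteration
		fmt.Println("iteration", i, "lastPush set:", !lastPush.IsZero())
	}
}
```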
pkg/jobs/registry.go
line 152 at r2 (raw file):
Previously, jayshrivastava (Jayant) wrote…
I will assume that this is already done.
Done.
pkg/jobs/registry.go
line 1990 at r2 (raw file):
Previously, jayshrivastava (Jayant) wrote…
Stray comment line.
Why? I wanted to mention the WaitForRegistryShutdown function here...
pkg/sql/flowinfra/outbox.go
line 138 at r2 (raw file):
Previously, jayshrivastava (Jayant) wrote…
This is called often right? Might want to make this a V log.
Not only that... it's a mistake to include this file! Thanks!
Previously, we forwarded the most up-to-date state for all spans in cockroachdb#102717 to reduce duplicates upon changefeed resume. This can cause unexpected behaviour during initial scans, since the initial scan logic relies on all resolved timestamps being empty.
123625: changefeedccl: fix initial scan checkpointing r=andyyang890,rharding6373 a=wenyihu6
Initially, all span initial resolved timestamps are kept as zero upon resuming a job, since initial resolved timestamps are set to the initial high water, which remains zero until the initial scan is completed. However, since 0eda540, we began reloading checkpoint timestamps instead of setting them all to zero at the start. In PR #102717, we introduced a mechanism to reduce message duplicates by re-loading job progress upon resuming, which largely increased the likelihood of this bug. These errors could lead to an incorrect frontier and missing events during initial scans. This patch changes how we initialize the initial high water and frontier by initializing them as zero if any of the initial resolved timestamps has a zero initial high water.
Fixes: #123371
Release note (enterprise change): Fixed a bug in v22.2+ where long running initial scans may incorrectly restore checkpoint job progress and drop events during node / changefeed restart. This issue was most likely to occur in clusters with: 1) changefeed.shutdown_checkpoint.enabled (v23.2) set, 2) multiple table targets in a changefeed, or 3) a low changefeed.frontier_checkpoint_frequency or low changefeed.frontier_highwater_lag_checkpoint_threshold.
Co-authored-by: Wenyi Hu <[email protected]>
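To illustrate the follow-up fix referenced above, a minimal sketch based on the commit's description (function and variable names are hypothetical, not the actual changefeed code):

```go
package main

import "fmt"

// initialHighWater sketches the fix: if any span's restored resolved
// timestamp is still zero, the initial scan has not completed, so the
// frontier must start from zero rather than from the minimum of the
// restored timestamps.
func initialHighWater(resolved []int64) int64 {
	min := resolved[0]
	for _, ts := range resolved {
		if ts == 0 {
			return 0 // initial scan incomplete: start the frontier from zero
		}
		if ts < min {
			min = ts
		}
	}
	return min
}

func main() {
	fmt.Println(initialHighWater([]int64{0, 150, 175}))   // 0: still in initial scan
	fmt.Println(initialHighWater([]int64{120, 150, 175})) // 120: initial scan done
}
```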
Changefeeds have at-least-once semantics. These semantics are implemented via reliance on the closed timestamp system, which drives the changefeed checkpointing logic.
When a node is restarted gracefully by draining it, the changefeed restarts, re-emitting messages written since the last successful checkpoint.
During rolling cluster restarts, however, this behavior results in almost quadratic growth in duplicate messages (e.g. a node may be drained every 5 minutes, and the checkpoint may be produced every 5 minutes -- with each subsequent node being drained right before a successful checkpoint).
This PR addresses the issue of duplicates during node (and cluster) restarts.
First, the fact that a node is draining is exposed via an OnDrain channel made available in the jobs registry. This channel is signaled when the node begins its drain process, and there is a wait period, configured via the server.shutdown.jobs_wait setting (10 seconds by default), before the registry shuts down and cancels all currently running jobs. During this time, the changefeed aggregator running on the node being drained detects the drain and shuts down, transmitting its full frontier to the changefeed coordinator. The coordinator, in turn, sends a signal to the remaining aggregators to exit, also transmitting their up-to-date frontier information. A minimal sketch of this flow follows below.
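A minimal, self-contained analogue of the drain handshake described above; the real code lives in pkg/jobs and pkg/ccl/changefeedccl, and the channel names and timings here are illustrative stand-ins for the OnDrain signal and the server.shutdown.jobs_wait grace period.

```go
package main

import (
	"fmt"
	"time"
)

// The registry signals that a drain has begun, then waits a bounded period
// (analogous to server.shutdown.jobs_wait, 10s by default) for jobs to
// checkpoint and exit before cancelling them outright.
func main() {
	onDrain := make(chan struct{})
	done := make(chan string)

	// "Aggregator": exits promptly once the drain is signaled, after flushing
	// its frontier to the coordinator.
	go func() {
		for {
			select {
			case <-onDrain:
				done <- "frontier transmitted, aggregator exiting"
				return
			case <-time.After(50 * time.Millisecond):
				// normal event processing tick
			}
		}
	}()

	close(onDrain) // node begins draining

	select {
	case msg := <-done:
		fmt.Println(msg)
	case <-time.After(10 * time.Second): // grace period analogue
		fmt.Println("wait period elapsed; cancelling running jobs")
	}
}
```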
Prior to retrying the changefeed, an up-to-date frontier is reconstructed, persisted to the jobs table as needed, and the changefeed flow is replanned, avoiding the node that is being drained.
Epic: CRDB-26978
Release note (enterprise change): Changefeeds emit significantly fewer duplicate messages during node/cluster restarts.