changefeedccl: Do not block on file size based flushes #88395
Conversation
Force-pushed from 9a4c534 to 3ac7fff
Prior to this change, the cloud storage sink triggered a file-size-based flush whenever a new row would push the file size beyond the configured threshold. This significantly reduced throughput whenever such an event occurred -- no additional events could be added to the cloud storage sink while the previous flush was active.

This is not necessary. The cloud storage sink can trigger file-based flushes asynchronously. The only requirement is that if a real, non-file-based flush arrives, or if we need to emit resolved timestamps, then we must wait for all of the active flush requests to complete.

In addition, because every event added to the cloud storage sink has an associated allocation, which is released when the file is written out, performing flushes asynchronously is safe with respect to memory usage and accounting.

Release note (enterprise change): Changefeeds using the cloud storage sink now have better throughput.

Release justification: performance fix
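To make the mechanism described above concrete, here is a minimal, hypothetical sketch of the pattern (names such as asyncSink, EmitRow, uploadFile, and Flush are illustrative assumptions, not the actual cloudStorageSink code): size-triggered flushes run on background goroutines, while the explicit flush first waits for all of them to complete.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

type asyncSink struct {
	mu        sync.Mutex
	buf       []byte
	threshold int

	wg       sync.WaitGroup
	errOnce  sync.Once
	flushErr error // first error observed by a background flush
}

// uploadFile stands in for writing a completed file to cloud storage.
func (s *asyncSink) uploadFile(ctx context.Context, data []byte) error {
	fmt.Printf("uploaded %d bytes\n", len(data))
	return nil
}

// EmitRow buffers a row; once the file exceeds the size threshold it starts
// a background flush instead of blocking the caller.
func (s *asyncSink) EmitRow(ctx context.Context, row []byte) {
	s.mu.Lock()
	s.buf = append(s.buf, row...)
	if len(s.buf) < s.threshold {
		s.mu.Unlock()
		return
	}
	data := s.buf
	s.buf = nil
	s.mu.Unlock()

	s.wg.Add(1)
	go func() {
		defer s.wg.Done()
		if err := s.uploadFile(ctx, data); err != nil {
			s.errOnce.Do(func() { s.flushErr = err })
		}
	}()
}

// Flush is the "real" flush: it must wait for every outstanding
// size-triggered flush before resolved timestamps can be emitted.
func (s *asyncSink) Flush(ctx context.Context) error {
	s.mu.Lock()
	data := s.buf
	s.buf = nil
	s.mu.Unlock()

	s.wg.Wait() // wait for all active async flushes
	if s.flushErr != nil {
		return s.flushErr
	}
	if len(data) == 0 {
		return nil
	}
	return s.uploadFile(ctx, data)
}

func main() {
	s := &asyncSink{threshold: 64}
	for i := 0; i < 10; i++ {
		s.EmitRow(context.Background(), []byte("some-row-payload-"))
	}
	if err := s.Flush(context.Background()); err != nil {
		fmt.Println("flush error:", err)
	}
}
```

The design point the sketch tries to capture: waiting in the explicit Flush is what keeps correctness intact, because resolved timestamps are only emitted after a flush that has drained every outstanding async flush.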
@jayshrivastava @ajwerner -- I think based on performance numbers so far, this PR can be reviewed/merged/backported.
Tests on a 340G table indicate a 2.5x performance improvement.
This is great work. Left minor comments/discussions.
Reviewable status: complete! 0 of 0 LGTMs obtained (waiting on @ajwerner and @miretskiy)
pkg/ccl/changefeedccl/sink_cloudstorage.go
line 623 at r1 (raw file):
```go
	}
}
s.asyncFlushActive = asyncFlushEnabled
```
This code looks good, but we don't explicitly test changing this setting in the middle of operation. There may be some subtle bug. Also, if we use s.asyncFlushActive here, we can assume it won't change while the changefeed is running. This would make it easier to reason about what's happening in case of any CI failures or in case we want to update this in the future. If you're confident, feel free to keep it.
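If it helps to picture the suggestion, here is a tiny, hypothetical sketch (assumed names and a simplified, func-valued stand-in for the real cluster-settings API) of capturing the value once at sink construction so it cannot change while the changefeed is running:

```go
package main

import "fmt"

type sink struct {
	// asyncFlushEnabled is captured once at construction time; later changes
	// to the cluster setting do not affect this sink instance.
	asyncFlushEnabled bool
}

func newSink(enabled func() bool) *sink {
	return &sink{asyncFlushEnabled: enabled()}
}

func main() {
	setting := true
	s := newSink(func() bool { return setting })
	setting = false // flipping the setting later has no effect on s
	fmt.Println(s.asyncFlushEnabled)
}
```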
pkg/ccl/changefeedccl/sink_cloudstorage.go
line 694 at r1 (raw file):
```go
func (s *cloudStorageSink) Close() error {
	s.files = nil
	return errors.CombineErrors(s.waitAsyncFlush(), s.es.Close())
}
```
I was concerned about waiting for a flush here, but I think this is the right behavior. If a changefeed stops because it's done, we should wait for files to be flushed. If we close due to an error, then there is no harm in flushing what is in flight.
This makes me wonder about the parallel consumer. Do you think it should also wait for in-flight events to be emitted to the sink? The change aggregator calls eventConsumer.Close() then sink.Close(). Right now, it does not wait for events to be flushed to the sink.
Reviewable status: complete! 0 of 0 LGTMs obtained (waiting on @ajwerner and @jayshrivastava)
pkg/ccl/changefeedccl/sink_cloudstorage.go
line 623 at r1 (raw file):
Previously, jayshrivastava wrote…
This code looks good, but we don't explicitly test changing this setting in the middle of operation. There may be some subtle bug. Also, if we use s.asyncFlushActive here, we can assume it won't change while the changefeed is running. This would make it easier to reason about what's happening in case of any CI failures or in case we want to update this in the future. If you're confident, feel free to keep it.
I mean, asyncFlushActive gets assigned the current value of the enable setting. It's fine if you change this setting -- the fact that I wait above makes sure that the old async behavior is reverted (see the sketch below). I also tested it on the cluster. I could write a test specifically around that... Not sure if it will be that valuable though.
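A minimal sketch of the "wait above" point, under assumed names (sink, waitAsyncFlush, flushFiles; not the actual sink_cloudstorage.go code): when the setting flips from enabled to disabled, outstanding async flushes are drained first, so the old behavior is fully reverted before continuing.

```go
package main

import "fmt"

type sink struct {
	asyncFlushActive bool
	pending          []string // stand-in for in-flight async flushes
}

// waitAsyncFlush stands in for waiting on (and reaping) flush goroutines.
func (s *sink) waitAsyncFlush() error {
	for _, f := range s.pending {
		fmt.Println("waiting for", f)
	}
	s.pending = nil
	return nil
}

// flushFiles switches modes safely: turning async off drains old work first.
func (s *sink) flushFiles(asyncEnabled bool) error {
	if s.asyncFlushActive && !asyncEnabled {
		if err := s.waitAsyncFlush(); err != nil {
			return err
		}
	}
	s.asyncFlushActive = asyncEnabled
	return nil
}

func main() {
	s := &sink{asyncFlushActive: true, pending: []string{"file-1", "file-2"}}
	_ = s.flushFiles(false) // the setting was turned off mid-run: drain first
}
```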
pkg/ccl/changefeedccl/sink_cloudstorage.go
line 694 at r1 (raw file):
Previously, jayshrivastava wrote…
I was concerned about waiting for a flush here, but I think this is the right behavior. If a changefeed stops because it's done, we should wait for files to be flushed. If we close due to an error, then there is no harm in flushing what is in flight.
This makes me wonder about the parallel consumer. Do you think it should also wait for in-flight events to be emitted to the sink? The change aggregator calls eventConsumer.Close() then sink.Close(). Right now, it does not wait for events to be flushed to the sink.
It is definitely the right behavior. Whether it stops because it's done or because of an error, we have to wait; if the context given to this sink was cancelled, then whatever IO was in progress will be wrapped up, we would return an error (into flushErr), and we would wait and reap goroutines. And it is also the right thing to do if we shut down properly -- without an error: we must flush any in-progress data. (A sketch of this wait-and-reap behavior follows after the list below.)

This flushing stuff is subtle. TLDR: no need to flush.

Two cases to consider:
- We are exiting because of an error. No need to flush in the close method -- just clean up (i.e. close external storage, wait for goroutines, etc.).
- We are exiting because we are done. We could be done because of user action (cancel) -- no need to flush, though I suppose we could. Or we are exiting because of e.g. schema change policy = stop, or perhaps we are running with initial scan only. In those cases, we emit resolved span events with an EXIT marker. This gets noticed by the change aggregator, which flushes the sink explicitly once its frontier advances -- and then this gets forwarded to the coordinator, and then we exit. That is: there is a flush happening anyway.
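Here is a minimal sketch, under assumed names, of the wait-and-reap behavior referenced above: Close always waits for flush goroutines, and a cancelled context simply turns in-flight IO into an error surfaced via flushErr. This is illustrative, not the real cloudStorageSink.Close.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

type closableSink struct {
	wg       sync.WaitGroup
	mu       sync.Mutex
	flushErr error
}

// startAsyncFlush pretends to upload a file; if the context is cancelled,
// the error is recorded instead of being lost.
func (s *closableSink) startAsyncFlush(ctx context.Context) {
	s.wg.Add(1)
	go func() {
		defer s.wg.Done()
		select {
		case <-time.After(time.Second): // pretend the upload completed
		case <-ctx.Done():
			s.mu.Lock()
			if s.flushErr == nil {
				s.flushErr = ctx.Err()
			}
			s.mu.Unlock()
		}
	}()
}

// Close waits for and reaps all flush goroutines regardless of why we exit;
// the real sink combines this with closing the external storage.
func (s *closableSink) Close() error {
	s.wg.Wait()
	return s.flushErr
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	s := &closableSink{}
	s.startAsyncFlush(ctx)
	cancel() // simulate exiting because of an error
	fmt.Println("close:", s.Close())
}
```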
bors r+
Build succeeded.
I think this is straight up wrong. I think for a given change aggregator we have to wait for an earlier file flush to succeed before starting to flush the later one.
I think that's a pessimistic assessment. Explicit flushes wait for outstanding async flushes. The chances of reordering with multi-megabyte files are slim.