-
Notifications
You must be signed in to change notification settings - Fork 1.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix: account for memory in RepartitionExec
#4820
Conversation
@@ -1210,18 +1210,11 @@ mod tests { | |||
let err = common::collect(stream).await.unwrap_err(); | |||
|
|||
// error root cause traversal is a bit complicated, see #4172. | |||
if let DataFusionError::ArrowError(ArrowError::ExternalError(err)) = err { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
drive-by clean-up
@@ -555,20 +584,35 @@ impl Stream for RepartitionStream { | |||
mut self: Pin<&mut Self>, | |||
cx: &mut Context<'_>, | |||
) -> Poll<Option<Self::Item>> { | |||
match self.input.poll_next_unpin(cx) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the change here isn't as large as it seems. I've just converted the implicit loop via (tail) recursion into a proper one since I'm kinda afraid that this may explode under certain circumstances. See one of the early commits in this PR. The actual accounting change is in a separate commit.
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you @crepererum
In general seeing the unbounded receiver streams here is somewhat terrifying as it will buffer a potentially infinite amount.
Maybe it is worth reviewing this code and ensuring it only buffers inputs when one of the outputs is empty and was polled. 🤔
At least after this PR we will have more visibility into if/when this is happening 👍
@@ -467,11 +488,16 @@ impl RepartitionExec { | |||
}; | |||
|
|||
partitioner.partition(batch, |partition, partitioned| { | |||
let size = partitioned.get_array_memory_size(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This doesn't account for sliced data (so if a batch of 1M gets cut up into 1000 pieces, each of the 1000 pieces will be charged the entire underlying size). that being said it is a conservative estimate so that is good 👍
I can try that in a follow up. In general unbounded buffering cannot be avoided though since the output may poll partition 1 while the input only produced data for partition 2 til the very end. We can make buffering less likely though (hence let's try this in a follow up). |
I plan to merge this later today unless there are any additional comments |
Benchmark runs are scheduled for baseline = 2db3d2e and contender = 83c1026. 83c1026 is a master commit associated with this PR. Results will be available as each benchmark for each run completes. |
Which issue does this PR close?
Closes #4816.
Rationale for this change
Unbounded channels in
RepartitionExec
may easily lead to OOMs, even when the user has a memory manager limit configured.What changes are included in this PR?
Memory accounting. Some drive-by clean-ups.
Are these changes tested?
New test called
oom
.Are there any user-facing changes?
Better memory accounting.