refactor: improve repartition buffering #4867
Conversation
2c03869 to c5ca9fa (compare)
I'm sure this doesn't hold in certain cases. Mainly for hash repartition: when cardinality is low, for example lower than the number of target partitions, some of them will not receive any data.
Another "problematic" case is when some column/expression that is part of a hash repartition has low cardinality and the data is sorted or semi-sorted (i.e. the same value is repeated many times before moving on to the next one). In this case the buffer will fill up until the next value is consumed, and it only becomes bounded once all partitions have been filled (which in practice may never happen). It is still better than the current situation in other cases, though.
I agree w/ @Dandandan's assessment.
I plan to review this tomorrow if I can find the time; if not, I will certainly do so on Thursday.
Left some minor comments; I need to find some focus time to check the futures shenanigans. I think it might be possible to do something simpler using either Semaphore or Notify.
Really nicely done @crepererum -- I reviewed the structure, comments, code and tests and they are all very nice 🏆
I left a few comments but I don't think they are required to merge.
I also double checked that the mutex acquisition order was consistent (so as to avoid deadlocks)
I also sanity checked performance on TPCH q1, which has a repartition, and it appears to be the same (as expected) 👍
(arrow_dev) alamb@MacBook-Pro-8:~/Software/arrow-datafusion2$ CARGO_TARGET_DIR=/Users/alamb/Software/target-df2 cargo run --release --bin tpch -- benchmark datafusion --iterations 3 --path /tmp/tpch-parquet --format parquet --query 1
Finished release [optimized] target(s) in 0.26s
Running `/Users/alamb/Software/target-df2/release/tpch benchmark datafusion --iterations 3 --path /tmp/tpch-parquet --format parquet --query 1`
Running benchmarks with the following options: DataFusionBenchmarkOpt { query: Some(1), debug: false, iterations: 3, partitions: 2, batch_size: 8192, path: "/tmp/tpch-parquet", file_format: "parquet", mem_table: false, output_path: None, disable_statistics: false, enable_scheduler: false }
Query 1 iteration 0 took 876.7 ms and returned 4 rows
Query 1 iteration 1 took 821.2 ms and returned 4 rows
Query 1 iteration 2 took 880.4 ms and returned 4 rows
Query 1 avg time: 859.43 ms
alamb@MacBook-Pro-8:~/Software/arrow-datafusion2$ git checkout 664edea4ec78114e8335a05a0e0dfa06a0d223b9
(arrow_dev) alamb@MacBook-Pro-8:~/Software/arrow-datafusion2$ CARGO_TARGET_DIR=/Users/alamb/Software/target-df2 cargo run --release --bin tpch -- benchmark datafusion --iterations 3 --path /tmp/tpch-parquet --format parquet --query 1
Finished release [optimized] target(s) in 0.29s
Running /Users/alamb/Software/target-df2/release/tpch benchmark datafusion --iterations 3 --path /tmp/tpch-parquet --format parquet --query 1
Running benchmarks with the following options: DataFusionBenchmarkOpt { query: Some(1), debug: false, iterations: 3, partitions: 2, batch_size: 8192, path: "/tmp/tpch-parquet", file_format: "parquet", mem_table: false, output_path: None, disable_statistics: false, enable_scheduler: false }
Query 1 iteration 0 took 879.8 ms and returned 4 rows
Query 1 iteration 1 took 836.9 ms and returned 4 rows
Query 1 iteration 2 took 831.4 ms and returned 4 rows
Query 1 avg time: 849.37 ms
// Note: n_senders check is here so we don't double-clear the signal
if guard_channel.data.is_empty() && (guard_channel.n_senders > 0) {
    // channel is gone, so we need to clear our signal
    guard_gate.empty_channels -= 1;
}
I see there is a corresponding check in drop for DistributionSender, which I think is ok. It took me some time to see that the accounting was needed in both places to keep the empty_channels count correct even if the sender or receiver side was dropped while there were other inputs outstanding.
I wonder if it would be easier to understand if the RecvFuture checked whether the number of senders was greater than 0 before incrementing empty_channels, so that the decrementing of empty_channels could be done only in DistributionReceiver::drop.
Your proposal relies on the recv side being dropped, which is technically suboptimal. You may want to close the gate (for the remaining channels) if you know that your single empty channel cannot receive any data anymore. This is tested in test_close_channel_by_dropping_tx (see the comment at the end saying "channel closed => also close gate").
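A minimal sketch of that behaviour (illustrative Gate/Channel structs with assumed names, not the actual distributor_channels.rs types): once the last sender of the only remaining empty channel is dropped, that channel can never become non-empty again, so it must stop holding the gate open for the other channels.

use std::collections::VecDeque;

// Illustrative model only -- names and fields are assumptions for this sketch,
// not the real distributor_channels.rs internals.
struct Gate {
    // number of channels that are empty and could still receive data
    empty_channels: usize,
}

struct Channel<T> {
    data: VecDeque<T>,
    n_senders: usize,
}

// Dropping the last sender of an *empty* channel closes the gate for everyone
// else: that channel can never receive data anymore, so it must not keep
// sends to the other (full) channels unbounded.
fn drop_sender<T>(gate: &mut Gate, channel: &mut Channel<T>) {
    channel.n_senders -= 1;
    if channel.n_senders == 0 && channel.data.is_empty() {
        gate.empty_channels -= 1;
    }
}

fn main() {
    // channel 0 already holds data (not counted), channel 1 is empty (counted)
    let mut gate = Gate { empty_channels: 1 };
    let mut c1: Channel<i32> = Channel { data: VecDeque::new(), n_senders: 1 };

    drop_sender(&mut gate, &mut c1);
    assert_eq!(gate.empty_channels, 0); // sends to the full channel 0 would now block
}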
let (txs, mut rxs) = channels(2);

poll_ready(&mut txs[0].send("0_a")).unwrap();
poll_ready(&mut txs[0].send("0_b")).unwrap();
The reason a second message can be sent to txs[0] is because there is no data yet in rxs[1], correct?
Correct, added code comments.
assert!(waker0.woken());
assert!(!waker1.woken());
assert_eq!(poll_ready(&mut send_fut0), Err(SendError("0_b")),);
poll_pending(&mut send_fut1);
Is it worth then receiving on rx1 and verifying it still gets "1_b"?
I don't really see what this gives us (I'm not a massive fan of testing every single interaction in every single test), but since it's a tiny addition, I've added it anyways.
I was thinking it would provide coverage that even after one channel is closed, the other non-closed channels can continue to receive previously buffered data and don't get shut down early.
}

#[test]
fn test_close_channel_by_dropping_tx() {
I recommend another test to add additional coverage to the drop accounting:
- 3 channels
- fill them all with 1 message
- drop one of the receivers (not senders)
- recv a message from one receiver
- try to send on all senders again (one should be pending, one should be ready(error), one should be ready(ok))
Added test_close_channel_by_dropping_rx_on_closed_gate (although the order of actions in step 4 is slightly different: you get a "pending" state after filling up the channel that was emptied in step 3).
99c7090 to 656f9b6 (compare)
Co-authored-by: Andrew Lamb <[email protected]>
c393931 to 632a5a8 (compare)
looks great -- thank you @crepererum
Benchmark runs are scheduled for baseline = a1d39ba and contender = a9ddcd3. a9ddcd3 is a master commit associated with this PR. Results will be available as each benchmark for each run completes.
Looks like this PR has introduced uncontrolled memory consumption 🤔
This reverts commit a9ddcd3
I will look into this report later today -- thank you.
My measurements appear to show that DataFusion 17.0.0 is actually better than DataFusion 16.0.0 in terms of memory consumption #5108 (comment)
Which issue does this PR close?
Closes #4865.
Rationale for this change
The repartition operation had an unbounded buffer. This is not required in all cases and is even counterproductive, since it drives the input nodes to completion while potentially starving the output nodes and filling the buffers up to the memory limit (at which point the query just bails out).
What changes are included in this PR?
A somewhat more sophisticated channel construct (distribution channels) that only buffers without bound as long as at least one channel is empty. In practice (i.e. for any reasonable repartition config) this will NOT lead to unbounded memory usage, since virtually all partitions should eventually receive some data.
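To illustrate the gating idea, here is a hedged, self-contained sketch (made-up names, not the actual distributor_channels.rs code): a send completes immediately only while at least one output channel is still empty; once every channel holds data, further sends have to wait until a receiver drains one of them.

use std::collections::VecDeque;

// Illustrative model; struct, field, and function names are assumptions.
struct Gate {
    empty_channels: usize, // how many channels currently buffer nothing
}

struct Channel<T> {
    data: VecDeque<T>,
}

// Returns true if the item was buffered, false if the caller would have to
// park (register a waker) because no channel is empty anymore.
fn try_send<T>(gate: &mut Gate, channel: &mut Channel<T>, item: T) -> bool {
    if gate.empty_channels == 0 {
        return false;
    }
    if channel.data.is_empty() {
        gate.empty_channels -= 1; // this channel stops being empty
    }
    channel.data.push_back(item);
    true
}

// Draining a channel completely re-opens the gate for all senders.
fn recv<T>(gate: &mut Gate, channel: &mut Channel<T>) -> Option<T> {
    let item = channel.data.pop_front();
    if item.is_some() && channel.data.is_empty() {
        gate.empty_channels += 1;
    }
    item
}

fn main() {
    let mut gate = Gate { empty_channels: 2 };
    let mut c0: Channel<&str> = Channel { data: VecDeque::new() };
    let mut c1: Channel<&str> = Channel { data: VecDeque::new() };

    assert!(try_send(&mut gate, &mut c0, "0_a")); // ok: c1 is still empty
    assert!(try_send(&mut gate, &mut c0, "0_b")); // ok: c1 is still empty
    assert!(try_send(&mut gate, &mut c1, "1_a")); // ok: last empty channel fills up
    assert!(!try_send(&mut gate, &mut c0, "0_c")); // gate closed: would block

    assert_eq!(recv(&mut gate, &mut c1), Some("1_a")); // c1 is empty again
    assert!(try_send(&mut gate, &mut c0, "0_c")); // gate open again
}

The property this models for the repartition use case is that the per-channel buffers stay effectively bounded in the common case (every partition eventually receives data), while an empty partition never blocks the senders.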
Are these changes tested?
Are there any user-facing changes?
Improved scheduling for the repartition operation.