feat: flume channels for RepartitionExec #6929
Conversation
I can replicate the results:
#[derive(Debug)]
pub(super) struct DistributionSender<T>(Sender<T>);

impl<T> Clone for DistributionSender<T> {
We can modify the existing DistributionReceiver/DistributionSender instead of creating a new one.
Sure, this PR does not modify the existing code; it adds new code in a new module. Once the flume code is approved by the community, we could remove the old distribution_channel.rs directly.
//! Channel based on flume

use flume::r#async::RecvStream;
use flume::{unbounded, Sender};
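For readers following along, here is a minimal self-contained sketch (not the PR's exact code) of how the flume primitives imported above could back a sender/receiver pair. It assumes the flume crate with its default async feature; everything other than the flume APIs (the struct names, `channels` helper, etc.) mirrors the diff only loosely and is illustrative.

```rust
use flume::r#async::RecvStream;
use flume::{unbounded, Receiver, Sender};

#[derive(Debug)]
struct DistributionSender<T>(Sender<T>);

impl<T> Clone for DistributionSender<T> {
    fn clone(&self) -> Self {
        // Cloning the inner flume sender is cheap; it only bumps a reference count.
        Self(self.0.clone())
    }
}

impl<T> DistributionSender<T> {
    /// For an unbounded channel this never blocks; it only errors once
    /// all receivers have been dropped.
    fn send(&self, value: T) -> Result<(), flume::SendError<T>> {
        self.0.send(value)
    }
}

struct DistributionReceiver<T>(Receiver<T>);

impl<T> DistributionReceiver<T> {
    /// Expose the receiver as a `futures::Stream` for async consumers.
    fn into_stream(self) -> RecvStream<'static, T> {
        self.0.into_stream()
    }
}

/// Build `n` unbounded channels, e.g. one per output partition.
fn channels<T>(n: usize) -> (Vec<DistributionSender<T>>, Vec<DistributionReceiver<T>>) {
    (0..n)
        .map(|_| {
            let (tx, rx) = unbounded();
            (DistributionSender(tx), DistributionReceiver(rx))
        })
        .unzip()
}
```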
I think the purpose of the existing code is that we preferably would not use unbounded channels (to avoid high memory usage).
Yes, I think @crepererum did some work on this recently. It'd be good to get a review from him.
I wonder if we see the same performance improvement with "just" the default unbounded tokio mpsc? i.e. performance doesn't improve because of flume, but just because we switch to unbounded buffering here.
@YjyJeff
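For comparison, a minimal sketch of the "just use the default unbounded tokio mpsc" variant. `tokio::sync::mpsc::unbounded_channel` and `tokio_stream::wrappers::UnboundedReceiverStream` are the real APIs; `unbounded_pair` is only an illustrative placeholder, not code from this PR.

```rust
use tokio::sync::mpsc::{unbounded_channel, UnboundedSender};
use tokio_stream::wrappers::UnboundedReceiverStream;

// One unbounded channel per output partition; `send` never awaits, so a fast
// producer can queue arbitrarily many batches (the memory-usage concern above).
fn unbounded_pair<T>() -> (UnboundedSender<T>, UnboundedReceiverStream<T>) {
    let (tx, rx) = unbounded_channel();
    (tx, UnboundedReceiverStream::new(rx))
}
```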
@Dandandan Good point. I have compared the flume channel with the tokio channel on TPC-H. Here are the results:
Comparing main and feature_tokio_unbounded
--------------------
Benchmark tpch.json
--------------------
┏━━━━━━━━━━━━━━┳━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓
┃ Query ┃ main ┃ feature_tokio_unbounded ┃ Change ┃
┡━━━━━━━━━━━━━━╇━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩
│ QQuery 1 │ 317.52ms │ 309.05ms │ no change │
│ QQuery 2 │ 73.18ms │ 67.94ms │ +1.08x faster │
│ QQuery 3 │ 136.38ms │ 114.46ms │ +1.19x faster │
│ QQuery 4 │ 84.27ms │ 51.39ms │ +1.64x faster │
│ QQuery 5 │ 170.56ms │ 121.67ms │ +1.40x faster │
│ QQuery 6 │ 83.52ms │ 83.22ms │ no change │
│ QQuery 7 │ 249.60ms │ 220.18ms │ +1.13x faster │
│ QQuery 8 │ 191.66ms │ 175.75ms │ +1.09x faster │
│ QQuery 9 │ 282.38ms │ 215.22ms │ +1.31x faster │
│ QQuery 10 │ 230.92ms │ 152.78ms │ +1.51x faster │
│ QQuery 11 │ 52.68ms │ 56.47ms │ 1.07x slower │
│ QQuery 12 │ 153.50ms │ 120.02ms │ +1.28x faster │
│ QQuery 13 │ 314.86ms │ 309.84ms │ no change │
│ QQuery 14 │ 115.02ms │ 114.93ms │ no change │
│ QQuery 15 │ 90.32ms │ 93.55ms │ no change │
│ QQuery 16 │ 67.44ms │ 62.28ms │ +1.08x faster │
│ QQuery 17 │ 785.40ms │ 763.14ms │ no change │
│ QQuery 18 │ 636.27ms │ 559.21ms │ +1.14x faster │
│ QQuery 19 │ 232.26ms │ 231.57ms │ no change │
│ QQuery 20 │ 261.95ms │ 247.18ms │ +1.06x faster │
│ QQuery 21 │ 351.81ms │ 247.46ms │ +1.42x faster │
│ QQuery 22 │ 54.88ms │ 48.02ms │ +1.14x faster │
└──────────────┴──────────┴─────────────────────────┴───────────────┘
Comparing feature_flume and feature_tokio_unbounded
--------------------
Benchmark tpch.json
--------------------
┏━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━┓
┃ Query ┃ feature_flume ┃ feature_tokio_unbounded ┃ Change ┃
┡━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━┩
│ QQuery 1 │ 317.86ms │ 309.05ms │ no change │
│ QQuery 2 │ 70.41ms │ 67.94ms │ no change │
│ QQuery 3 │ 113.01ms │ 114.46ms │ no change │
│ QQuery 4 │ 51.30ms │ 51.39ms │ no change │
│ QQuery 5 │ 123.28ms │ 121.67ms │ no change │
│ QQuery 6 │ 81.93ms │ 83.22ms │ no change │
│ QQuery 7 │ 220.84ms │ 220.18ms │ no change │
│ QQuery 8 │ 175.73ms │ 175.75ms │ no change │
│ QQuery 9 │ 213.37ms │ 215.22ms │ no change │
│ QQuery 10 │ 153.20ms │ 152.78ms │ no change │
│ QQuery 11 │ 54.10ms │ 56.47ms │ no change │
│ QQuery 12 │ 119.72ms │ 120.02ms │ no change │
│ QQuery 13 │ 313.01ms │ 309.84ms │ no change │
│ QQuery 14 │ 115.82ms │ 114.93ms │ no change │
│ QQuery 15 │ 89.26ms │ 93.55ms │ no change │
│ QQuery 16 │ 61.57ms │ 62.28ms │ no change │
│ QQuery 17 │ 786.18ms │ 763.14ms │ no change │
│ QQuery 18 │ 491.24ms │ 559.21ms │ 1.14x slower │
│ QQuery 19 │ 231.82ms │ 231.57ms │ no change │
│ QQuery 20 │ 240.57ms │ 247.18ms │ no change │
│ QQuery 21 │ 239.96ms │ 247.46ms │ no change │
│ QQuery 22 │ 49.39ms │ 48.02ms │ no change │
└──────────────┴───────────────┴─────────────────────────┴──────────────┘
From the above results, we can see that:
- changing the custom channel to tokio::mpsc can also improve the performance a lot
- flume is more efficient in one query
To reproduce the results, you can find the code here.
In my view, avoiding high memory usage is good, but we should not sacrifice performance 0.0
Nice, thanks for checking @YjyJeff!
Yeah, I agree with your point. It would be good if we can recover most of the performance.
Maybe we can introduce some (bounded) buffering again and see if we can get an acceptable trade-off?
FYI @crepererum
If it turns out that bounding memory usage inevitably reduces performance in a non-negligible way, I propose we introduce a configuration flag to control this. We can make the high-performance/unbounded behavior the default, but one should still be able to choose the lower-performance/bounded version for memory-conscious use cases.
Sounds like a reasonable, simple solution to me. Slightly more complex: track the memory usage and maybe reserve up to 10-100 MB for buffering (configurable). I guess at some point much more buffering won't really help performance anymore.
I don't think we should ever be using unbounded memory if we can avoid it -- in this case, if the producer goes faster than the consumer it will just buffer a huge amount of data (and e.g. will eventually OOM with TPC-H SF100 or SF1000). I like @Dandandan's suggestion to introduce more (but not unbounded) buffering. Perhaps we could extend the existing DistributionSender to have a queue (2 or 3 elements, for example) rather than just a single element.
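One way to picture the "small bounded queue per channel" idea is sketched below. This is purely illustrative: the capacity constant and the use of a bounded flume channel as a stand-in for extending DistributionSender are assumptions, not the existing design.

```rust
use flume::{bounded, Receiver, Sender};

/// Per-channel queue depth, e.g. 2 or 3 in-flight batches per output partition.
const PER_CHANNEL_QUEUE: usize = 3;

// A small bounded channel gives a little slack for bursts while still applying
// backpressure: `send_async(...).await` suspends the producer once the queue is
// full, so buffering stays bounded instead of growing without limit.
fn bounded_pair<T>() -> (Sender<T>, Receiver<T>) {
    bounded(PER_CHANNEL_QUEUE)
}
```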
I agree, if we can avoid it without paying a noticeable penalty, we definitely should. Let's explore if we can. We can discuss later on what to do if it turns out we can't.
I did some reading on this and it seems like the extra buffering approach could indeed be fruitful. I think we can land near a "sweet spot" in terms of the cost/benefit trade-off w.r.t. peak memory usage vs. performance. Losing a small amount of performance in extreme cases (which would have had impractical peak memory usage had we used an unbounded channel anyway) is a small price to pay for having the backpressure mechanism in place and always avoiding OOM.
I will file a ticket describing the ideas in this thread, probably tomorrow.
@YjyJeff Please read the reasoning here before experimenting with unbounded constructs.

Mainly, the reason that we are NOT using unbounded channels is that this just buffers large (potentially unbounded) quantities of data in many scenarios. Mostly this happens when the repartition producer side (e.g. trivial data reading like highly compressed Parquet files) is way faster than the consumer side (e.g. complicated transforms or costly aggregations).

Note that I'm NOT saying that the current impl. is good (or even perfect); there are likely better options. But just slapping an unbounded channel on this problem is not going to solve it. Sure, that wins in some micro-benchmarking, but it fails to provide a robust foundation for query execution.

I agree w/ @ozankabak & Co. though that SOME buffering is OK. So I think the following config option would be robust, reasonably fast, and unsurprising to the user: an option "repartition buffer bytes OR messages" (or a similar name) that limits bytes or messages per channel (not per repartition, otherwise the cross-comm overhead is too high); only if this limit is met do we fall back to the cross-channel gating behavior described in the code comment linked above (namely: we let data flow freely if there is at least one empty channel).
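A hypothetical sketch of the bookkeeping behind such a "repartition buffer bytes OR messages" limit is shown below. None of these names exist in DataFusion; the struct and check only illustrate the per-channel limit plus fall-back-to-gating idea described above.

```rust
/// Hypothetical per-channel buffering limit; the name and fields are illustrative,
/// not an existing DataFusion config option.
#[derive(Debug, Clone)]
struct RepartitionBufferLimit {
    /// Maximum buffered bytes per output channel.
    max_bytes_per_channel: usize,
    /// Maximum buffered messages (record batches) per output channel.
    max_messages_per_channel: usize,
}

impl RepartitionBufferLimit {
    /// Can a batch of `batch_bytes` be enqueued immediately? If not, the sender
    /// would fall back to the existing cross-channel gating (data flows freely
    /// while at least one channel is empty).
    fn can_buffer(&self, queued_bytes: usize, queued_messages: usize, batch_bytes: usize) -> bool {
        queued_bytes + batch_bytes <= self.max_bytes_per_channel
            && queued_messages < self.max_messages_per_channel
    }
}
```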
I wrote up a somewhat long ticket about my thoughts on this issue here: #7001. I think there are some buffering / adaptivity tricks we could use that might result in some non-trivial wins (the "adaptive repartition" one in particular I am excited about).
Marking as draft as we come up with a plan.
Which issue does this PR close?
Closes #6928; see #6928 (comment).
Rationale for this change
What changes are included in this PR?
Are these changes tested?
Yes
Are there any user-facing changes?
No