Consolidate sort and external_sort #1596
Conversation
struct AggregatedMetricsSet {
    intermediate: Arc<std::sync::Mutex<Vec<ExecutionPlanMetricsSet>>>,
    final_: Arc<std::sync::Mutex<Vec<ExecutionPlanMetricsSet>>>,
}
Introduce AggregatedMetricsSet for ExternalSortExec, since it may include multiple partial sorts. Each partial sort is itself a SortPreservingMergeStream and has its own metrics set.
@@ -656,4 +774,187 @@ mod tests {
    Ok(())
}
Copy all tests from SortExec here.
    self.index += 1;
    Some(Ok(self.batches[self.index - 1].as_ref().clone()))
} else {
    None
})
});
self.baseline_metrics.record_poll(poll)
Make the metrics correct when there is only one record batch.
Thank you @yjshen -- this looks awesome -- I will try and review this carefully, but likely won't have time until tomorrow
The logic for ExternalSort:

Currently, I've unified all "N-way merge" into SortPreservingMergeStream. The original sort algorithm is to combine_batches first and then sort the single combined batch, which doubles memory usage. Therefore, I think it is probably worth trying https://github.com/jorgecarleitao/arrow2/blob/main/src/compute/merge_sort/mod.rs or bringing in a new memory-efficient sort that avoids combining first.
Starting to check this out @yjshen -- thank you for the detailed analysis so far
There is a bunch of other overhead in that column I suspect we could avoid with some additional engineering (follow on PRs), such as the use of an …
@yjshen 🏅🏅🏅🏅 -- this is really nice. Thank you so much.
The original sort algorithm is to combine_batches first and then sort the single batch. However, this suffers too much extra memory usage from combining first; doubling the memory usage is, I think, not acceptable.
I would phrase this as "we can improve significantly", and the merge-sort pointer in arrow2 is a great potential place to look. Given that datafusion master already has this "double memory" usage behavior, I think it is acceptable to keep the same behavior here and improve it in a follow on PR.
Thus, here is how I suggest we proceed:

1. Change ExternalSorter to buffer input batches and then call combine_batches and sort if there is sufficient memory (aka do the same thing as master does, so there are no performance regressions), and then merge this code in.
2. As follow on work, we file tickets to:
   - Improve the memory efficiency of sorting when no spilling is required (e.g. avoid the doubling required by combine_batches, use parallelized merge sort, etc)
   - Improve the performance of SortKeyCursor (e.g. get rid of RWLock, etc)
   - Add additional tests for when the data actually spills, etc
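The buffer-then-combine-then-sort path suggested above can be sketched as follows. This is a stand-in sketch, not DataFusion code: `Vec<i32>` stands in for `RecordBatch`, and `combine` / `in_mem_sort` are hypothetical names mirroring `combine_batches`. Note how both the buffered batches and the combined copy are alive at once, which is exactly the memory doubling discussed above.

```rust
// Stand-in sketch of "buffer, combine, then sort".
// Vec<i32> plays the role of RecordBatch; `combine` mirrors combine_batches.

fn combine(batches: &[Vec<i32>]) -> Vec<i32> {
    // Concatenate all buffered batches into one batch (second copy of the data).
    batches.iter().flatten().copied().collect()
}

fn in_mem_sort(buffered: Vec<Vec<i32>>) -> Vec<i32> {
    let mut combined = combine(&buffered); // memory roughly doubles here
    combined.sort_unstable();              // then sort the single combined batch
    combined
}

fn main() {
    let batches = vec![vec![3, 1], vec![2, 5], vec![4]];
    let sorted = in_mem_sort(batches);
    assert_eq!(sorted, vec![1, 2, 3, 4, 5]);
    println!("{:?}", sorted);
}
```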
metrics,
inner_metrics,
used: AtomicUsize::new(0),
spilled_bytes: AtomicUsize::new(0),
It would be neat to add spilled bytes to the metrics reported by ExternalSorter so that they appear in EXPLAIN ANALYZE, for example. Perhaps as a follow on task.
Thanks, filed #1611 to track this.
TPC-H sf=1, sort_extendedprice_discount [combine_and_sort method]: similar performance for the new sort compared with the previous sort.

./target/release/tpch benchmark datafusion --path ./data --format tbl --query 1 --batch-size 10240 --partitions 1

I changed q1 locally in the tpch program to run:

select
    l_returnflag,
    l_linestatus,
    l_quantity,
    l_extendedprice,
    l_discount,
    l_tax
from
    lineitem
order by
    l_extendedprice,
    l_discount;

query plan:
W/ this PR:
W/o this PR:
I think this is a really nice improvement, happy to see SortPreservingStream being used more 😄
That being said, I think there is a fair bit more async, locking and atomic shenanigans than I would expect to be necessary. Not sure this makes a huge performance impact, but it certainly makes the code harder to reason about.
I've left some comments around the place, other than a deadlock I think these can possibly be addressed in subsequent PRs if you would prefer, up to you 😄
use futures::stream::Stream;
use futures::Future;
use pin_project_lite::pin_project;
use futures::lock::Mutex;
I think it should be possible to de-async the stream constructor to allow using parking_lot, which is both lighter weight and avoids the absolute brain melt that is async locks (they're also a monumental pain to debug)
I think parking_lot is not used in DataFusion; I once used it but removed it later (yjshen@6679628). Do you think we can add this dependency?
Oh I thought it was, std::Mutex then. I more just meant a non-async Mutex 😁
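The non-async mutex being suggested looks roughly like this; `record_spilled` is a hypothetical helper, not DataFusion API. The point is that `std::sync::Mutex` is locked synchronously, so poll-style stream code can take a short critical section with no `.await` and without holding a guard across an await point.

```rust
use std::sync::{Arc, Mutex};

// Sketch: a non-async std::sync::Mutex locked synchronously, as would be
// possible in a Stream::poll_next body after de-asyncing the constructor.
// `record_spilled` is a hypothetical helper for illustration only.
fn record_spilled(spilled_bytes: &Arc<Mutex<usize>>, bytes: usize) -> usize {
    let mut guard = spilled_bytes.lock().unwrap(); // synchronous, no await
    *guard += bytes;
    *guard
} // guard dropped here, never held across an await point

fn main() {
    let spilled_bytes = Arc::new(Mutex::new(0usize));
    record_spilled(&spilled_bytes, 1024);
    let total = record_spilled(&spilled_bytes, 512);
    assert_eq!(total, 1536);
    println!("{}", total);
}
```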
let mut spills = self.spills.lock().await;
let used = self.used.swap(0, Ordering::SeqCst);
self.spilled_count.fetch_add(1, Ordering::SeqCst);
FWIW two separate atomic increments with SeqCst are likely slower than a single uncontended mutex
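A sketch of that suggestion, with a hypothetical `SpillMetrics` struct (not a DataFusion type) grouping the related counters behind one lock, so a spill updates them in a single lock acquisition instead of two SeqCst atomic operations:

```rust
use std::sync::Mutex;

// Hypothetical grouping of the spill counters behind a single mutex,
// replacing separate `used.swap` and `spilled_count.fetch_add` atomics.
#[derive(Default, Debug)]
struct SpillMetrics {
    used: usize,
    spilled_count: usize,
    spilled_bytes: usize,
}

fn record_spill(metrics: &Mutex<SpillMetrics>, bytes: usize) -> usize {
    let mut m = metrics.lock().unwrap();        // one uncontended lock
    let freed = std::mem::take(&mut m.used);    // like used.swap(0, SeqCst)
    m.spilled_count += 1;                       // like fetch_add(1, SeqCst)
    m.spilled_bytes += bytes;
    freed
}

fn main() {
    let metrics = Mutex::new(SpillMetrics { used: 4096, ..Default::default() });
    let freed = record_spill(&metrics, 4096);
    assert_eq!(freed, 4096);
    let m = metrics.lock().unwrap();
    assert_eq!((m.used, m.spilled_count, m.spilled_bytes), (0, 1, 4096));
    println!("freed={}", freed);
}
```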
}

/// MergeSort in mem batches as well as spills into total order with `SortPreservingMergeStream`.
async fn sort(&self) -> Result<SendableRecordBatchStream> {
I understand that as ExternalSorter implements MemoryConsumer directly we need to wrap it in an Arc, but just an observation that the interface and implementation of this component would be simpler if it took mutable references... Can insert_batch be called after sort, and if so what happens? What about concurrently?

Maybe something to think about: the borrow checker can only help you if you don't go behind its back with Mutex, RefCell and similar 😆
Sorry, I cannot quite follow here. 🤔 I think although async, Rust would compile do_sort into a state machine that runs in the sequence written in it?
async fn do_sort(
mut input: SendableRecordBatchStream,
partition_id: usize,
expr: Vec<PhysicalSortExpr>,
metrics: AggregatedMetricsSet,
runtime: Arc<RuntimeEnv>,
) -> Result<SendableRecordBatchStream> {
let schema = input.schema();
let sorter = Arc::new(ExternalSorter::new(
partition_id,
schema.clone(),
expr,
metrics,
runtime.clone(),
));
runtime.register_consumer(&(sorter.clone() as Arc<dyn MemoryConsumer>));
while let Some(batch) = input.next().await {
let batch = batch?;
sorter.insert_batch(batch).await?;
}
let result = sorter.sort().await;
runtime.drop_consumer(sorter.id());
result
}
Is it possible for sort to execute before insert_batch() in the while let? How does concurrency affect this?
path: String,
schema: SchemaRef,
) -> Result<usize> {
    let mut writer = IPCWriter::new(path.as_ref(), schema.as_ref())?;
More as an FYI for @alamb, but poking around here: IPCWriter uses dictionary IDs and will error if batches with different IDs are written. This will likely cause problems with the way arrow-rs, and IOx, currently handle dictionaries... I've created apache/arrow-rs#1206 to clarify what is going on here.
Thanks @tustvold for the detailed review. I've made a first quick fix for the async and documentation-related comments. For the deadlock / mutex / hang-up parts, let me think it over again.
@yjshen is this PR ready for another review, or are you planning more changes to it?
@alamb I find some of my messages for you are lost, maybe because of my unstable network. Yes, I think the PR is ready for another round of review now.
This means aggregating all metrics during a complex operation that is composed of multiple steps, where each step reports its statistics separately. For the sort case here: when the dataset is larger than the available memory, it will report multiple in-mem sort metrics as well as the final merge-sort metrics from …
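As a stand-in illustration of that aggregation (not the actual DataFusion types; `usize` stands in for `ExecutionPlanMetricsSet`, and the method names are hypothetical): each step registers its own metrics set, and the aggregate view is computed by summing over all registered sets.

```rust
use std::sync::{Arc, Mutex};

// Stand-in sketch of AggregatedMetricsSet: one metrics entry per partial
// in-mem sort, one per final merge-sort, aggregated on demand.
#[derive(Default)]
struct AggregatedMetricsSet {
    intermediate: Arc<Mutex<Vec<usize>>>, // one entry per partial (in-mem) sort
    final_: Arc<Mutex<Vec<usize>>>,       // one entry per final merge-sort
}

impl AggregatedMetricsSet {
    fn new_intermediate(&self, rows: usize) {
        self.intermediate.lock().unwrap().push(rows);
    }
    fn new_final(&self, rows: usize) {
        self.final_.lock().unwrap().push(rows);
    }
    // The aggregate view an EXPLAIN ANALYZE-style consumer would report.
    fn total(&self) -> usize {
        let i: usize = self.intermediate.lock().unwrap().iter().sum();
        let f: usize = self.final_.lock().unwrap().iter().sum();
        i + f
    }
}

fn main() {
    let metrics = AggregatedMetricsSet::default();
    metrics.new_intermediate(100); // first spilled in-mem sort
    metrics.new_intermediate(200); // second spilled in-mem sort
    metrics.new_final(300);        // final merge of the spills
    assert_eq!(metrics.total(), 600);
    println!("{}", metrics.total());
}
```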
Thanks @yjshen -- I'll put this on my review queue for first thing tomorrow
@@ -304,6 +303,9 @@ pub(crate) struct SortPreservingMergeStream {
    /// An index to uniquely identify the input stream batch
    next_batch_index: usize,

    /// min heap for record comparison
    min_heap: BinaryHeap<Arc<SortKeyCursor>>,
I think while not ideal, we can clean this up in a follow on PR
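For reference, the min-heap merge that this field supports can be sketched over plain `Vec<i32>` runs; `merge_sorted_runs` and the `(value, run, offset)` cursor tuple are illustrative stand-ins for the real `SortKeyCursor`-based code, not the actual implementation.

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;

// Sketch of a heap-based k-way merge: keep one cursor per sorted input in a
// min-heap ordered by the current value; pop the smallest, advance that cursor.
fn merge_sorted_runs(runs: &[Vec<i32>]) -> Vec<i32> {
    let mut heap = BinaryHeap::new(); // max-heap; Reverse makes it a min-heap
    for (run_idx, run) in runs.iter().enumerate() {
        if let Some(&v) = run.first() {
            heap.push(Reverse((v, run_idx, 0usize)));
        }
    }
    let mut out = Vec::new();
    while let Some(Reverse((v, run_idx, offset))) = heap.pop() {
        out.push(v);
        // Advance this run's cursor and re-insert it if data remains.
        if let Some(&next) = runs[run_idx].get(offset + 1) {
            heap.push(Reverse((next, run_idx, offset + 1)));
        }
    }
    out
}

fn main() {
    let runs = vec![vec![1, 4, 7], vec![2, 5], vec![3, 6]];
    let merged = merge_sorted_runs(&runs);
    assert_eq!(merged, vec![1, 2, 3, 4, 5, 6, 7]);
    println!("{:?}", merged);
}
```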
FYI @houqp @Dandandan @liukun4515 @andygrove -- while @yjshen has done performance analysis and this change should not affect the performance of queries that stay all in memory, there is some possibility of regression. Let us know if you see some and we'll investigate.
Which issue does this PR close?
Closes #1571
Rationale for this change
We should have only a single sort operator that does in-memory sorting if it has enough memory budget but then spills to disk if needed.
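A minimal stand-in sketch of that policy (hypothetical `Sorter` type; `Vec<i32>` plays the role of both `RecordBatch` and spill files): sort in memory while under the budget, emit a sorted run when it is exceeded, and merge the runs at the end. A real implementation streams and N-way merges the spilled runs from disk rather than re-sorting.

```rust
// Sketch: in-memory sort under a memory budget, spill sorted runs when over it.
struct Sorter {
    budget: usize,          // memory budget in bytes
    used: usize,            // bytes currently buffered
    in_mem: Vec<Vec<i32>>,  // buffered "batches"
    spills: Vec<Vec<i32>>,  // each spill is a sorted run (stand-in for a file)
}

impl Sorter {
    fn insert_batch(&mut self, batch: Vec<i32>) {
        self.used += batch.len() * std::mem::size_of::<i32>();
        self.in_mem.push(batch);
        if self.used > self.budget {
            self.spill(); // over budget: sort what we have and "write" it out
        }
    }
    fn spill(&mut self) {
        let mut run: Vec<i32> = self.in_mem.drain(..).flatten().collect();
        run.sort_unstable();
        self.spills.push(run);
        self.used = 0;
    }
    fn sort(mut self) -> Vec<i32> {
        self.spill(); // sort any remaining in-memory data as a final run
        // Stand-in for the final N-way merge of sorted runs.
        let mut out: Vec<i32> = self.spills.into_iter().flatten().collect();
        out.sort_unstable();
        out
    }
}

fn main() {
    let mut s = Sorter { budget: 16, used: 0, in_mem: vec![], spills: vec![] };
    for b in [vec![5, 1], vec![4, 2], vec![3]] {
        s.insert_batch(b); // third batch pushes us over budget and spills
    }
    let sorted = s.sort();
    assert_eq!(sorted, vec![1, 2, 3, 4, 5]);
    println!("{:?}", sorted);
}
```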
What changes are included in this PR?

- in_mem_sort: buffer batches in memory and do combine then sort; when memory is insufficient, spill.
- Consolidate SortExec into ExternalSort; retain the spill-related tests in ExternalSort.

Are there any user-facing changes?
No