Fix Possible Congestion Scenario in SortPreservingMergeExec
#12302
Changes from 8 commits
4382945
d14dde3
1567c0c
02cdbfe
eb068a0
93a9c7c
5640da8
07bf172
2e95923
9f32d2b
c8b32a5
4bdbafa
e0bf209
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -18,19 +18,22 @@ | |
//! Merge that deals with an arbitrary size of streaming inputs. | ||
//! This is an order-preserving merge. | ||
|
||
use std::pin::Pin; | ||
use std::sync::Arc; | ||
use std::task::{ready, Context, Poll}; | ||
|
||
use crate::metrics::BaselineMetrics; | ||
use crate::sorts::builder::BatchBuilder; | ||
use crate::sorts::cursor::{Cursor, CursorValues}; | ||
use crate::sorts::stream::PartitionedStream; | ||
use crate::RecordBatchStream; | ||
|
||
use arrow::datatypes::SchemaRef; | ||
use arrow::record_batch::RecordBatch; | ||
use datafusion_common::Result; | ||
use datafusion_execution::memory_pool::MemoryReservation; | ||
|
||
use futures::Stream; | ||
use std::pin::Pin; | ||
use std::sync::Arc; | ||
use std::task::{ready, Context, Poll}; | ||
|
||
/// A fallible [`PartitionedStream`] of [`Cursor`] and [`RecordBatch`] | ||
type CursorStream<C> = Box<dyn PartitionedStream<Output = Result<(C, RecordBatch)>>>; | ||
|
@@ -97,6 +100,10 @@ pub(crate) struct SortPreservingMergeStream<C: CursorValues> { | |
|
||
/// number of rows produced | ||
produced: usize, | ||
|
||
/// Uninitiated partitions. They are stored in a vector to keep them in | ||
Can you please document what an "uninitiated partition" means in this context? I think it means partitions whose streams have been polled but have not been ready yet. |
||
/// a priority order to visit the partitions in a round-robin fashion | ||
uninitiated_partitions: Vec<usize>, | ||
} | ||
|
||
impl<C: CursorValues> SortPreservingMergeStream<C> { | ||
|
@@ -121,6 +128,7 @@ impl<C: CursorValues> SortPreservingMergeStream<C> { | |
batch_size, | ||
fetch, | ||
produced: 0, | ||
uninitiated_partitions: (0..stream_count).collect(), | ||
} | ||
} | ||
|
||
|
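The bookkeeping behind `uninitiated_partitions` can be illustrated in isolation. This is a minimal sketch, not the PR's code: the helper names `defer_front` and `mark_initialized` are hypothetical, and it only assumes the queue is a plain `Vec<usize>` seeded with `(0..stream_count).collect()` as in the constructor above.

```rust
/// Hypothetical helper: moves the partition at the front of the queue to
/// the back, so that a different partition is visited first next time.
fn defer_front(queue: &mut Vec<usize>) {
    // `rotate_left(1)` panics on an empty vector, so guard against it.
    if !queue.is_empty() {
        queue.rotate_left(1);
    }
}

/// Hypothetical helper: removes a partition from the queue once it no
/// longer needs to be polled during initialization.
fn mark_initialized(queue: &mut Vec<usize>, partition: usize) {
    queue.retain(|idx| *idx != partition);
}

fn main() {
    let stream_count = 4;
    // Every partition starts out uninitiated, in index order.
    let mut uninitiated: Vec<usize> = (0..stream_count).collect();

    // Partition 0 was not ready: send it to the back of the queue.
    defer_front(&mut uninitiated);
    println!("{:?}", uninitiated);

    // Partition 1 became ready: drop it from the queue.
    mark_initialized(&mut uninitiated, 1);
    println!("{:?}", uninitiated);
}
```

Keeping the indices in a vector preserves the visiting order, which is what makes the round-robin rotation possible; a set would lose that order.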
@@ -156,12 +164,22 @@ impl<C: CursorValues> SortPreservingMergeStream<C> { | |
} | ||
// try to initialize the loser tree | ||
if self.loser_tree.is_empty() { | ||
// Ensure all non-exhausted streams have a cursor from which | ||
// rows can be pulled | ||
for i in 0..self.streams.partitions() { | ||
if let Err(e) = ready!(self.maybe_poll_stream(cx, i)) { | ||
alamb marked this conversation as resolved.
|
||
self.aborted = true; | ||
return Poll::Ready(Some(Err(e))); | ||
// Ensure all non-exhausted streams have a cursor from which rows can be pulled | ||
This comment implies to me that the code would / should
IMO, the behavior is more correct now. In the previous version, let's assume the 1st partition is exhausted and returns
see synnada-ai#34 for alternate idea
I've tried to explain my concern with that: synnada-ai#34 (comment)
👍 -- response in synnada-ai#34 (comment) |
||
let remaining_partitions = self.uninitiated_partitions.clone(); | ||
for i in remaining_partitions { | ||
match self.maybe_poll_stream(cx, i) { | ||
Poll::Ready(Err(e)) => { | ||
self.aborted = true; | ||
return Poll::Ready(Some(Err(e))); | ||
} | ||
Poll::Pending => { | ||
self.uninitiated_partitions.rotate_left(1); | ||
cx.waker().wake_by_ref(); | ||
I am not sure if this usage has some side effects or decreases performance, but I cannot wake the SPM poll again once it receives a pending from its first partition
I did some research -- see https://github.com/synnada-ai/datafusion-upstream/pull/34/files#r1743621057
I think calling But I share your concern that this will cause some sort of performance issue |
||
return Poll::Pending; | ||
} | ||
_ => { | ||
self.uninitiated_partitions.retain(|idx| *idx != i); | ||
} | ||
} | ||
} | ||
self.init_loser_tree(); | ||
|
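To make the discussion above concrete, here is a minimal, std-only sketch of the round-robin initialization pass. It is not the PR's implementation: `MockPartition`, `CountingWaker`, `poll_init`, and `run` are hypothetical names, and the mock simply reports `Pending` a fixed number of times in place of `maybe_poll_stream`. The waker only counts wake-ups, which gives a rough feel for the self-wake cost the reviewers are concerned about.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical stand-in for one input partition: it reports `Pending`
/// a fixed number of times before it becomes ready.
struct MockPartition {
    pending_left: usize,
}

impl MockPartition {
    fn poll_ready(&mut self) -> Poll<()> {
        if self.pending_left > 0 {
            self.pending_left -= 1;
            Poll::Pending
        } else {
            Poll::Ready(())
        }
    }
}

/// Waker that only counts how often it is woken.
struct CountingWaker(AtomicUsize);

impl Wake for CountingWaker {
    fn wake(self: Arc<Self>) {
        self.0.fetch_add(1, Ordering::SeqCst);
    }
    fn wake_by_ref(self: &Arc<Self>) {
        self.0.fetch_add(1, Ordering::SeqCst);
    }
}

/// One initialization pass over the partitions that are still uninitiated.
fn poll_init(
    parts: &mut [MockPartition],
    uninitiated: &mut Vec<usize>,
    cx: &mut Context<'_>,
) -> Poll<()> {
    let remaining = uninitiated.clone();
    for i in remaining {
        match parts[i].poll_ready() {
            Poll::Pending => {
                // Every partition visited before `i` was removed, so `i` is
                // at the front; rotating sends it to the back of the queue.
                uninitiated.rotate_left(1);
                // Self-wake so the task is polled again and the next
                // partition in the queue gets its turn.
                cx.waker().wake_by_ref();
                return Poll::Pending;
            }
            Poll::Ready(()) => uninitiated.retain(|idx| *idx != i),
        }
    }
    Poll::Ready(())
}

/// Drives `poll_init` to completion; returns (number of polls, wake count).
fn run(pending: &[usize]) -> (usize, usize) {
    let mut parts: Vec<MockPartition> = pending
        .iter()
        .map(|&n| MockPartition { pending_left: n })
        .collect();
    let mut uninitiated: Vec<usize> = (0..parts.len()).collect();
    let counter = Arc::new(CountingWaker(AtomicUsize::new(0)));
    let waker = Waker::from(counter.clone());
    let mut cx = Context::from_waker(&waker);
    let mut polls = 1;
    while poll_init(&mut parts, &mut uninitiated, &mut cx).is_pending() {
        polls += 1;
    }
    (polls, counter.0.load(Ordering::SeqCst))
}

fn main() {
    let (polls, wakes) = run(&[2, 0, 1]);
    println!("polls = {polls}, wakes = {wakes}");
}
```

In this sketch each `Pending` result costs exactly one extra poll of the whole task, so a slow partition no longer blocks the others from being polled, at the price of busy re-polling while any partition is still pending.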
I read this test a bit more -- it doesn't seem to actually be a fuzz test (for example, it doesn't seem to have any random inputs).
I think it would make more sense to put it with the other sort preserving merge tests:
datafusion/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs
Lines 301 to 302 in 6034be4