feat(7181): cascading loser tree merges #7379

Closed · wants to merge 30 commits

Commits (30)
5eaaeec
refactor(7181): move the management of input data, and output data, t…
wiedld Aug 15, 2023
a3870d0
feat(7181): add cursor.seek()
wiedld Aug 15, 2023
e54e92c
refactor(7181): move streaming_merge() into separate mod from the mer…
wiedld Aug 15, 2023
3d43e97
feat(7181): streaming_merge() consumes SortPreservingCascadeStream
wiedld Aug 15, 2023
1a6a364
feat(7181): change BatchBuilder to be a SortOrder Builder, with the S…
wiedld Aug 15, 2023
28454c5
feat(7181): add slice() to Cursor trait
wiedld Aug 17, 2023
b766712
feat(7181): make SortOrderBuilder yield in a stream.
wiedld Aug 17, 2023
9ff37f3
feat(7181): enable the ability to create a multi-layer merge cascade.
wiedld Aug 22, 2023
eb647ea
feat(7181): build multiple-level cascade tree.
wiedld Aug 22, 2023
8cd22a0
feat(7181): use RecordBatch tracking to avoid expensive slicing of ba…
wiedld Aug 28, 2023
173577b
fix(7181): improve performance by using hasher on tuple (unique slice…
wiedld Aug 29, 2023
b0f1402
chore(7181): make a zero-cost BatchId type, for more explicit code
wiedld Aug 31, 2023
9ea3a65
refactor: comment the major streaming structures, and how they are in…
wiedld Aug 31, 2023
7be30c2
refactor: use u64 as batch_id in cascading merge sort
wiedld Sep 1, 2023
0e9573d
feat(7181): convert into generic ReceiverStream, such that can be reu…
wiedld Sep 2, 2023
c439138
feat(7181): add buffered multithreading to merge streams
wiedld Sep 2, 2023
fca522b
test(7181): have sort preserving merge tests, run in both single thre…
wiedld Sep 3, 2023
50c8636
chore: TMP COMMIT pointing at arrow-rs branch, for CI pipeline
wiedld Sep 3, 2023
cfa32fa
Merge branch 'main' into 7181/cascading-loser-tree-merges
wiedld Sep 13, 2023
d520496
chore: clippy and linter
wiedld Sep 13, 2023
a324ef8
fix(7181): have RowCursor slicing be within a single arc-refed Rows
wiedld Sep 15, 2023
d3613bd
feat(7181): have BatchCursor be the primary struct passed around
wiedld Sep 15, 2023
3786021
feat(7181): update documentation for the cascaded merge
wiedld Sep 15, 2023
2932bd5
Merge branch 'main' into 7181/cascading-loser-tree-merges
wiedld Sep 15, 2023
8701220
fix: add apache license header to new mods
wiedld Sep 15, 2023
828a5d1
Merge branch 'main' into 7181/cascading-loser-tree-merges
wiedld Sep 19, 2023
0dfc60c
Merge branch 'main' into 7181/cascading-loser-tree-merges
wiedld Sep 19, 2023
e642420
Merge branch 'main' into 7181/cascading-loser-tree-merges
wiedld Oct 5, 2023
f97cc4d
feat(7181): remove mutex around polled stream.
wiedld Sep 19, 2023
9b10198
Merge branch 'main' into 7181/cascading-loser-tree-merges
wiedld Oct 10, 2023
refactor: comment the major streaming structures, and how they are inter-used. Add missing debug fmt.
wiedld committed Sep 3, 2023
commit 9ea3a65731d9abe92b77a8c05003c72fcf84712f
15 changes: 13 additions & 2 deletions datafusion/core/src/physical_plan/sorts/builder.rs
@@ -157,9 +157,20 @@ impl<C: Cursor> SortOrderBuilder<C> {
}
}

/// Takes the batches which already are sorted, and returns them with the corresponding cursors and sort order
/// Takes the batches which already are sorted, and returns them with the corresponding cursors and sort order.
///
/// This will drain the internal state of the builder, and return `None` if there are no pending
/// This will drain the internal state of the builder, and return `None` if there are no pending batches.
///
/// This slices cursors for each record batch, as follows:
/// 1. the input was N record_batches, each of up to max M size
/// 2. the yielded ordered rows can total at most M rows
/// 3. of the N record_batches, each will be:
///    a. fully yielded (all rows)
///    b. partially yielded (some rows) => slice the cursor, and adjust the BatchOffset
///    c. not yielded (no rows) => retain the cursor
/// 4. the output will be:
///    - the SortOrder
///    - the corresponding cursors, together covering the yielded rows [cursor_batch_0, cursor_batch_1, ..]
pub fn yield_sort_order(
&mut self,
) -> Result<Option<(Vec<(C, BatchId, BatchOffset)>, Vec<SortOrder>)>> {
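To make the slicing contract above concrete, here is a minimal sketch of the three per-batch outcomes (an editor's illustration, not the PR's code; `BatchYield` and `classify` are hypothetical names — the real builder slices `Cursor` values and adjusts `BatchOffset` rather than merely classifying):

```rust
// Hypothetical illustration of cases 3a-3c from the doc comment above.
enum BatchYield {
    Full,           // 3a: all rows yielded; hand the cursor off as-is
    Partial(usize), // 3b: first n rows yielded; slice cursor, adjust BatchOffset
    NotYielded,     // 3c: no rows yielded; retain the cursor
}

fn classify(rows_yielded: usize, rows_total: usize) -> BatchYield {
    match rows_yielded {
        0 => BatchYield::NotYielded,
        n if n == rows_total => BatchYield::Full,
        n => BatchYield::Partial(n),
    }
}
```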
2 changes: 0 additions & 2 deletions datafusion/core/src/physical_plan/sorts/cascade.rs
@@ -48,8 +48,6 @@ impl<C: Cursor + Unpin + Send + 'static> SortPreservingCascadeStream<C> {
) -> Self {
let stream_count = streams.partitions();

// TODO: We can do slicing followed by concating of Cursors yielded from each merge.
// Refer to YieldedCursorStream for where the concat would happen (TODO).
let streams = Arc::new(parking_lot::Mutex::new(BatchTrackingStream::new(
streams,
reservation.new_empty(),
76 changes: 66 additions & 10 deletions datafusion/core/src/physical_plan/sorts/stream.rs
@@ -34,20 +34,33 @@ use std::marker::PhantomData;
use std::sync::Arc;
use std::task::{ready, Context, Poll};

/// A fallible [`PartitionedStream`] of [`Cursor`] and [`RecordBatch`]
/// A fallible [`PartitionedStream`] of record batches.
///
/// Each ([`Cursor`], [`RecordBatch`]) pair represents a single record batch.
pub(crate) type BatchCursorStream<C> =
Box<dyn PartitionedStream<Output = Result<(C, RecordBatch)>>>;

pub type BatchId = Uuid;

#[derive(Debug, Clone, Copy, Hash, PartialEq, Eq)]
pub struct BatchOffset(pub usize); // offset into a batch, used when a cursor is sliced
pub struct BatchOffset(pub usize);

/// A fallible [`PartitionedStream`] of [`Cursor`] and a batch identifier (Uuid)
/// A [`PartitionedStream`] representing partial record batches.
///
/// Each ([`Cursor`], [`BatchId`], [`BatchOffset`]) represents part of a record batch
/// with the cursor.row_idx=0 representing the normalized key for the row at batch[idx=BatchOffset].
///
/// Each merge node (a `SortPreservingMergeStream` loser tree) will consume a [`CursorStream`].
pub(crate) type CursorStream<C> =
Box<dyn PartitionedStream<Output = Result<(C, BatchId, BatchOffset)>>>;
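A small hedged illustration of the offset arithmetic implied by the doc comment above (the helper `absolute_row` is hypothetical, not part of the PR):

```rust
// A sliced cursor's row_idx is relative to its slice, so the row in the
// original RecordBatch is offset + row_idx.
// e.g. a batch sliced at BatchOffset(40): cursor row 0 => batch row 40.
fn absolute_row(batch_offset: usize, cursor_row_idx: usize) -> usize {
    batch_offset + cursor_row_idx
}
```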

/// A stream of yielded [`SortOrder`]s, with the corresponding [`Cursor`]s and [`Uuid`]s
/// A [`MergeStream`] is a fallible stream of yielded [`SortOrder`]s.
///
/// Within a cascade of merge nodes (each node being a `SortPreservingMergeStream` loser tree),
/// each merge node yields a SortOrder as well as the partial record batches making up that SortOrder.
///
/// [`YieldedCursorStream`] then converts an output [`MergeStream`]
/// into an input [`CursorStream`] for the next merge.
pub(crate) type MergeStream<C> = std::pin::Pin<
Box<
dyn Send
@@ -236,9 +249,13 @@ impl<T: FieldArray> PartitionedStream for FieldCursorStream<T> {
}
}

/// A wrapper around [`CursorStream<C>`] that implements [`PartitionedStream`]
/// and provides polling of a subset of the streams.
/// A wrapper around [`CursorStream`] that provides polling of a subset of the partitioned streams.
///
/// This is used in the leaf nodes of the cascading merge tree,
/// so that the same [`CursorStream`] (with the same RowConverter)
/// can be polled separately by multiple leaf nodes.
pub struct OffsetCursorStream<C: Cursor> {
// Input streams. [`BatchTrackingStream`] is a [`CursorStream`].
streams: Arc<Mutex<BatchTrackingStream<C>>>,
offset: usize,
limit: usize,
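The offset/limit mechanics can be sketched with a simplified, synchronous stand-in for the PR's async, fallible `PartitionedStream`; `PartitionedSource` and `OffsetView` are hypothetical names for this illustration:

```rust
use std::sync::{Arc, Mutex};

// Simplified, synchronous stand-in for the PR's async PartitionedStream.
trait PartitionedSource {
    type Item;
    fn partitions(&self) -> usize;
    fn next(&mut self, partition: usize) -> Option<Self::Item>;
}

// Exposes partitions [offset, limit) of a shared inner source, so several
// leaf merge nodes can poll disjoint subsets of one underlying stream.
struct OffsetView<S> {
    inner: Arc<Mutex<S>>,
    offset: usize,
    limit: usize, // exclusive upper bound in inner partition indices
}

impl<S: PartitionedSource> OffsetView<S> {
    fn partitions(&self) -> usize {
        self.limit - self.offset
    }
    fn next(&mut self, partition: usize) -> Option<S::Item> {
        assert!(partition < self.partitions());
        // Translate the local index to the shared inner index before polling.
        self.inner.lock().unwrap().next(self.offset + partition)
    }
}
```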
@@ -287,6 +304,10 @@ impl<C: Cursor> std::fmt::Debug for OffsetCursorStream<C> {
}
}

/// Converts a [`BatchCursorStream`] into a [`CursorStream`],
/// while storing the record batches outside of the cascading merge tree.
///
/// Should be used behind a Mutex.
pub struct BatchTrackingStream<C: Cursor> {
/// Write once, read many [`RecordBatch`]s
batches: HashMap<BatchId, Arc<RecordBatch>, RandomState>,
@@ -317,6 +338,14 @@ impl<C: Cursor> BatchTrackingStream<C> {
self.batches.remove(id);
}
}
}

impl<C: Cursor> PartitionedStream for BatchTrackingStream<C> {
type Output = Result<(C, BatchId, BatchOffset)>;

fn partitions(&self) -> usize {
self.streams.partitions()
}

fn poll_next(
&mut self,
@@ -334,6 +363,14 @@ impl<C: Cursor> BatchTrackingStream<C> {
}
}

impl<C: Cursor> std::fmt::Debug for BatchTrackingStream<C> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("BatchTrackingStream")
.field("num_partitions", &self.partitions())
.finish()
}
}
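A hedged sketch of the batch-tracking idea under simplified types (`BatchStore` is a hypothetical name; this commit keys batches by `Uuid`, and a later commit in the PR switches `BatchId` to `u64`):

```rust
use std::collections::HashMap;
use std::sync::Arc;

type BatchId = u64; // simplified; this commit's BatchId is a Uuid

// Write-once, read-many storage: batches stay here, outside the merge tree,
// while only (cursor, BatchId, BatchOffset) tuples travel between nodes.
struct BatchStore<B> {
    batches: HashMap<BatchId, Arc<B>>,
    next_id: BatchId,
}

impl<B> BatchStore<B> {
    fn register(&mut self, batch: B) -> BatchId {
        let id = self.next_id;
        self.next_id += 1;
        self.batches.insert(id, Arc::new(batch));
        id
    }
    fn get(&self, id: BatchId) -> Option<Arc<B>> {
        self.batches.get(&id).cloned()
    }
    fn remove(&mut self, id: BatchId) {
        self.batches.remove(&id);
    }
}
```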

/// A newtype wrapper around a set of fused [`MergeStream`]
/// that implements debug, and skips over empty inner poll results
struct FusedMergeStreams<C>(Vec<Fuse<MergeStream<C>>>);
@@ -353,8 +390,16 @@ impl<C: Cursor> FusedMergeStreams<C> {
}
}

impl<C: Cursor> std::fmt::Debug for FusedMergeStreams<C> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("FusedMergeStreams").finish()
}
}

/// [`YieldedCursorStream`] converts an output [`MergeStream`]
/// into an input [`CursorStream`] for the next merge.
pub struct YieldedCursorStream<C: Cursor> {
// Inner polled batch cursors, per stream_idx, which represent partially yielded batches.
// Inner polled batch cursors, per stream_idx, which represents partially yielded batches.
cursors: Vec<Option<VecDeque<(C, BatchId, BatchOffset)>>>,
/// Streams being polled
streams: FusedMergeStreams<C>,
@@ -379,9 +424,20 @@ impl<C: Cursor + std::marker::Send> YieldedCursorStream<C> {
.flatten()
}

// TODO: in order to handle sort_order, we need to either:
// parse further
// or concat the cursors
// The input [`SortOrder`] spans multiple batches.
// We need to further parse the cursors into smaller batches.
//
// Input:
// - sort_order: Vec<(BatchId, row_idx)> = [(0,0), (0,1), (1,0), (0,2), (0,3)]
// - cursors: Vec<(C, BatchId, BatchOffset)> = [cursor_0, cursor_1]
//
// Output stream:
// Needs to be yielded to the next merge in three partial batches:
// [(0,0),(0,1)] with cursor => then [(1,0)] with cursor => then [(0,2),(0,3)] with cursor
//
// This additional parsing is only required when streaming into another merge node,
// and not required when yielding to the final interleave step.
// (Performance slightly decreases when doing this additional parsing for all SortOrderBuilder yields.)
fn try_parse_batches(
&mut self,
stream_idx: usize,
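The additional parsing described above amounts to grouping consecutive sort-order entries by batch id. A minimal sketch under simplified types (an editor's illustration, not the PR's `try_parse_batches` body):

```rust
type BatchId = u64; // simplified; this commit's BatchId is a Uuid

// Group consecutive (batch_id, row_idx) entries into runs, one run per
// contiguous stretch of the same batch, so each run can be yielded to the
// next merge node with a single sliced cursor.
fn split_into_runs(sort_order: &[(BatchId, usize)]) -> Vec<(BatchId, Vec<usize>)> {
    let mut runs: Vec<(BatchId, Vec<usize>)> = Vec::new();
    for &(batch, row) in sort_order {
        match runs.last_mut() {
            Some((id, rows)) if *id == batch => rows.push(row),
            _ => runs.push((batch, vec![row])),
        }
    }
    runs
}

// split_into_runs(&[(0, 0), (0, 1), (1, 0), (0, 2), (0, 3)])
// => [(0, vec![0, 1]), (1, vec![0]), (0, vec![2, 3])]
```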