Skip to content
Permalink

Comparing changes

This is a direct comparison between two commits made in this repository or its related repositories. View the default comparison for this range or learn more about diff comparisons.

Open a pull request

Create a new pull request by comparing changes across two branches. If you need to, you can also . Learn more about diff comparisons here.
base repository: apache/datafusion
Failed to load repositories. Confirm that selected base ref is valid, then try again.
Loading
base: e8049484b1b6856b037f425504a800bd21f5b1fe
Choose a base ref
..
head repository: apache/datafusion
Failed to load repositories. Confirm that selected head ref is valid, then try again.
Loading
compare: 59e16934d15e3e507be9b618dee42833ebb59203
Choose a head ref
Showing with 7 additions and 11 deletions.
  1. +7 −11 datafusion/core/src/physical_plan/sorts/sort.rs
18 changes: 7 additions & 11 deletions datafusion/core/src/physical_plan/sorts/sort.rs
Original file line number Diff line number Diff line change
@@ -282,7 +282,7 @@ fn in_mem_partial_sort(
schema: SchemaRef,
expressions: &[PhysicalSortExpr],
batch_size: usize,
tracking_metrics: MemTrackingMetrics,
mut tracking_metrics: MemTrackingMetrics,
_fetch: Option<usize>,
) -> Result<SendableRecordBatchStream> {
if buffered_batches.len() < 2 {
@@ -299,21 +299,17 @@ fn in_mem_partial_sort(
let streams = buffered_batches
.drain(..)
.map(|batch| {
let s = RecordBatchStreamAdapter::new(
Box::pin(RecordBatchStreamAdapter::new(
schema.clone(),
futures::stream::once(futures::future::ready(Ok(batch))),
);
SortedStream::new(Box::pin(s), batch_size)
)) as _
})
.collect();

Ok(Box::pin(SortPreservingMergeStream::new_from_streams(
streams,
schema,
expressions,
tracking_metrics,
batch_size,
)?))
// TODO: More accurate, dynamic memory accounting (#5885)
tracking_metrics.init_mem_used(batch_size);

streaming_merge(streams, schema, expressions, tracking_metrics, batch_size)
}

async fn spill_partial_sorted_stream(