Skip to content

Commit

Permalink
Consolidate sort and external_sort (#1596)
Browse files Browse the repository at this point in the history
* Change SPMS (SortPreservingMergeStream) to use heap sort, and use SPMS in place of the in-memory sort as well

* Incorporate metrics, external_sort pass all sort tests

* Remove the original sort, substitute with external sort

* Fix different batch_size setting in SPMS test

* Change to use combine-and-sort for the in-memory N-way merge

* Resolve comments on async and doc

* Update sort to avoid deadlock during spilling

* Fix spill hanging
  • Loading branch information
yjshen authored Jan 21, 2022
1 parent 3c5a679 commit 7d819d1
Show file tree
Hide file tree
Showing 10 changed files with 641 additions and 1,168 deletions.
1 change: 1 addition & 0 deletions datafusion/src/execution/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1211,6 +1211,7 @@ impl FunctionRegistry for ExecutionContextState {
#[cfg(test)]
mod tests {
use super::*;
use crate::execution::context::QueryPlanner;
use crate::from_slice::FromSlice;
use crate::logical_plan::plan::Projection;
use crate::logical_plan::TableScan;
Expand Down
14 changes: 11 additions & 3 deletions datafusion/src/physical_plan/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
use super::{RecordBatchStream, SendableRecordBatchStream};
use crate::error::{DataFusionError, Result};
use crate::execution::runtime_env::RuntimeEnv;
use crate::physical_plan::metrics::BaselineMetrics;
use crate::physical_plan::{ColumnStatistics, ExecutionPlan, Statistics};
use arrow::compute::concat;
use arrow::datatypes::{Schema, SchemaRef};
Expand All @@ -41,15 +42,21 @@ pub struct SizedRecordBatchStream {
schema: SchemaRef,
batches: Vec<Arc<RecordBatch>>,
index: usize,
baseline_metrics: BaselineMetrics,
}

impl SizedRecordBatchStream {
/// Create a new SizedRecordBatchStream
pub fn new(schema: SchemaRef, batches: Vec<Arc<RecordBatch>>) -> Self {
pub fn new(
schema: SchemaRef,
batches: Vec<Arc<RecordBatch>>,
baseline_metrics: BaselineMetrics,
) -> Self {
SizedRecordBatchStream {
schema,
index: 0,
batches,
baseline_metrics,
}
}
}
Expand All @@ -61,12 +68,13 @@ impl Stream for SizedRecordBatchStream {
mut self: std::pin::Pin<&mut Self>,
_: &mut Context<'_>,
) -> Poll<Option<Self::Item>> {
Poll::Ready(if self.index < self.batches.len() {
let poll = Poll::Ready(if self.index < self.batches.len() {
self.index += 1;
Some(Ok(self.batches[self.index - 1].as_ref().clone()))
} else {
None
})
});
self.baseline_metrics.record_poll(poll)
}
}

Expand Down
5 changes: 5 additions & 0 deletions datafusion/src/physical_plan/explain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ use arrow::{array::StringBuilder, datatypes::SchemaRef, record_batch::RecordBatc

use super::SendableRecordBatchStream;
use crate::execution::runtime_env::RuntimeEnv;
use crate::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet};
use async_trait::async_trait;

/// Explain execution plan operator. This operator contains the string
Expand Down Expand Up @@ -146,9 +147,13 @@ impl ExecutionPlan for ExplainExec {
],
)?;

let metrics = ExecutionPlanMetricsSet::new();
let baseline_metrics = BaselineMetrics::new(&metrics, partition);

Ok(Box::pin(SizedRecordBatchStream::new(
self.schema.clone(),
vec![Arc::new(record_batch)],
baseline_metrics,
)))
}

Expand Down
Loading

0 comments on commit 7d819d1

Please sign in to comment.