diff --git a/datafusion/src/execution/context.rs b/datafusion/src/execution/context.rs index a3ca29f05de4..ceea83d952e0 100644 --- a/datafusion/src/execution/context.rs +++ b/datafusion/src/execution/context.rs @@ -1211,6 +1211,7 @@ impl FunctionRegistry for ExecutionContextState { #[cfg(test)] mod tests { use super::*; + use crate::execution::context::QueryPlanner; use crate::from_slice::FromSlice; use crate::logical_plan::plan::Projection; use crate::logical_plan::TableScan; diff --git a/datafusion/src/physical_plan/common.rs b/datafusion/src/physical_plan/common.rs index b199e638c0b9..cabb13a55ed9 100644 --- a/datafusion/src/physical_plan/common.rs +++ b/datafusion/src/physical_plan/common.rs @@ -20,6 +20,7 @@ use super::{RecordBatchStream, SendableRecordBatchStream}; use crate::error::{DataFusionError, Result}; use crate::execution::runtime_env::RuntimeEnv; +use crate::physical_plan::metrics::BaselineMetrics; use crate::physical_plan::{ColumnStatistics, ExecutionPlan, Statistics}; use arrow::compute::concat; use arrow::datatypes::{Schema, SchemaRef}; @@ -41,15 +42,21 @@ pub struct SizedRecordBatchStream { schema: SchemaRef, batches: Vec>, index: usize, + baseline_metrics: BaselineMetrics, } impl SizedRecordBatchStream { /// Create a new RecordBatchIterator - pub fn new(schema: SchemaRef, batches: Vec>) -> Self { + pub fn new( + schema: SchemaRef, + batches: Vec>, + baseline_metrics: BaselineMetrics, + ) -> Self { SizedRecordBatchStream { schema, index: 0, batches, + baseline_metrics, } } } @@ -61,12 +68,13 @@ impl Stream for SizedRecordBatchStream { mut self: std::pin::Pin<&mut Self>, _: &mut Context<'_>, ) -> Poll> { - Poll::Ready(if self.index < self.batches.len() { + let poll = Poll::Ready(if self.index < self.batches.len() { self.index += 1; Some(Ok(self.batches[self.index - 1].as_ref().clone())) } else { None - }) + }); + self.baseline_metrics.record_poll(poll) } } diff --git a/datafusion/src/physical_plan/explain.rs b/datafusion/src/physical_plan/explain.rs index df3dc98f196d..f827dc32eca4 100644 --- a/datafusion/src/physical_plan/explain.rs +++ b/datafusion/src/physical_plan/explain.rs @@ -32,6 +32,7 @@ use arrow::{array::StringBuilder, datatypes::SchemaRef, record_batch::RecordBatc use super::SendableRecordBatchStream; use crate::execution::runtime_env::RuntimeEnv; +use crate::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet}; use async_trait::async_trait; /// Explain execution plan operator. This operator contains the string @@ -146,9 +147,13 @@ impl ExecutionPlan for ExplainExec { ], )?; + let metrics = ExecutionPlanMetricsSet::new(); + let baseline_metrics = BaselineMetrics::new(&metrics, partition); + Ok(Box::pin(SizedRecordBatchStream::new( self.schema.clone(), vec![Arc::new(record_batch)], + baseline_metrics, ))) } diff --git a/datafusion/src/physical_plan/sorts/external_sort.rs b/datafusion/src/physical_plan/sorts/external_sort.rs deleted file mode 100644 index 6c60aac398b4..000000000000 --- a/datafusion/src/physical_plan/sorts/external_sort.rs +++ /dev/null @@ -1,657 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -//! Defines the External-Sort plan - -use crate::error::{DataFusionError, Result}; -use crate::execution::memory_manager::{ - ConsumerType, MemoryConsumer, MemoryConsumerId, MemoryManager, -}; -use crate::execution::runtime_env::RuntimeEnv; -use crate::physical_plan::common::{batch_byte_size, IPCWriter, SizedRecordBatchStream}; -use crate::physical_plan::expressions::PhysicalSortExpr; -use crate::physical_plan::metrics::{ - BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet, -}; -use crate::physical_plan::sorts::in_mem_sort::InMemSortStream; -use crate::physical_plan::sorts::sort::sort_batch; -use crate::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeStream; -use crate::physical_plan::sorts::SortedStream; -use crate::physical_plan::stream::RecordBatchReceiverStream; -use crate::physical_plan::{ - DisplayFormatType, Distribution, ExecutionPlan, Partitioning, - SendableRecordBatchStream, Statistics, -}; -use arrow::datatypes::SchemaRef; -use arrow::error::Result as ArrowResult; -use arrow::ipc::reader::FileReader; -use arrow::record_batch::RecordBatch; -use async_trait::async_trait; -use futures::lock::Mutex; -use futures::StreamExt; -use log::{error, info}; -use std::any::Any; -use std::fmt; -use std::fmt::{Debug, Formatter}; -use std::fs::File; -use std::io::BufReader; -use std::sync::atomic::{AtomicUsize, Ordering}; -use std::sync::Arc; -use tokio::sync::mpsc::{Receiver as TKReceiver, Sender as TKSender}; -use tokio::task; - -/// Sort arbitrary size of data to get an total order (may spill several times during sorting based on free memory available). -/// -/// The basic architecture of the algorithm: -/// -/// let spills = vec![]; -/// let in_mem_batches = vec![]; -/// while (input.has_next()) { -/// let batch = input.next(); -/// // no enough memory available, spill first. -/// if exec_memory_available < size_of(batch) { -/// let ordered_stream = in_mem_heap_sort(in_mem_batches.drain(..)); -/// let tmp_file = spill_write(ordered_stream); -/// spills.push(tmp_file); -/// } -/// // sort the batch while it's probably still in cache and buffer it. -/// let sorted = sort_by_key(batch); -/// in_mem_batches.push(sorted); -/// } -/// -/// let partial_ordered_streams = vec![]; -/// let in_mem_stream = in_mem_heap_sort(in_mem_batches.drain(..)); -/// partial_ordered_streams.push(in_mem_stream); -/// partial_ordered_streams.extend(spills.drain(..).map(read_as_stream)); -/// let result = sort_preserving_merge(partial_ordered_streams); -struct ExternalSorter { - id: MemoryConsumerId, - schema: SchemaRef, - in_mem_batches: Mutex>, - spills: Mutex>, - /// Sort expressions - expr: Vec, - runtime: Arc, - metrics: ExecutionPlanMetricsSet, - used: AtomicUsize, - spilled_bytes: AtomicUsize, - spilled_count: AtomicUsize, -} - -impl ExternalSorter { - pub fn new( - partition_id: usize, - schema: SchemaRef, - expr: Vec, - runtime: Arc, - ) -> Self { - Self { - id: MemoryConsumerId::new(partition_id), - schema, - in_mem_batches: Mutex::new(vec![]), - spills: Mutex::new(vec![]), - expr, - runtime, - metrics: ExecutionPlanMetricsSet::new(), - used: AtomicUsize::new(0), - spilled_bytes: AtomicUsize::new(0), - spilled_count: AtomicUsize::new(0), - } - } - - async fn insert_batch(&self, input: RecordBatch) -> Result<()> { - let size = batch_byte_size(&input); - self.try_grow(size).await?; - self.used.fetch_add(size, Ordering::SeqCst); - // sort each batch as it's inserted, more probably to be cache-resident - let sorted_batch = sort_batch(input, self.schema.clone(), &*self.expr)?; - let mut in_mem_batches = self.in_mem_batches.lock().await; - in_mem_batches.push(sorted_batch); - Ok(()) - } - - /// MergeSort in mem batches as well as spills into total order with `SortPreservingMergeStream`. - async fn sort(&self) -> Result { - let partition = self.partition_id(); - let mut in_mem_batches = self.in_mem_batches.lock().await; - let baseline_metrics = BaselineMetrics::new(&self.metrics, partition); - let mut streams: Vec = vec![]; - let in_mem_stream = in_mem_partial_sort( - &mut *in_mem_batches, - self.schema.clone(), - &self.expr, - self.runtime.batch_size(), - baseline_metrics, - ) - .await?; - streams.push(SortedStream::new(in_mem_stream, self.used())); - - let mut spills = self.spills.lock().await; - - for spill in spills.drain(..) { - let stream = read_spill_as_stream(spill, self.schema.clone()).await?; - streams.push(SortedStream::new(stream, 0)); - } - let baseline_metrics = BaselineMetrics::new(&self.metrics, partition); - - Ok(Box::pin( - SortPreservingMergeStream::new_from_stream( - streams, - self.schema.clone(), - &self.expr, - baseline_metrics, - partition, - self.runtime.clone(), - ) - .await, - )) - } - - fn used(&self) -> usize { - self.used.load(Ordering::SeqCst) - } - - fn spilled_bytes(&self) -> usize { - self.spilled_bytes.load(Ordering::SeqCst) - } - - fn spilled_count(&self) -> usize { - self.spilled_count.load(Ordering::SeqCst) - } -} - -impl Debug for ExternalSorter { - fn fmt(&self, f: &mut Formatter) -> fmt::Result { - f.debug_struct("ExternalSorter") - .field("id", &self.id()) - .field("memory_used", &self.used()) - .field("spilled_bytes", &self.spilled_bytes()) - .field("spilled_count", &self.spilled_count()) - .finish() - } -} - -#[async_trait] -impl MemoryConsumer for ExternalSorter { - fn name(&self) -> String { - "ExternalSorter".to_owned() - } - - fn id(&self) -> &MemoryConsumerId { - &self.id - } - - fn memory_manager(&self) -> Arc { - self.runtime.memory_manager.clone() - } - - fn type_(&self) -> &ConsumerType { - &ConsumerType::Requesting - } - - async fn spill(&self) -> Result { - info!( - "{}[{}] spilling sort data of {} to disk while inserting ({} time(s) so far)", - self.name(), - self.id(), - self.used(), - self.spilled_count() - ); - - let partition = self.partition_id(); - let mut in_mem_batches = self.in_mem_batches.lock().await; - // we could always get a chance to free some memory as long as we are holding some - if in_mem_batches.len() == 0 { - return Ok(0); - } - - let baseline_metrics = BaselineMetrics::new(&self.metrics, partition); - - let path = self.runtime.disk_manager.create_tmp_file()?; - let stream = in_mem_partial_sort( - &mut *in_mem_batches, - self.schema.clone(), - &*self.expr, - self.runtime.batch_size(), - baseline_metrics, - ) - .await; - - let total_size = - spill_partial_sorted_stream(&mut stream?, path.clone(), self.schema.clone()) - .await?; - - let mut spills = self.spills.lock().await; - let used = self.used.swap(0, Ordering::SeqCst); - self.spilled_count.fetch_add(1, Ordering::SeqCst); - self.spilled_bytes.fetch_add(total_size, Ordering::SeqCst); - spills.push(path); - Ok(used) - } - - fn mem_used(&self) -> usize { - self.used.load(Ordering::SeqCst) - } -} - -/// consume the `sorted_bathes` and do in_mem_sort -async fn in_mem_partial_sort( - sorted_bathes: &mut Vec, - schema: SchemaRef, - expressions: &[PhysicalSortExpr], - target_batch_size: usize, - baseline_metrics: BaselineMetrics, -) -> Result { - if sorted_bathes.len() == 1 { - Ok(Box::pin(SizedRecordBatchStream::new( - schema, - vec![Arc::new(sorted_bathes.pop().unwrap())], - ))) - } else { - let new = sorted_bathes.drain(..).collect(); - assert_eq!(sorted_bathes.len(), 0); - Ok(Box::pin(InMemSortStream::new( - new, - schema, - expressions, - target_batch_size, - baseline_metrics, - )?)) - } -} - -async fn spill_partial_sorted_stream( - in_mem_stream: &mut SendableRecordBatchStream, - path: String, - schema: SchemaRef, -) -> Result { - let (sender, receiver) = tokio::sync::mpsc::channel(2); - while let Some(item) = in_mem_stream.next().await { - sender.send(Some(item)).await.ok(); - } - sender.send(None).await.ok(); - let path_clone = path.clone(); - let res = - task::spawn_blocking(move || write_sorted(receiver, path_clone, schema)).await; - match res { - Ok(r) => r, - Err(e) => Err(DataFusionError::Execution(format!( - "Error occurred while spilling {}", - e - ))), - } -} - -async fn read_spill_as_stream( - path: String, - schema: SchemaRef, -) -> Result { - let (sender, receiver): ( - TKSender>, - TKReceiver>, - ) = tokio::sync::mpsc::channel(2); - let path_clone = path.clone(); - let join_handle = task::spawn_blocking(move || { - if let Err(e) = read_spill(sender, path_clone) { - error!("Failure while reading spill file: {}. Error: {}", path, e); - } - }); - Ok(RecordBatchReceiverStream::create( - &schema, - receiver, - join_handle, - )) -} - -fn write_sorted( - mut receiver: TKReceiver>>, - path: String, - schema: SchemaRef, -) -> Result { - let mut writer = IPCWriter::new(path.as_ref(), schema.as_ref())?; - while let Some(Some(batch)) = receiver.blocking_recv() { - writer.write(&batch?)?; - } - writer.finish()?; - info!( - "Spilled {} batches of total {} rows to disk, memory released {}", - writer.num_batches, writer.num_rows, writer.num_bytes - ); - Ok(writer.num_bytes as usize) -} - -fn read_spill(sender: TKSender>, path: String) -> Result<()> { - let file = BufReader::new(File::open(&path)?); - let reader = FileReader::try_new(file)?; - for batch in reader { - sender - .blocking_send(batch) - .map_err(|e| DataFusionError::Execution(format!("{}", e)))?; - } - Ok(()) -} - -/// External Sort execution plan -#[derive(Debug)] -pub struct ExternalSortExec { - /// Input schema - input: Arc, - /// Sort expressions - expr: Vec, - /// Execution metrics - metrics: ExecutionPlanMetricsSet, - /// Preserve partitions of input plan - preserve_partitioning: bool, -} - -impl ExternalSortExec { - /// Create a new sort execution plan - pub fn try_new( - expr: Vec, - input: Arc, - ) -> Result { - Ok(Self::new_with_partitioning(expr, input, false)) - } - - /// Create a new sort execution plan with the option to preserve - /// the partitioning of the input plan - pub fn new_with_partitioning( - expr: Vec, - input: Arc, - preserve_partitioning: bool, - ) -> Self { - Self { - expr, - input, - metrics: ExecutionPlanMetricsSet::new(), - preserve_partitioning, - } - } - - /// Input schema - pub fn input(&self) -> &Arc { - &self.input - } - - /// Sort expressions - pub fn expr(&self) -> &[PhysicalSortExpr] { - &self.expr - } -} - -#[async_trait] -impl ExecutionPlan for ExternalSortExec { - fn as_any(&self) -> &dyn Any { - self - } - - fn schema(&self) -> SchemaRef { - self.input.schema() - } - - /// Get the output partitioning of this plan - fn output_partitioning(&self) -> Partitioning { - if self.preserve_partitioning { - self.input.output_partitioning() - } else { - Partitioning::UnknownPartitioning(1) - } - } - - fn required_child_distribution(&self) -> Distribution { - if self.preserve_partitioning { - Distribution::UnspecifiedDistribution - } else { - Distribution::SinglePartition - } - } - - fn children(&self) -> Vec> { - vec![self.input.clone()] - } - - fn with_new_children( - &self, - children: Vec>, - ) -> Result> { - match children.len() { - 1 => Ok(Arc::new(ExternalSortExec::try_new( - self.expr.clone(), - children[0].clone(), - )?)), - _ => Err(DataFusionError::Internal( - "SortExec wrong number of children".to_string(), - )), - } - } - - async fn execute( - &self, - partition: usize, - runtime: Arc, - ) -> Result { - if !self.preserve_partitioning { - if 0 != partition { - return Err(DataFusionError::Internal(format!( - "SortExec invalid partition {}", - partition - ))); - } - - // sort needs to operate on a single partition currently - if 1 != self.input.output_partitioning().partition_count() { - return Err(DataFusionError::Internal( - "SortExec requires a single input partition".to_owned(), - )); - } - } - - let _baseline_metrics = BaselineMetrics::new(&self.metrics, partition); - let input = self.input.execute(partition, runtime.clone()).await?; - - external_sort(input, partition, self.expr.clone(), runtime).await - } - - fn metrics(&self) -> Option { - Some(self.metrics.clone_inner()) - } - - fn fmt_as( - &self, - t: DisplayFormatType, - f: &mut std::fmt::Formatter, - ) -> std::fmt::Result { - match t { - DisplayFormatType::Default => { - let expr: Vec = self.expr.iter().map(|e| e.to_string()).collect(); - write!(f, "SortExec: [{}]", expr.join(",")) - } - } - } - - fn statistics(&self) -> Statistics { - self.input.statistics() - } -} - -async fn external_sort( - mut input: SendableRecordBatchStream, - partition_id: usize, - expr: Vec, - runtime: Arc, -) -> Result { - let schema = input.schema(); - let sorter = Arc::new(ExternalSorter::new( - partition_id, - schema.clone(), - expr, - runtime.clone(), - )); - runtime.register_consumer(&(sorter.clone() as Arc)); - - while let Some(batch) = input.next().await { - let batch = batch?; - sorter.insert_batch(batch).await?; - } - - let result = sorter.sort().await; - runtime.drop_consumer(sorter.id()); - result -} - -#[cfg(test)] -mod tests { - use super::*; - use crate::datasource::object_store::local::LocalFileSystem; - use crate::execution::runtime_env::RuntimeConfig; - use crate::physical_plan::coalesce_partitions::CoalescePartitionsExec; - use crate::physical_plan::expressions::col; - use crate::physical_plan::{ - collect, - file_format::{CsvExec, FileScanConfig}, - }; - use crate::test; - use crate::test_util; - use arrow::array::*; - use arrow::compute::SortOptions; - use arrow::datatypes::*; - - async fn sort_with_runtime(runtime: Arc) -> Result> { - let schema = test_util::aggr_test_schema(); - let partitions = 4; - let (_, files) = - test::create_partitioned_csv("aggregate_test_100.csv", partitions)?; - - let csv = CsvExec::new( - FileScanConfig { - object_store: Arc::new(LocalFileSystem {}), - file_schema: Arc::clone(&schema), - file_groups: files, - statistics: Statistics::default(), - projection: None, - limit: None, - table_partition_cols: vec![], - }, - true, - b',', - ); - - let sort_exec = Arc::new(ExternalSortExec::try_new( - vec![ - // c1 string column - PhysicalSortExpr { - expr: col("c1", &schema)?, - options: SortOptions::default(), - }, - // c2 uin32 column - PhysicalSortExpr { - expr: col("c2", &schema)?, - options: SortOptions::default(), - }, - // c7 uin8 column - PhysicalSortExpr { - expr: col("c7", &schema)?, - options: SortOptions::default(), - }, - ], - Arc::new(CoalescePartitionsExec::new(Arc::new(csv))), - )?); - - collect(sort_exec, runtime).await - } - - #[tokio::test] - async fn test_in_mem_sort() -> Result<()> { - let runtime = Arc::new(RuntimeEnv::default()); - let result = sort_with_runtime(runtime).await?; - - assert_eq!(result.len(), 1); - - let columns = result[0].columns(); - - let c1 = as_string_array(&columns[0]); - assert_eq!(c1.value(0), "a"); - assert_eq!(c1.value(c1.len() - 1), "e"); - - let c2 = as_primitive_array::(&columns[1]); - assert_eq!(c2.value(0), 1); - assert_eq!(c2.value(c2.len() - 1), 5,); - - let c7 = as_primitive_array::(&columns[6]); - assert_eq!(c7.value(0), 15); - assert_eq!(c7.value(c7.len() - 1), 254,); - - Ok(()) - } - - #[tokio::test] - async fn test_sort_spill() -> Result<()> { - let config = RuntimeConfig::new() - .with_memory_fraction(1.0) - // trigger spill there will be 4 batches with 5.5KB for each - .with_max_execution_memory(12288); - let runtime = Arc::new(RuntimeEnv::new(config)?); - let result = sort_with_runtime(runtime).await?; - - assert_eq!(result.len(), 1); - - let columns = result[0].columns(); - - let c1 = as_string_array(&columns[0]); - assert_eq!(c1.value(0), "a"); - assert_eq!(c1.value(c1.len() - 1), "e"); - - let c2 = as_primitive_array::(&columns[1]); - assert_eq!(c2.value(0), 1); - assert_eq!(c2.value(c2.len() - 1), 5,); - - let c7 = as_primitive_array::(&columns[6]); - assert_eq!(c7.value(0), 15); - assert_eq!(c7.value(c7.len() - 1), 254,); - - Ok(()) - } - - #[tokio::test] - async fn test_multi_output_batch() -> Result<()> { - let config = RuntimeConfig::new().with_batch_size(26); - let runtime = Arc::new(RuntimeEnv::new(config)?); - let result = sort_with_runtime(runtime).await?; - - assert_eq!(result.len(), 4); - - let columns_b1 = result[0].columns(); - let columns_b3 = result[3].columns(); - - let c1 = as_string_array(&columns_b1[0]); - let c13 = as_string_array(&columns_b3[0]); - assert_eq!(c1.value(0), "a"); - assert_eq!(c13.value(c13.len() - 1), "e"); - - let c2 = as_primitive_array::(&columns_b1[1]); - let c23 = as_primitive_array::(&columns_b3[1]); - assert_eq!(c2.value(0), 1); - assert_eq!(c23.value(c23.len() - 1), 5,); - - let c7 = as_primitive_array::(&columns_b1[6]); - let c73 = as_primitive_array::(&columns_b3[6]); - assert_eq!(c7.value(0), 15); - assert_eq!(c73.value(c73.len() - 1), 254,); - - Ok(()) - } -} diff --git a/datafusion/src/physical_plan/sorts/in_mem_sort.rs b/datafusion/src/physical_plan/sorts/in_mem_sort.rs deleted file mode 100644 index 9e7753d42472..000000000000 --- a/datafusion/src/physical_plan/sorts/in_mem_sort.rs +++ /dev/null @@ -1,241 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -use std::collections::BinaryHeap; -use std::pin::Pin; -use std::sync::Arc; -use std::task::{Context, Poll}; - -use arrow::{ - array::{make_array as make_arrow_array, MutableArrayData}, - compute::SortOptions, - datatypes::SchemaRef, - error::{ArrowError, Result as ArrowResult}, - record_batch::RecordBatch, -}; -use futures::Stream; - -use crate::error::Result; -use crate::physical_plan::metrics::BaselineMetrics; -use crate::physical_plan::sorts::{RowIndex, SortKeyCursor}; -use crate::physical_plan::{ - expressions::PhysicalSortExpr, PhysicalExpr, RecordBatchStream, -}; - -/// Merge buffered, self-sorted record batches to get an order. -/// -/// Internally, it uses MinHeap to reduce extra memory consumption -/// by not concatenating all batches into one and sorting it as done by `SortExec`. -pub(crate) struct InMemSortStream { - /// The schema of the RecordBatches yielded by this stream - schema: SchemaRef, - /// Self sorted batches to be merged together - batches: Vec>, - /// The accumulated row indexes for the next record batch - in_progress: Vec, - /// The desired RecordBatch size to yield - target_batch_size: usize, - /// used to record execution metrics - baseline_metrics: BaselineMetrics, - /// If the stream has encountered an error - aborted: bool, - /// min heap for record comparison - min_heap: BinaryHeap, -} - -impl InMemSortStream { - pub(crate) fn new( - sorted_batches: Vec, - schema: SchemaRef, - expressions: &[PhysicalSortExpr], - target_batch_size: usize, - baseline_metrics: BaselineMetrics, - ) -> Result { - let len = sorted_batches.len(); - let mut cursors = Vec::with_capacity(len); - let mut min_heap = BinaryHeap::with_capacity(len); - - let column_expressions: Vec> = - expressions.iter().map(|x| x.expr.clone()).collect(); - - // The sort options for each expression - let sort_options: Arc> = - Arc::new(expressions.iter().map(|x| x.options).collect()); - - sorted_batches - .into_iter() - .enumerate() - .try_for_each(|(idx, batch)| { - let batch = Arc::new(batch); - let cursor = match SortKeyCursor::new( - idx, - batch.clone(), - &column_expressions, - sort_options.clone(), - ) { - Ok(cursor) => cursor, - Err(e) => return Err(e), - }; - min_heap.push(cursor); - cursors.insert(idx, batch); - Ok(()) - })?; - - Ok(Self { - schema, - batches: cursors, - target_batch_size, - baseline_metrics, - aborted: false, - in_progress: vec![], - min_heap, - }) - } - - /// Returns the index of the next batch to pull a row from, or None - /// if all cursors for all batch are exhausted - fn next_cursor(&mut self) -> Result> { - match self.min_heap.pop() { - None => Ok(None), - Some(cursor) => Ok(Some(cursor)), - } - } - - /// Drains the in_progress row indexes, and builds a new RecordBatch from them - /// - /// Will then drop any cursors for which all rows have been yielded to the output - fn build_record_batch(&mut self) -> ArrowResult { - let columns = self - .schema - .fields() - .iter() - .enumerate() - .map(|(column_idx, field)| { - let arrays = self - .batches - .iter() - .map(|batch| batch.column(column_idx).data()) - .collect(); - - let mut array_data = MutableArrayData::new( - arrays, - field.is_nullable(), - self.in_progress.len(), - ); - - if self.in_progress.is_empty() { - return make_arrow_array(array_data.freeze()); - } - - let first = &self.in_progress[0]; - let mut buffer_idx = first.stream_idx; - let mut start_row_idx = first.row_idx; - let mut end_row_idx = start_row_idx + 1; - - for row_index in self.in_progress.iter().skip(1) { - let next_buffer_idx = row_index.stream_idx; - - if next_buffer_idx == buffer_idx && row_index.row_idx == end_row_idx { - // subsequent row in same batch - end_row_idx += 1; - continue; - } - - // emit current batch of rows for current buffer - array_data.extend(buffer_idx, start_row_idx, end_row_idx); - - // start new batch of rows - buffer_idx = next_buffer_idx; - start_row_idx = row_index.row_idx; - end_row_idx = start_row_idx + 1; - } - - // emit final batch of rows - array_data.extend(buffer_idx, start_row_idx, end_row_idx); - make_arrow_array(array_data.freeze()) - }) - .collect(); - - self.in_progress.clear(); - RecordBatch::try_new(self.schema.clone(), columns) - } - - #[inline] - fn poll_next_inner( - self: &mut Pin<&mut Self>, - _cx: &mut Context<'_>, - ) -> Poll>> { - if self.aborted { - return Poll::Ready(None); - } - - loop { - // NB timer records time taken on drop, so there are no - // calls to `timer.done()` below. - let elapsed_compute = self.baseline_metrics.elapsed_compute().clone(); - let _timer = elapsed_compute.timer(); - - match self.next_cursor() { - Ok(Some(mut cursor)) => { - let batch_idx = cursor.batch_idx; - let row_idx = cursor.advance(); - - // insert the cursor back to min_heap if the record batch is not exhausted - if !cursor.is_finished() { - self.min_heap.push(cursor); - } - - self.in_progress.push(RowIndex { - stream_idx: batch_idx, - cursor_idx: 0, - row_idx, - }); - } - Ok(None) if self.in_progress.is_empty() => return Poll::Ready(None), - Ok(None) => return Poll::Ready(Some(self.build_record_batch())), - Err(e) => { - self.aborted = true; - return Poll::Ready(Some(Err(ArrowError::ExternalError(Box::new( - e, - ))))); - } - }; - - if self.in_progress.len() == self.target_batch_size { - return Poll::Ready(Some(self.build_record_batch())); - } - } - } -} - -impl Stream for InMemSortStream { - type Item = ArrowResult; - - fn poll_next( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll> { - let poll = self.poll_next_inner(cx); - self.baseline_metrics.record_poll(poll) - } -} - -impl RecordBatchStream for InMemSortStream { - fn schema(&self) -> SchemaRef { - self.schema.clone() - } -} diff --git a/datafusion/src/physical_plan/sorts/mod.rs b/datafusion/src/physical_plan/sorts/mod.rs index 3dda13b1a178..1bb880f855ac 100644 --- a/datafusion/src/physical_plan/sorts/mod.rs +++ b/datafusion/src/physical_plan/sorts/mod.rs @@ -32,11 +32,10 @@ use std::borrow::BorrowMut; use std::cmp::Ordering; use std::fmt::{Debug, Formatter}; use std::pin::Pin; +use std::sync::atomic::AtomicUsize; use std::sync::{Arc, RwLock}; use std::task::{Context, Poll}; -pub mod external_sort; -mod in_mem_sort; pub mod sort; pub mod sort_preserving_merge; @@ -50,8 +49,9 @@ pub mod sort_preserving_merge; /// by this row cursor, with that of another `SortKeyCursor`. A cursor stores /// a row comparator for each other cursor that it is compared to. struct SortKeyCursor { - columns: Vec, - cur_row: usize, + stream_idx: usize, + sort_columns: Vec, + cur_row: AtomicUsize, num_rows: usize, // An index uniquely identifying the record batch scanned by this cursor. @@ -68,8 +68,8 @@ struct SortKeyCursor { impl<'a> std::fmt::Debug for SortKeyCursor { fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { f.debug_struct("SortKeyCursor") - .field("columns", &self.columns) - .field("cur_row", &self.cur_row) + .field("sort_columns", &self.sort_columns) + .field("cur_row", &self.cur_row()) .field("num_rows", &self.num_rows) .field("batch_idx", &self.batch_idx) .field("batch", &self.batch) @@ -80,19 +80,21 @@ impl<'a> std::fmt::Debug for SortKeyCursor { impl SortKeyCursor { fn new( + stream_idx: usize, batch_idx: usize, batch: Arc, sort_key: &[Arc], sort_options: Arc>, ) -> error::Result { - let columns = sort_key + let sort_columns = sort_key .iter() .map(|expr| Ok(expr.evaluate(&batch)?.into_array(batch.num_rows()))) .collect::>()?; Ok(Self { - cur_row: 0, + stream_idx, + cur_row: AtomicUsize::new(0), num_rows: batch.num_rows(), - columns, + sort_columns, batch, batch_idx, batch_comparators: RwLock::new(HashMap::new()), @@ -101,38 +103,41 @@ impl SortKeyCursor { } fn is_finished(&self) -> bool { - self.num_rows == self.cur_row + self.num_rows == self.cur_row() } - fn advance(&mut self) -> usize { + fn advance(&self) -> usize { assert!(!self.is_finished()); - let t = self.cur_row; - self.cur_row += 1; - t + self.cur_row + .fetch_add(1, std::sync::atomic::Ordering::SeqCst) + } + + fn cur_row(&self) -> usize { + self.cur_row.load(std::sync::atomic::Ordering::SeqCst) } /// Compares the sort key pointed to by this instance's row cursor with that of another fn compare(&self, other: &SortKeyCursor) -> error::Result { - if self.columns.len() != other.columns.len() { + if self.sort_columns.len() != other.sort_columns.len() { return Err(DataFusionError::Internal(format!( "SortKeyCursors had inconsistent column counts: {} vs {}", - self.columns.len(), - other.columns.len() + self.sort_columns.len(), + other.sort_columns.len() ))); } - if self.columns.len() != self.sort_options.len() { + if self.sort_columns.len() != self.sort_options.len() { return Err(DataFusionError::Internal(format!( "Incorrect number of SortOptions provided to SortKeyCursor::compare, expected {} got {}", - self.columns.len(), + self.sort_columns.len(), self.sort_options.len() ))); } let zipped: Vec<((&ArrayRef, &ArrayRef), &SortOptions)> = self - .columns + .sort_columns .iter() - .zip(other.columns.iter()) + .zip(other.sort_columns.iter()) .zip(self.sort_options.iter()) .collect::>(); @@ -146,7 +151,7 @@ impl SortKeyCursor { })?; for (i, ((l, r), sort_options)) in zipped.iter().enumerate() { - match (l.is_valid(self.cur_row), r.is_valid(other.cur_row)) { + match (l.is_valid(self.cur_row()), r.is_valid(other.cur_row())) { (false, true) if sort_options.nulls_first => return Ok(Ordering::Less), (false, true) => return Ok(Ordering::Greater), (true, false) if sort_options.nulls_first => { @@ -154,7 +159,7 @@ impl SortKeyCursor { } (true, false) => return Ok(Ordering::Less), (false, false) => {} - (true, true) => match cmp[i](self.cur_row, other.cur_row) { + (true, true) => match cmp[i](self.cur_row(), other.cur_row()) { Ordering::Equal => {} o if sort_options.descending => return Ok(o.reverse()), o => return Ok(o), @@ -179,7 +184,7 @@ impl SortKeyCursor { let cmp = map .borrow_mut() .entry(other.batch_idx) - .or_insert_with(|| Vec::with_capacity(other.columns.len())); + .or_insert_with(|| Vec::with_capacity(other.sort_columns.len())); for (i, ((l, r), _)) in zipped.iter().enumerate() { if i >= cmp.len() { @@ -193,7 +198,7 @@ impl SortKeyCursor { } impl Ord for SortKeyCursor { - /// Needed by min-heap comparison in `in_mem_sort` and reverse the order at the same time. + /// Needed by min-heap comparison and reverse the order at the same time. fn cmp(&self, other: &Self) -> Ordering { other.compare(self).unwrap() } @@ -219,8 +224,7 @@ impl PartialOrd for SortKeyCursor { struct RowIndex { /// The index of the stream stream_idx: usize, - /// For sort_preserving_merge, it's the index of the cursor within the stream's VecDequeue. - /// For in_mem_sort which have only one batch for each stream, cursor_idx always 0 + /// The index of the cursor within the stream's VecDequeue. cursor_idx: usize, /// The row index row_idx: usize, @@ -251,10 +255,9 @@ enum StreamWrapper { impl StreamWrapper { fn mem_used(&self) -> usize { - if let StreamWrapper::Stream(Some(s)) = &self { - s.mem_used - } else { - 0 + match &self { + StreamWrapper::Stream(Some(s)) => s.mem_used, + _ => 0, } } } diff --git a/datafusion/src/physical_plan/sorts/sort.rs b/datafusion/src/physical_plan/sorts/sort.rs index a210b935d569..c3a138e373c3 100644 --- a/datafusion/src/physical_plan/sorts/sort.rs +++ b/datafusion/src/physical_plan/sorts/sort.rs @@ -15,47 +15,433 @@ // specific language governing permissions and limitations // under the License. -//! Defines the SORT plan +//! Sort that deals with an arbitrary size of the input. +//! It will do in-memory sorting if it has enough memory budget +//! but spills to disk if needed. use crate::error::{DataFusionError, Result}; +use crate::execution::memory_manager::{ + ConsumerType, MemoryConsumer, MemoryConsumerId, MemoryManager, +}; use crate::execution::runtime_env::RuntimeEnv; -use crate::physical_plan::common::AbortOnDropSingle; +use crate::physical_plan::common::{batch_byte_size, IPCWriter, SizedRecordBatchStream}; use crate::physical_plan::expressions::PhysicalSortExpr; use crate::physical_plan::metrics::{ - BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet, RecordOutput, + BaselineMetrics, Count, ExecutionPlanMetricsSet, MetricsSet, Time, }; +use crate::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeStream; +use crate::physical_plan::sorts::SortedStream; +use crate::physical_plan::stream::RecordBatchReceiverStream; use crate::physical_plan::{ - common, DisplayFormatType, Distribution, ExecutionPlan, Partitioning, + common, DisplayFormatType, Distribution, EmptyRecordBatchStream, ExecutionPlan, + Partitioning, SendableRecordBatchStream, Statistics, }; -use crate::physical_plan::{RecordBatchStream, SendableRecordBatchStream, Statistics}; +use arrow::array::ArrayRef; pub use arrow::compute::SortOptions; use arrow::compute::{lexsort_to_indices, take, SortColumn, TakeOptions}; use arrow::datatypes::SchemaRef; use arrow::error::Result as ArrowResult; +use arrow::ipc::reader::FileReader; use arrow::record_batch::RecordBatch; -use arrow::{array::ArrayRef, error::ArrowError}; use async_trait::async_trait; -use futures::stream::Stream; -use futures::Future; -use pin_project_lite::pin_project; +use futures::lock::Mutex; +use futures::StreamExt; +use log::{error, info}; use std::any::Any; -use std::pin::Pin; +use std::fmt; +use std::fmt::{Debug, Formatter}; +use std::fs::File; +use std::io::BufReader; +use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Arc; -use std::task::{Context, Poll}; +use std::time::Duration; +use tokio::sync::mpsc::{Receiver as TKReceiver, Sender as TKSender}; +use tokio::task; + +/// Sort arbitrary size of data to get a total order (may spill several times during sorting based on free memory available). +/// +/// The basic architecture of the algorithm: +/// 1. get a non-empty new batch from input +/// 2. check with the memory manager if we could buffer the batch in memory +/// 2.1 if memory sufficient, then buffer batch in memory, go to 1. +/// 2.2 if the memory threshold is reached, sort all buffered batches and spill to file. +/// buffer the batch in memory, go to 1. +/// 3. when input is exhausted, merge all in memory batches and spills to get a total order. +struct ExternalSorter { + id: MemoryConsumerId, + schema: SchemaRef, + in_mem_batches: Mutex>, + spills: Mutex>, + /// Sort expressions + expr: Vec, + runtime: Arc, + metrics: AggregatedMetricsSet, + used: AtomicUsize, + spilled_bytes: AtomicUsize, + spilled_count: AtomicUsize, +} + +impl ExternalSorter { + pub fn new( + partition_id: usize, + schema: SchemaRef, + expr: Vec, + metrics: AggregatedMetricsSet, + runtime: Arc, + ) -> Self { + Self { + id: MemoryConsumerId::new(partition_id), + schema, + in_mem_batches: Mutex::new(vec![]), + spills: Mutex::new(vec![]), + expr, + runtime, + metrics, + used: AtomicUsize::new(0), + spilled_bytes: AtomicUsize::new(0), + spilled_count: AtomicUsize::new(0), + } + } + + async fn insert_batch(&self, input: RecordBatch) -> Result<()> { + if input.num_rows() > 0 { + let size = batch_byte_size(&input); + self.try_grow(size).await?; + self.used.fetch_add(size, Ordering::SeqCst); + let mut in_mem_batches = self.in_mem_batches.lock().await; + in_mem_batches.push(input); + } + Ok(()) + } + + async fn spilled_before(&self) -> bool { + let spills = self.spills.lock().await; + !spills.is_empty() + } + + /// MergeSort in mem batches as well as spills into total order with `SortPreservingMergeStream`. + async fn sort(&self) -> Result { + let partition = self.partition_id(); + let mut in_mem_batches = self.in_mem_batches.lock().await; + + if self.spilled_before().await { + let baseline_metrics = self.metrics.new_intermediate_baseline(partition); + let mut streams: Vec = vec![]; + if in_mem_batches.len() > 0 { + let in_mem_stream = in_mem_partial_sort( + &mut *in_mem_batches, + self.schema.clone(), + &self.expr, + baseline_metrics, + )?; + streams.push(SortedStream::new(in_mem_stream, self.used())); + } -/// Sort execution plan + let mut spills = self.spills.lock().await; + + for spill in spills.drain(..) { + let stream = read_spill_as_stream(spill, self.schema.clone())?; + streams.push(SortedStream::new(stream, 0)); + } + let baseline_metrics = self.metrics.new_final_baseline(partition); + Ok(Box::pin(SortPreservingMergeStream::new_from_streams( + streams, + self.schema.clone(), + &self.expr, + baseline_metrics, + partition, + self.runtime.clone(), + ))) + } else if in_mem_batches.len() > 0 { + let baseline_metrics = self.metrics.new_final_baseline(partition); + in_mem_partial_sort( + &mut *in_mem_batches, + self.schema.clone(), + &self.expr, + baseline_metrics, + ) + } else { + Ok(Box::pin(EmptyRecordBatchStream::new(self.schema.clone()))) + } + } + + fn used(&self) -> usize { + self.used.load(Ordering::SeqCst) + } + + fn spilled_bytes(&self) -> usize { + self.spilled_bytes.load(Ordering::SeqCst) + } + + fn spilled_count(&self) -> usize { + self.spilled_count.load(Ordering::SeqCst) + } +} + +impl Debug for ExternalSorter { + fn fmt(&self, f: &mut Formatter) -> fmt::Result { + f.debug_struct("ExternalSorter") + .field("id", &self.id()) + .field("memory_used", &self.used()) + .field("spilled_bytes", &self.spilled_bytes()) + .field("spilled_count", &self.spilled_count()) + .finish() + } +} + +#[async_trait] +impl MemoryConsumer for ExternalSorter { + fn name(&self) -> String { + "ExternalSorter".to_owned() + } + + fn id(&self) -> &MemoryConsumerId { + &self.id + } + + fn memory_manager(&self) -> Arc { + self.runtime.memory_manager.clone() + } + + fn type_(&self) -> &ConsumerType { + &ConsumerType::Requesting + } + + async fn spill(&self) -> Result { + info!( + "{}[{}] spilling sort data of {} to disk while inserting ({} time(s) so far)", + self.name(), + self.id(), + self.used(), + self.spilled_count() + ); + + let partition = self.partition_id(); + let mut in_mem_batches = self.in_mem_batches.lock().await; + // we could always get a chance to free some memory as long as we are holding some + if in_mem_batches.len() == 0 { + return Ok(0); + } + + let baseline_metrics = self.metrics.new_intermediate_baseline(partition); + + let path = self.runtime.disk_manager.create_tmp_file()?; + let stream = in_mem_partial_sort( + &mut *in_mem_batches, + self.schema.clone(), + &*self.expr, + baseline_metrics, + ); + + let total_size = + spill_partial_sorted_stream(&mut stream?, path.clone(), self.schema.clone()) + .await?; + + let mut spills = self.spills.lock().await; + let used = self.used.swap(0, Ordering::SeqCst); + self.spilled_count.fetch_add(1, Ordering::SeqCst); + self.spilled_bytes.fetch_add(total_size, Ordering::SeqCst); + spills.push(path); + Ok(used) + } + + fn mem_used(&self) -> usize { + self.used.load(Ordering::SeqCst) + } +} + +/// consume the non-empty `sorted_bathes` and do in_mem_sort +fn in_mem_partial_sort( + buffered_batches: &mut Vec, + schema: SchemaRef, + expressions: &[PhysicalSortExpr], + baseline_metrics: BaselineMetrics, +) -> Result { + assert_ne!(buffered_batches.len(), 0); + + let result = { + // NB timer records time taken on drop, so there are no + // calls to `timer.done()` below. + let _timer = baseline_metrics.elapsed_compute().timer(); + + let pre_sort = if buffered_batches.len() == 1 { + buffered_batches.pop() + } else { + let batches = buffered_batches.drain(..).collect::>(); + // combine all record batches into one for each column + common::combine_batches(&batches, schema.clone())? + }; + + pre_sort + .map(|batch| sort_batch(batch, schema.clone(), expressions)) + .transpose()? + }; + + Ok(Box::pin(SizedRecordBatchStream::new( + schema, + vec![Arc::new(result.unwrap())], + baseline_metrics, + ))) +} + +async fn spill_partial_sorted_stream( + in_mem_stream: &mut SendableRecordBatchStream, + path: String, + schema: SchemaRef, +) -> Result { + let (sender, receiver) = tokio::sync::mpsc::channel(2); + let path_clone = path.clone(); + let handle = task::spawn_blocking(move || write_sorted(receiver, path_clone, schema)); + while let Some(item) = in_mem_stream.next().await { + sender.send(item).await.ok(); + } + drop(sender); + match handle.await { + Ok(r) => r, + Err(e) => Err(DataFusionError::Execution(format!( + "Error occurred while spilling {}", + e + ))), + } +} + +fn read_spill_as_stream( + path: String, + schema: SchemaRef, +) -> Result { + let (sender, receiver): ( + TKSender>, + TKReceiver>, + ) = tokio::sync::mpsc::channel(2); + let path_clone = path.clone(); + let join_handle = task::spawn_blocking(move || { + if let Err(e) = read_spill(sender, path_clone) { + error!("Failure while reading spill file: {}. Error: {}", path, e); + } + }); + Ok(RecordBatchReceiverStream::create( + &schema, + receiver, + join_handle, + )) +} + +fn write_sorted( + mut receiver: TKReceiver>, + path: String, + schema: SchemaRef, +) -> Result { + let mut writer = IPCWriter::new(path.as_ref(), schema.as_ref())?; + while let Some(batch) = receiver.blocking_recv() { + writer.write(&batch?)?; + } + writer.finish()?; + info!( + "Spilled {} batches of total {} rows to disk, memory released {}", + writer.num_batches, writer.num_rows, writer.num_bytes + ); + Ok(writer.num_bytes as usize) +} + +fn read_spill(sender: TKSender>, path: String) -> Result<()> { + let file = BufReader::new(File::open(&path)?); + let reader = FileReader::try_new(file)?; + for batch in reader { + sender + .blocking_send(batch) + .map_err(|e| DataFusionError::Execution(format!("{}", e)))?; + } + Ok(()) +} + +/// External Sort execution plan #[derive(Debug)] pub struct SortExec { /// Input schema input: Arc, /// Sort expressions expr: Vec, - /// Execution metrics - metrics: ExecutionPlanMetricsSet, + /// Containing all metrics set created during sort + all_metrics: AggregatedMetricsSet, /// Preserve partitions of input plan preserve_partitioning: bool, } +#[derive(Debug, Clone)] +/// Aggregates all metrics during a complex operation, which is composed of multiple stages and +/// each stage reports its statistics separately. +/// Give sort as an example, when the dataset is more significant than available memory, it will report +/// multiple in-mem sort metrics and final merge-sort metrics from `SortPreservingMergeStream`. +/// Therefore, We need a separation of metrics for which are final metrics (for output_rows accumulation), +/// and which are intermediate metrics that we only account for elapsed_compute time. +struct AggregatedMetricsSet { + intermediate: Arc>>, + final_: Arc>>, +} + +impl AggregatedMetricsSet { + fn new() -> Self { + Self { + intermediate: Arc::new(std::sync::Mutex::new(vec![])), + final_: Arc::new(std::sync::Mutex::new(vec![])), + } + } + + fn new_intermediate_baseline(&self, partition: usize) -> BaselineMetrics { + let ms = ExecutionPlanMetricsSet::new(); + let result = BaselineMetrics::new(&ms, partition); + self.intermediate.lock().unwrap().push(ms); + result + } + + fn new_final_baseline(&self, partition: usize) -> BaselineMetrics { + let ms = ExecutionPlanMetricsSet::new(); + let result = BaselineMetrics::new(&ms, partition); + self.final_.lock().unwrap().push(ms); + result + } + + /// We should accumulate all times from all stages' reports for the total time consumption. + fn merge_compute_time(&self, dest: &Time) { + let time1 = self + .intermediate + .lock() + .unwrap() + .iter() + .map(|es| { + es.clone_inner() + .elapsed_compute() + .map_or(0u64, |v| v as u64) + }) + .sum(); + let time2 = self + .final_ + .lock() + .unwrap() + .iter() + .map(|es| { + es.clone_inner() + .elapsed_compute() + .map_or(0u64, |v| v as u64) + }) + .sum(); + dest.add_duration(Duration::from_nanos(time1)); + dest.add_duration(Duration::from_nanos(time2)); + } + + /// We should only care about output from the final stage metrics. + fn merge_output_count(&self, dest: &Count) { + let count = self + .final_ + .lock() + .unwrap() + .iter() + .map(|es| es.clone_inner().output_rows().map_or(0, |v| v)) + .sum(); + dest.add(count); + } +} + impl SortExec { /// Create a new sort execution plan pub fn try_new( @@ -75,7 +461,7 @@ impl SortExec { Self { expr, input, - metrics: ExecutionPlanMetricsSet::new(), + all_metrics: AggregatedMetricsSet::new(), preserve_partitioning, } } @@ -93,7 +479,6 @@ impl SortExec { #[async_trait] impl ExecutionPlan for SortExec { - /// Return a reference to Any that can be used for downcasting fn as_any(&self) -> &dyn Any { self } @@ -102,10 +487,6 @@ impl ExecutionPlan for SortExec { self.input.schema() } - fn children(&self) -> Vec> { - vec![self.input.clone()] - } - /// Get the output partitioning of this plan fn output_partitioning(&self) -> Partitioning { if self.preserve_partitioning { @@ -123,6 +504,10 @@ impl ExecutionPlan for SortExec { } } + fn children(&self) -> Vec> { + vec![self.input.clone()] + } + fn with_new_children( &self, children: Vec>, @@ -159,14 +544,25 @@ impl ExecutionPlan for SortExec { } } - let baseline_metrics = BaselineMetrics::new(&self.metrics, partition); - let input = self.input.execute(partition, runtime).await?; + let input = self.input.execute(partition, runtime.clone()).await?; - Ok(Box::pin(SortStream::new( + do_sort( input, + partition, self.expr.clone(), - baseline_metrics, - ))) + self.all_metrics.clone(), + runtime, + ) + .await + } + + fn metrics(&self) -> Option { + let metrics = ExecutionPlanMetricsSet::new(); + let baseline = BaselineMetrics::new(&metrics, 0); + self.all_metrics + .merge_compute_time(baseline.elapsed_compute()); + self.all_metrics.merge_output_count(baseline.output_rows()); + Some(metrics.clone_inner()) } fn fmt_as( @@ -182,16 +578,12 @@ impl ExecutionPlan for SortExec { } } - fn metrics(&self) -> Option { - Some(self.metrics.clone_inner()) - } - fn statistics(&self) -> Statistics { self.input.statistics() } } -pub(crate) fn sort_batch( +fn sort_batch( batch: RecordBatch, schema: SchemaRef, expr: &[PhysicalSortExpr], @@ -227,97 +619,38 @@ pub(crate) fn sort_batch( ) } -pin_project! { - /// stream for sort plan - struct SortStream { - #[pin] - output: futures::channel::oneshot::Receiver>>, - finished: bool, - schema: SchemaRef, - drop_helper: AbortOnDropSingle<()>, - } -} - -impl SortStream { - fn new( - input: SendableRecordBatchStream, - expr: Vec, - baseline_metrics: BaselineMetrics, - ) -> Self { - let (tx, rx) = futures::channel::oneshot::channel(); - let schema = input.schema(); - let join_handle = tokio::spawn(async move { - let schema = input.schema(); - let sorted_batch = common::collect(input) - .await - .map_err(DataFusionError::into_arrow_external_error) - .and_then(move |batches| { - let timer = baseline_metrics.elapsed_compute().timer(); - // combine all record batches into one for each column - let combined = common::combine_batches(&batches, schema.clone())?; - // sort combined record batch - let result = combined - .map(|batch| sort_batch(batch, schema, &expr)) - .transpose()? - .record_output(&baseline_metrics); - timer.done(); - Ok(result) - }); - - // failing here is OK, the receiver is gone and does not care about the result - tx.send(sorted_batch).ok(); - }); - - Self { - output: rx, - finished: false, - schema, - drop_helper: AbortOnDropSingle::new(join_handle), - } - } -} - -impl Stream for SortStream { - type Item = ArrowResult; - - fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - if self.finished { - return Poll::Ready(None); - } - - // is the output ready? - let this = self.project(); - let output_poll = this.output.poll(cx); - - match output_poll { - Poll::Ready(result) => { - *this.finished = true; - - // check for error in receiving channel and unwrap actual result - let result = match result { - Err(e) => Some(Err(ArrowError::ExternalError(Box::new(e)))), // error receiving - Ok(result) => result.transpose(), - }; - - Poll::Ready(result) - } - Poll::Pending => Poll::Pending, - } +async fn do_sort( + mut input: SendableRecordBatchStream, + partition_id: usize, + expr: Vec, + metrics: AggregatedMetricsSet, + runtime: Arc, +) -> Result { + let schema = input.schema(); + let sorter = Arc::new(ExternalSorter::new( + partition_id, + schema.clone(), + expr, + metrics, + runtime.clone(), + )); + runtime.register_consumer(&(sorter.clone() as Arc)); + + while let Some(batch) = input.next().await { + let batch = batch?; + sorter.insert_batch(batch).await?; } -} -impl RecordBatchStream for SortStream { - fn schema(&self) -> SchemaRef { - self.schema.clone() - } + let result = sorter.sort().await; + runtime.drop_consumer(sorter.id()); + result } #[cfg(test)] mod tests { - use std::collections::{BTreeMap, HashMap}; - use super::*; use crate::datasource::object_store::local::LocalFileSystem; + use crate::execution::runtime_env::RuntimeConfig; use crate::physical_plan::coalesce_partitions::CoalescePartitionsExec; use crate::physical_plan::expressions::col; use crate::physical_plan::memory::MemoryExec; @@ -325,17 +658,17 @@ mod tests { collect, file_format::{CsvExec, FileScanConfig}, }; + use crate::test; use crate::test::assert_is_pending; - use crate::test::exec::assert_strong_count_converges_to_zero; - use crate::test::{self, exec::BlockingExec}; + use crate::test::exec::{assert_strong_count_converges_to_zero, BlockingExec}; use crate::test_util; use arrow::array::*; + use arrow::compute::SortOptions; use arrow::datatypes::*; use futures::FutureExt; + use std::collections::{BTreeMap, HashMap}; - #[tokio::test] - async fn test_sort() -> Result<()> { - let runtime = Arc::new(RuntimeEnv::default()); + async fn sort_with_runtime(runtime: Arc) -> Result> { let schema = test_util::aggr_test_schema(); let partitions = 4; let (_, files) = @@ -376,7 +709,42 @@ mod tests { Arc::new(CoalescePartitionsExec::new(Arc::new(csv))), )?); - let result: Vec = collect(sort_exec, runtime).await?; + collect(sort_exec, runtime).await + } + + #[tokio::test] + async fn test_in_mem_sort() -> Result<()> { + let runtime = Arc::new(RuntimeEnv::default()); + let result = sort_with_runtime(runtime).await?; + + assert_eq!(result.len(), 1); + + let columns = result[0].columns(); + + let c1 = as_string_array(&columns[0]); + assert_eq!(c1.value(0), "a"); + assert_eq!(c1.value(c1.len() - 1), "e"); + + let c2 = as_primitive_array::(&columns[1]); + assert_eq!(c2.value(0), 1); + assert_eq!(c2.value(c2.len() - 1), 5,); + + let c7 = as_primitive_array::(&columns[6]); + assert_eq!(c7.value(0), 15); + assert_eq!(c7.value(c7.len() - 1), 254,); + + Ok(()) + } + + #[tokio::test] + async fn test_sort_spill() -> Result<()> { + let config = RuntimeConfig::new() + .with_memory_fraction(1.0) + // trigger spill there will be 4 batches with 5.5KB for each + .with_max_execution_memory(12288); + let runtime = Arc::new(RuntimeEnv::new(config)?); + let result = sort_with_runtime(runtime).await?; + assert_eq!(result.len(), 1); let columns = result[0].columns(); diff --git a/datafusion/src/physical_plan/sorts/sort_preserving_merge.rs b/datafusion/src/physical_plan/sorts/sort_preserving_merge.rs index 9f12891ea013..189a9fb336d6 100644 --- a/datafusion/src/physical_plan/sorts/sort_preserving_merge.rs +++ b/datafusion/src/physical_plan/sorts/sort_preserving_merge.rs @@ -22,8 +22,7 @@ use crate::physical_plan::metrics::{ BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet, }; use std::any::Any; -use std::cmp::Ordering; -use std::collections::VecDeque; +use std::collections::{BinaryHeap, VecDeque}; use std::fmt::{Debug, Formatter}; use std::pin::Pin; use std::sync::{Arc, Mutex}; @@ -168,18 +167,15 @@ impl ExecutionPlan for SortPreservingMergeExec { }) .unzip(); - Ok(Box::pin( - SortPreservingMergeStream::new_from_receiver( - receivers, - AbortOnDropMany(join_handles), - self.schema(), - &self.expr, - baseline_metrics, - partition, - runtime.clone(), - ) - .await, - )) + Ok(Box::pin(SortPreservingMergeStream::new_from_receivers( + receivers, + AbortOnDropMany(join_handles), + self.schema(), + &self.expr, + baseline_metrics, + partition, + runtime, + ))) } } } @@ -284,7 +280,7 @@ pub(crate) struct SortPreservingMergeStream { /// /// Exhausted cursors will be popped off the front once all /// their rows have been yielded to the output - cursors: Vec>, + cursors: Vec>>, /// The accumulated row indexes for the next record batch in_progress: Vec, @@ -304,6 +300,9 @@ pub(crate) struct SortPreservingMergeStream { /// An index to uniquely identify the input stream batch next_batch_index: usize, + /// min heap for record comparison + min_heap: BinaryHeap>, + /// runtime runtime: Arc, } @@ -316,7 +315,7 @@ impl Drop for SortPreservingMergeStream { impl SortPreservingMergeStream { #[allow(clippy::too_many_arguments)] - pub(crate) async fn new_from_receiver( + pub(crate) fn new_from_receivers( receivers: Vec>>, _drop_helper: AbortOnDropMany<()>, schema: SchemaRef, @@ -325,16 +324,16 @@ impl SortPreservingMergeStream { partition: usize, runtime: Arc, ) -> Self { - let cursors = (0..receivers.len()) + let stream_count = receivers.len(); + let cursors = (0..stream_count) .into_iter() .map(|_| VecDeque::new()) .collect(); - let wrappers = receivers.into_iter().map(StreamWrapper::Receiver).collect(); let streams = Arc::new(MergingStreams::new(partition, wrappers, runtime.clone())); runtime.register_consumer(&(streams.clone() as Arc)); - Self { + SortPreservingMergeStream { schema, cursors, streams, @@ -345,11 +344,12 @@ impl SortPreservingMergeStream { aborted: false, in_progress: vec![], next_batch_index: 0, + min_heap: BinaryHeap::with_capacity(stream_count), runtime, } } - pub(crate) async fn new_from_stream( + pub(crate) fn new_from_streams( streams: Vec, schema: SchemaRef, expressions: &[PhysicalSortExpr], @@ -357,16 +357,15 @@ impl SortPreservingMergeStream { partition: usize, runtime: Arc, ) -> Self { - let cursors = (0..streams.len()) + let stream_count = streams.len(); + let cursors = (0..stream_count) .into_iter() .map(|_| VecDeque::new()) .collect(); - let wrappers = streams .into_iter() .map(|s| StreamWrapper::Stream(Some(s))) - .collect::>(); - + .collect(); let streams = Arc::new(MergingStreams::new(partition, wrappers, runtime.clone())); runtime.register_consumer(&(streams.clone() as Arc)); @@ -381,6 +380,7 @@ impl SortPreservingMergeStream { aborted: false, in_progress: vec![], next_batch_index: 0, + min_heap: BinaryHeap::with_capacity(stream_count), runtime, } } @@ -414,18 +414,24 @@ impl SortPreservingMergeStream { return Poll::Ready(Err(e)); } Some(Ok(batch)) => { - let cursor = match SortKeyCursor::new( - self.next_batch_index, // assign this batch an ID - Arc::new(batch), - &self.column_expressions, - self.sort_options.clone(), - ) { - Ok(cursor) => cursor, - Err(e) => { - return Poll::Ready(Err(ArrowError::ExternalError(Box::new(e)))); - } - }; + let cursor = Arc::new( + match SortKeyCursor::new( + idx, + self.next_batch_index, // assign this batch an ID + Arc::new(batch), + &self.column_expressions, + self.sort_options.clone(), + ) { + Ok(cursor) => cursor, + Err(e) => { + return Poll::Ready(Err(ArrowError::ExternalError( + Box::new(e), + ))); + } + }, + ); self.next_batch_index += 1; + self.min_heap.push(cursor.clone()); self.cursors[idx].push_back(cursor) } } @@ -433,30 +439,6 @@ impl SortPreservingMergeStream { Poll::Ready(Ok(())) } - /// Returns the index of the next stream to pull a row from, or None - /// if all cursors for all streams are exhausted - fn next_stream_idx(&mut self) -> Result> { - let mut min_cursor: Option<(usize, &mut SortKeyCursor)> = None; - for (idx, candidate) in self.cursors.iter_mut().enumerate() { - if let Some(candidate) = candidate.back_mut() { - if candidate.is_finished() { - continue; - } - - match min_cursor { - None => min_cursor = Some((idx, candidate)), - Some((_, ref mut min)) => { - if min.compare(candidate)? == Ordering::Greater { - min_cursor = Some((idx, candidate)) - } - } - } - } - } - - Ok(min_cursor.map(|(idx, _)| idx)) - } - /// Drains the in_progress row indexes, and builds a new RecordBatch from them /// /// Will then drop any cursors for which all rows have been yielded to the output @@ -588,44 +570,44 @@ impl SortPreservingMergeStream { let elapsed_compute = self.baseline_metrics.elapsed_compute().clone(); let _timer = elapsed_compute.timer(); - let stream_idx = match self.next_stream_idx() { - Ok(Some(idx)) => idx, - Ok(None) if self.in_progress.is_empty() => return Poll::Ready(None), - Ok(None) => return Poll::Ready(Some(self.build_record_batch())), - Err(e) => { - self.aborted = true; - return Poll::Ready(Some(Err(ArrowError::ExternalError(Box::new( - e, - ))))); - } - }; - - let cursors = &mut self.cursors[stream_idx]; - let cursor_idx = cursors.len() - 1; - let cursor = cursors.back_mut().unwrap(); - let row_idx = cursor.advance(); - let cursor_finished = cursor.is_finished(); - - self.in_progress.push(RowIndex { - stream_idx, - cursor_idx, - row_idx, - }); + match self.min_heap.pop() { + Some(cursor) => { + let stream_idx = cursor.stream_idx; + let cursor_idx = self.cursors[stream_idx].len() - 1; + let row_idx = cursor.advance(); + + let mut cursor_finished = false; + // insert the cursor back to min_heap if the record batch is not exhausted + if !cursor.is_finished() { + self.min_heap.push(cursor); + } else { + cursor_finished = true; + } - if self.in_progress.len() == self.runtime.batch_size() { - return Poll::Ready(Some(self.build_record_batch())); - } + self.in_progress.push(RowIndex { + stream_idx, + cursor_idx, + row_idx, + }); - // If removed the last row from the cursor, need to fetch a new record - // batch if possible, before looping round again - if cursor_finished { - match futures::ready!(self.maybe_poll_stream(cx, stream_idx)) { - Ok(_) => {} - Err(e) => { - self.aborted = true; - return Poll::Ready(Some(Err(e))); + if self.in_progress.len() == self.runtime.batch_size() { + return Poll::Ready(Some(self.build_record_batch())); + } + + // If removed the last row from the cursor, need to fetch a new record + // batch if possible, before looping round again + if cursor_finished { + match futures::ready!(self.maybe_poll_stream(cx, stream_idx)) { + Ok(_) => {} + Err(e) => { + self.aborted = true; + return Poll::Ready(Some(Err(e))); + } + } } } + None if self.in_progress.is_empty() => return Poll::Ready(None), + None => return Poll::Ready(Some(self.build_record_batch())), } } } @@ -1089,8 +1071,6 @@ mod tests { #[tokio::test] async fn test_partition_sort_streaming_input_output() { - let runtime = - Arc::new(RuntimeEnv::new(RuntimeConfig::new().with_batch_size(23)).unwrap()); let schema = test_util::aggr_test_schema(); let sort = vec![ @@ -1106,12 +1086,15 @@ mod tests { }, ]; + let runtime = Arc::new(RuntimeEnv::default()); let input = sorted_partitioned_input(sort.clone(), &[10, 5, 13], runtime.clone()).await; - let basic = basic_sort(input.clone(), sort.clone(), runtime.clone()).await; + let basic = basic_sort(input.clone(), sort.clone(), runtime).await; + let runtime_bs_23 = + Arc::new(RuntimeEnv::new(RuntimeConfig::new().with_batch_size(23)).unwrap()); let merge = Arc::new(SortPreservingMergeExec::new(sort, input)); - let merged = collect(merge, runtime.clone()).await.unwrap(); + let merged = collect(merge, runtime_bs_23).await.unwrap(); assert_eq!(merged.len(), 14); @@ -1242,7 +1225,7 @@ mod tests { let metrics = ExecutionPlanMetricsSet::new(); let baseline_metrics = BaselineMetrics::new(&metrics, 0); - let merge_stream = SortPreservingMergeStream::new_from_receiver( + let merge_stream = SortPreservingMergeStream::new_from_receivers( receivers, // Use empty vector since we want to use the join handles ourselves AbortOnDropMany(vec![]), @@ -1251,8 +1234,7 @@ mod tests { baseline_metrics, 0, runtime.clone(), - ) - .await; + ); let mut merged = common::collect(Box::pin(merge_stream)).await.unwrap(); diff --git a/datafusion/tests/provider_filter_pushdown.rs b/datafusion/tests/provider_filter_pushdown.rs index 5e1452481952..5a4f90702ecb 100644 --- a/datafusion/tests/provider_filter_pushdown.rs +++ b/datafusion/tests/provider_filter_pushdown.rs @@ -25,6 +25,7 @@ use datafusion::execution::context::ExecutionContext; use datafusion::execution::runtime_env::RuntimeEnv; use datafusion::logical_plan::Expr; use datafusion::physical_plan::common::SizedRecordBatchStream; +use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet}; use datafusion::physical_plan::{ DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream, Statistics, }; @@ -81,12 +82,15 @@ impl ExecutionPlan for CustomPlan { async fn execute( &self, - _partition: usize, + partition: usize, _runtime: Arc, ) -> Result { + let metrics = ExecutionPlanMetricsSet::new(); + let baseline_metrics = BaselineMetrics::new(&metrics, partition); Ok(Box::pin(SizedRecordBatchStream::new( self.schema(), self.batches.clone(), + baseline_metrics, ))) } diff --git a/datafusion/tests/sql/joins.rs b/datafusion/tests/sql/joins.rs index 1aa4eb6dfed2..85b59e6898c1 100644 --- a/datafusion/tests/sql/joins.rs +++ b/datafusion/tests/sql/joins.rs @@ -419,32 +419,32 @@ async fn cross_join_unbalanced() { // the order of the values is not determinisitic, so we need to sort to check the values let sql = - "SELECT t1_id, t1_name, t2_name FROM t1 CROSS JOIN t2 ORDER BY t1_id, t1_name"; + "SELECT t1_id, t1_name, t2_name FROM t1 CROSS JOIN t2 ORDER BY t1_id, t1_name, t2_name"; let actual = execute_to_batches(&mut ctx, sql).await; let expected = vec![ "+-------+---------+---------+", "| t1_id | t1_name | t2_name |", "+-------+---------+---------+", - "| 11 | a | z |", - "| 11 | a | y |", - "| 11 | a | x |", "| 11 | a | w |", - "| 22 | b | z |", - "| 22 | b | y |", - "| 22 | b | x |", + "| 11 | a | x |", + "| 11 | a | y |", + "| 11 | a | z |", "| 22 | b | w |", - "| 33 | c | z |", - "| 33 | c | y |", - "| 33 | c | x |", + "| 22 | b | x |", + "| 22 | b | y |", + "| 22 | b | z |", "| 33 | c | w |", - "| 44 | d | z |", - "| 44 | d | y |", - "| 44 | d | x |", + "| 33 | c | x |", + "| 33 | c | y |", + "| 33 | c | z |", "| 44 | d | w |", - "| 77 | e | z |", - "| 77 | e | y |", - "| 77 | e | x |", + "| 44 | d | x |", + "| 44 | d | y |", + "| 44 | d | z |", "| 77 | e | w |", + "| 77 | e | x |", + "| 77 | e | y |", + "| 77 | e | z |", "+-------+---------+---------+", ]; assert_batches_eq!(expected, &actual);