From 4e9424b4436b3ce74130b4bcc1ffb43c84e49503 Mon Sep 17 00:00:00 2001 From: berkaysynnada Date: Mon, 26 Aug 2024 15:38:27 +0300 Subject: [PATCH 01/11] Sort fetch updates execution mode --- datafusion/physical-plan/src/sorts/sort.rs | 37 ++++++++++++----- .../src/windows/bounded_window_agg_exec.rs | 41 +++++++++---------- 2 files changed, 46 insertions(+), 32 deletions(-) diff --git a/datafusion/physical-plan/src/sorts/sort.rs b/datafusion/physical-plan/src/sorts/sort.rs index a81b09948cca..30b211e448b5 100644 --- a/datafusion/physical-plan/src/sorts/sort.rs +++ b/datafusion/physical-plan/src/sorts/sort.rs @@ -51,6 +51,7 @@ use datafusion_execution::memory_pool::{MemoryConsumer, MemoryReservation}; use datafusion_execution::runtime_env::RuntimeEnv; use datafusion_execution::TaskContext; use datafusion_physical_expr::LexOrdering; +use datafusion_physical_expr_common::sort_expr::PhysicalSortRequirement; use futures::{StreamExt, TryStreamExt}; use log::{debug, trace}; @@ -737,9 +738,30 @@ impl SortExec { /// This can reduce the memory pressure required by the sort /// operation since rows that are not going to be included /// can be dropped. - pub fn with_fetch(mut self, fetch: Option) -> Self { - self.fetch = fetch; - self + pub fn with_fetch(&self, fetch: Option) -> Self { + let mut cache = self.cache.clone(); + if fetch.is_some() { + // When a theoretically unnecessary sort becomes a top-K (which + // sometimes arises as an intermediate state before full removal), + // its execution mode should become `Bounded`. + let sort_requirement = PhysicalSortRequirement::from_sort_exprs(self.expr()); + if self + .input() + .equivalence_properties() + .ordering_satisfy_requirement(&sort_requirement) + { + cache.execution_mode = ExecutionMode::Bounded; + } + } + + SortExec { + input: Arc::clone(&self.input), + expr: self.expr.clone(), + metrics_set: self.metrics_set.clone(), + preserve_partitioning: self.preserve_partitioning, + fetch, + cache, + } } /// Input schema @@ -933,14 +955,7 @@ impl ExecutionPlan for SortExec { } fn with_fetch(&self, limit: Option) -> Option> { - Some(Arc::new(SortExec { - input: Arc::clone(&self.input), - expr: self.expr.clone(), - metrics_set: self.metrics_set.clone(), - preserve_partitioning: self.preserve_partitioning, - fetch: limit, - cache: self.cache.clone(), - })) + Some(Arc::new(SortExec::with_fetch(self, limit))) } fn fetch(&self) -> Option { diff --git a/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs b/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs index 084436ee376d..9f10a20d5fc4 100644 --- a/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs +++ b/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs @@ -551,7 +551,7 @@ impl PartitionSearcher for LinearSearch { window_expr: &[Arc], ) -> Result> { let partition_bys = - self.evaluate_partition_by_column_values(record_batch, window_expr)?; + evaluate_partition_by_column_values(record_batch, window_expr)?; // NOTE: In Linear or PartiallySorted modes, we are sure that // `partition_bys` are not empty. // Calculate indices for each partition and construct a new record @@ -618,25 +618,6 @@ impl LinearSearch { } } - /// Calculates partition by expression results for each window expression - /// on `record_batch`. - fn evaluate_partition_by_column_values( - &self, - record_batch: &RecordBatch, - window_expr: &[Arc], - ) -> Result> { - window_expr[0] - .partition_by() - .iter() - .map(|item| match item.evaluate(record_batch)? 
{ - ColumnarValue::Array(array) => Ok(array), - ColumnarValue::Scalar(scalar) => { - scalar.to_array_of_size(record_batch.num_rows()) - } - }) - .collect() - } - /// Calculate indices of each partition (according to PARTITION BY expression) /// `columns` contain partition by expression results. fn get_per_partition_indices( @@ -683,7 +664,7 @@ impl LinearSearch { window_expr: &[Arc], ) -> Result)>> { let partition_by_columns = - self.evaluate_partition_by_column_values(input_buffer, window_expr)?; + evaluate_partition_by_column_values(input_buffer, window_expr)?; // Reset the row_map state: self.row_map_out.clear(); let mut partition_indices: Vec<(PartitionKey, Vec)> = vec![]; @@ -852,6 +833,24 @@ impl SortedSearch { } } +/// Calculates partition by expression results for each window expression +/// on `record_batch`. +fn evaluate_partition_by_column_values( + record_batch: &RecordBatch, + window_expr: &[Arc], +) -> Result> { + window_expr[0] + .partition_by() + .iter() + .map(|item| match item.evaluate(record_batch)? { + ColumnarValue::Array(array) => Ok(array), + ColumnarValue::Scalar(scalar) => { + scalar.to_array_of_size(record_batch.num_rows()) + } + }) + .collect() +} + /// Stream for the bounded window aggregation plan. pub struct BoundedWindowAggStream { schema: SchemaRef, From 61ca06c67639d887e5dafe3e9ed23f212dede4d3 Mon Sep 17 00:00:00 2001 From: berkaysynnada Date: Tue, 27 Aug 2024 15:02:36 +0300 Subject: [PATCH 02/11] Update sort.rs --- datafusion/physical-plan/src/sorts/sort.rs | 311 +++++++++++++++++---- 1 file changed, 261 insertions(+), 50 deletions(-) diff --git a/datafusion/physical-plan/src/sorts/sort.rs b/datafusion/physical-plan/src/sorts/sort.rs index 30b211e448b5..d08435af650f 100644 --- a/datafusion/physical-plan/src/sorts/sort.rs +++ b/datafusion/physical-plan/src/sorts/sort.rs @@ -22,7 +22,9 @@ use std::any::Any; use std::fmt; use std::fmt::{Debug, Formatter}; +use std::pin::Pin; use std::sync::Arc; +use std::task::{Context, Poll}; use crate::common::spawn_buffered; use crate::expressions::PhysicalSortExpr; @@ -49,11 +51,11 @@ use datafusion_common::{internal_err, Result}; use datafusion_execution::disk_manager::RefCountedTempFile; use datafusion_execution::memory_pool::{MemoryConsumer, MemoryReservation}; use datafusion_execution::runtime_env::RuntimeEnv; -use datafusion_execution::TaskContext; +use datafusion_execution::{RecordBatchStream, TaskContext}; use datafusion_physical_expr::LexOrdering; use datafusion_physical_expr_common::sort_expr::PhysicalSortRequirement; -use futures::{StreamExt, TryStreamExt}; +use futures::{ready, Stream, StreamExt, TryStreamExt}; use log::{debug, trace}; struct ExternalSorterMetrics { @@ -896,53 +898,69 @@ impl ExecutionPlan for SortExec { trace!("End SortExec's input.execute for partition: {}", partition); - if let Some(fetch) = self.fetch.as_ref() { - let mut topk = TopK::try_new( - partition, - input.schema(), - self.expr.clone(), - *fetch, - context.session_config().batch_size(), - context.runtime_env(), - &self.metrics_set, - partition, - )?; - - Ok(Box::pin(RecordBatchStreamAdapter::new( - self.schema(), - futures::stream::once(async move { - while let Some(batch) = input.next().await { - let batch = batch?; - topk.insert_batch(batch)?; - } - topk.emit() - }) - .try_flatten(), - ))) - } else { - let mut sorter = ExternalSorter::new( - partition, - input.schema(), - self.expr.clone(), - context.session_config().batch_size(), - self.fetch, - execution_options.sort_spill_reservation_bytes, - 
execution_options.sort_in_place_threshold_bytes, - &self.metrics_set, - context.runtime_env(), + let sort_satisfied = self + .input + .equivalence_properties() + .ordering_satisfy_requirement( + PhysicalSortRequirement::from_sort_exprs(self.expr.iter()).as_slice(), ); - Ok(Box::pin(RecordBatchStreamAdapter::new( - self.schema(), - futures::stream::once(async move { - while let Some(batch) = input.next().await { - let batch = batch?; - sorter.insert_batch(batch).await?; - } - sorter.sort() - }) - .try_flatten(), - ))) + match (sort_satisfied, self.fetch.as_ref()) { + (true, Some(fetch)) => Ok(Box::pin(TopKStream { + input, + schema: self.schema(), + fetch: *fetch, + })), + (true, None) => Ok(input), + (false, Some(fetch)) => { + let mut topk = TopK::try_new( + partition, + input.schema(), + self.expr.clone(), + *fetch, + context.session_config().batch_size(), + context.runtime_env(), + &self.metrics_set, + partition, + )?; + + Ok(Box::pin(RecordBatchStreamAdapter::new( + self.schema(), + futures::stream::once(async move { + while let Some(batch) = input.next().await { + let batch = batch?; + topk.insert_batch(batch)?; + } + topk.emit() + }) + .try_flatten(), + ))) + } + (false, None) => { + let mut sorter = ExternalSorter::new( + partition, + input.schema(), + self.expr.clone(), + context.session_config().batch_size(), + self.fetch, + execution_options.sort_spill_reservation_bytes, + execution_options.sort_in_place_threshold_bytes, + &self.metrics_set, + context.runtime_env(), + ); + + Ok(Box::pin(RecordBatchStreamAdapter::new( + self.schema(), + futures::stream::once(async move { + while let Some(batch) = input.next().await { + let batch = batch?; + sorter.insert_batch(batch).await?; + } + sorter.sort() + }) + .try_flatten(), + ))) + } } } @@ -963,9 +981,51 @@ impl ExecutionPlan for SortExec { } } +struct TopKStream { + input: SendableRecordBatchStream, + schema: SchemaRef, + fetch: usize, +} + +impl Stream for TopKStream { + type Item = Result; + + fn poll_next( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + match ready!(self.input.poll_next_unpin(cx)) { + Some(Ok(batch)) => { + if self.fetch > 0 { + if self.fetch >= batch.num_rows() { + self.fetch -= batch.num_rows(); + Poll::Ready(Some(Ok(batch))) + } else { + let batch = batch.slice(0, self.fetch); + self.fetch = 0; + Poll::Ready(Some(Ok(batch))) + } + } else { + debug_assert_eq!(self.fetch, 0); + Poll::Ready(None) + } + } + other => Poll::Ready(other), + } + } +} + +impl RecordBatchStream for TopKStream { + fn schema(&self) -> SchemaRef { + Arc::clone(&self.schema) + } +} + #[cfg(test)] mod tests { use std::collections::HashMap; + use std::pin::Pin; + use std::task::{Context, Poll}; use super::*; use crate::coalesce_partitions::CoalescePartitionsExec; @@ -980,12 +1040,125 @@ mod tests { use arrow::compute::SortOptions; use arrow::datatypes::*; use datafusion_common::cast::as_primitive_array; + use datafusion_common::Result; + use datafusion_common::{assert_batches_eq, ScalarValue}; use datafusion_execution::config::SessionConfig; use datafusion_execution::runtime_env::RuntimeConfig; + use datafusion_execution::RecordBatchStream; + use datafusion_physical_expr::expressions::{Column, Literal}; + use datafusion_physical_expr::EquivalenceProperties; - use datafusion_common::ScalarValue; - use datafusion_physical_expr::expressions::Literal; - use futures::FutureExt; + use futures::{FutureExt, Stream}; + + #[derive(Debug, Clone)] + pub struct SortedUnboundedExec { + schema: Schema, + batch_size: u64, + cache: 
PlanProperties, + } + + impl DisplayAs for SortedUnboundedExec { + fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result { + match t { + DisplayFormatType::Default | DisplayFormatType::Verbose => { + write!(f, "UnboundableExec",).unwrap() + } + } + Ok(()) + } + } + + impl SortedUnboundedExec { + fn compute_properties(schema: SchemaRef) -> PlanProperties { + let mut eq_properties = EquivalenceProperties::new(schema); + eq_properties.add_new_orderings(vec![vec![PhysicalSortExpr::new( + Arc::new(Column::new("c1", 0)), + SortOptions::default(), + )]]); + let mode = ExecutionMode::Unbounded; + PlanProperties::new(eq_properties, Partitioning::UnknownPartitioning(1), mode) + } + } + + impl ExecutionPlan for SortedUnboundedExec { + fn name(&self) -> &'static str { + Self::static_name() + } + + fn as_any(&self) -> &dyn Any { + self + } + + fn properties(&self) -> &PlanProperties { + &self.cache + } + + fn children(&self) -> Vec<&Arc> { + vec![] + } + + fn with_new_children( + self: Arc, + _: Vec>, + ) -> Result> { + Ok(self) + } + + fn execute( + &self, + _partition: usize, + _context: Arc, + ) -> Result { + Ok(Box::pin(SortedUnboundedStream { + schema: Arc::new(self.schema.clone()), + batch_size: self.batch_size, + offset: 0, + })) + } + } + + #[derive(Debug)] + pub struct SortedUnboundedStream { + schema: SchemaRef, + batch_size: u64, + offset: u64, + } + + impl Stream for SortedUnboundedStream { + type Item = Result; + + fn poll_next( + mut self: Pin<&mut Self>, + _cx: &mut Context<'_>, + ) -> Poll> { + let batch = SortedUnboundedStream::create_record_batch( + Arc::clone(&self.schema), + self.offset, + self.batch_size, + ); + self.offset += self.batch_size; + Poll::Ready(Some(Ok(batch))) + } + } + + impl RecordBatchStream for SortedUnboundedStream { + fn schema(&self) -> SchemaRef { + Arc::clone(&self.schema) + } + } + + impl SortedUnboundedStream { + fn create_record_batch( + schema: SchemaRef, + offset: u64, + batch_size: u64, + ) -> RecordBatch { + let values = (0..batch_size).map(|i| offset + i).collect::>(); + let array = UInt64Array::from(values); + let array_ref: ArrayRef = Arc::new(array); + RecordBatch::try_new(schema, vec![array_ref]).unwrap() + } + } #[tokio::test] async fn test_in_mem_sort() -> Result<()> { @@ -1424,4 +1597,42 @@ mod tests { let result = sort_batch(&batch, &expressions, None).unwrap(); assert_eq!(result.num_rows(), 1); } + + #[tokio::test] + async fn topk_unbounded_source() -> Result<()> { + let task_ctx = Arc::new(TaskContext::default()); + let schema = Schema::new(vec![Field::new("c1", DataType::UInt64, false)]); + let source = SortedUnboundedExec { + schema: schema.clone(), + batch_size: 2, + cache: SortedUnboundedExec::compute_properties(Arc::new(schema.clone())), + }; + let mut plan = SortExec::new( + vec![PhysicalSortExpr::new( + Arc::new(Column::new("c1", 0)), + SortOptions::default(), + )], + Arc::new(source), + ); + plan = plan.with_fetch(Some(9)); + + let batches = collect(Arc::new(plan), task_ctx).await?; + #[rustfmt::skip] + let expected = [ + "+----+", + "| c1 |", + "+----+", + "| 0 |", + "| 1 |", + "| 2 |", + "| 3 |", + "| 4 |", + "| 5 |", + "| 6 |", + "| 7 |", + "| 8 |", + "+----+",]; + assert_batches_eq!(expected, &batches); + Ok(()) + } } From 5f80cdcaf3d9e9da5aecaa9e5e42435684d006a6 Mon Sep 17 00:00:00 2001 From: berkaysynnada Date: Tue, 27 Aug 2024 15:19:15 +0300 Subject: [PATCH 03/11] Update sort.rs --- datafusion/physical-plan/src/sorts/sort.rs | 24 ++++++++++++++-------- 1 file changed, 16 insertions(+), 8 deletions(-) 
diff --git a/datafusion/physical-plan/src/sorts/sort.rs b/datafusion/physical-plan/src/sorts/sort.rs index d08435af650f..ab244711a5d4 100644 --- a/datafusion/physical-plan/src/sorts/sort.rs +++ b/datafusion/physical-plan/src/sorts/sort.rs @@ -799,6 +799,22 @@ impl SortExec { sort_exprs: LexOrdering, preserve_partitioning: bool, ) -> PlanProperties { + // Determine execution mode: + let mode = match input.execution_mode() { + ExecutionMode::Unbounded + if input.equivalence_properties().ordering_satisfy_requirement( + PhysicalSortRequirement::from_sort_exprs(sort_exprs.iter()) + .as_slice(), + ) => + { + ExecutionMode::Unbounded + } + ExecutionMode::Unbounded | ExecutionMode::PipelineBreaking => { + ExecutionMode::PipelineBreaking + } + ExecutionMode::Bounded => ExecutionMode::Bounded, + }; + // Calculate equivalence properties; i.e. reset the ordering equivalence // class with the new ordering: let eq_properties = input @@ -810,14 +826,6 @@ impl SortExec { let output_partitioning = Self::output_partitioning_helper(input, preserve_partitioning); - // Determine execution mode: - let mode = match input.execution_mode() { - ExecutionMode::Unbounded | ExecutionMode::PipelineBreaking => { - ExecutionMode::PipelineBreaking - } - ExecutionMode::Bounded => ExecutionMode::Bounded, - }; - PlanProperties::new(eq_properties, output_partitioning, mode) } } From 65679db941132f4ba210d5c7311142bf899f621c Mon Sep 17 00:00:00 2001 From: berkaysynnada Date: Tue, 27 Aug 2024 15:26:48 +0300 Subject: [PATCH 04/11] Update sort.rs --- datafusion/physical-plan/src/sorts/sort.rs | 35 +++++++--------------- 1 file changed, 11 insertions(+), 24 deletions(-) diff --git a/datafusion/physical-plan/src/sorts/sort.rs b/datafusion/physical-plan/src/sorts/sort.rs index ab244711a5d4..1bd2972a0ea8 100644 --- a/datafusion/physical-plan/src/sorts/sort.rs +++ b/datafusion/physical-plan/src/sorts/sort.rs @@ -746,16 +746,10 @@ impl SortExec { // When a theoretically unnecessary sort becomes a top-K (which // sometimes arises as an intermediate state before full removal), // its execution mode should become `Bounded`. - let sort_requirement = PhysicalSortRequirement::from_sort_exprs(self.expr()); - if self - .input() - .equivalence_properties() - .ordering_satisfy_requirement(&sort_requirement) - { + if self.cache.execution_mode == ExecutionMode::Unbounded { cache.execution_mode = ExecutionMode::Bounded; } } - SortExec { input: Arc::clone(&self.input), expr: self.expr.clone(), @@ -800,19 +794,16 @@ impl SortExec { preserve_partitioning: bool, ) -> PlanProperties { // Determine execution mode: + let input_satisfies_sort_req = + input.equivalence_properties().ordering_satisfy_requirement( + PhysicalSortRequirement::from_sort_exprs(sort_exprs.iter()).as_slice(), + ); let mode = match input.execution_mode() { - ExecutionMode::Unbounded - if input.equivalence_properties().ordering_satisfy_requirement( - PhysicalSortRequirement::from_sort_exprs(sort_exprs.iter()) - .as_slice(), - ) => - { + ExecutionMode::Unbounded if input_satisfies_sort_req => { ExecutionMode::Unbounded } - ExecutionMode::Unbounded | ExecutionMode::PipelineBreaking => { - ExecutionMode::PipelineBreaking - } ExecutionMode::Bounded => ExecutionMode::Bounded, + _ => ExecutionMode::PipelineBreaking, }; // Calculate equivalence properties; i.e. 
reset the ordering equivalence @@ -906,12 +897,10 @@ impl ExecutionPlan for SortExec { trace!("End SortExec's input.execute for partition: {}", partition); - let sort_satisfied = self - .input - .equivalence_properties() - .ordering_satisfy_requirement( - PhysicalSortRequirement::from_sort_exprs(self.expr.iter()).as_slice(), - ); + let sort_satisfied = matches!( + self.cache.execution_mode, + ExecutionMode::Bounded | ExecutionMode::Unbounded + ); match (sort_satisfied, self.fetch.as_ref()) { (true, Some(fetch)) => Ok(Box::pin(TopKStream { @@ -931,7 +920,6 @@ impl ExecutionPlan for SortExec { &self.metrics_set, partition, )?; - Ok(Box::pin(RecordBatchStreamAdapter::new( self.schema(), futures::stream::once(async move { @@ -956,7 +944,6 @@ impl ExecutionPlan for SortExec { &self.metrics_set, context.runtime_env(), ); - Ok(Box::pin(RecordBatchStreamAdapter::new( self.schema(), futures::stream::once(async move { From 35bfb109b4adc0be059ae04e1c3f173be7b119f1 Mon Sep 17 00:00:00 2001 From: berkaysynnada Date: Tue, 27 Aug 2024 15:58:39 +0300 Subject: [PATCH 05/11] Update sort.rs --- datafusion/physical-plan/src/sorts/sort.rs | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/datafusion/physical-plan/src/sorts/sort.rs b/datafusion/physical-plan/src/sorts/sort.rs index 1bd2972a0ea8..3bbc0221afb9 100644 --- a/datafusion/physical-plan/src/sorts/sort.rs +++ b/datafusion/physical-plan/src/sorts/sort.rs @@ -897,10 +897,12 @@ impl ExecutionPlan for SortExec { trace!("End SortExec's input.execute for partition: {}", partition); - let sort_satisfied = matches!( - self.cache.execution_mode, - ExecutionMode::Bounded | ExecutionMode::Unbounded - ); + let sort_satisfied = self + .input + .equivalence_properties() + .ordering_satisfy_requirement( + PhysicalSortRequirement::from_sort_exprs(self.expr.iter()).as_slice(), + ); match (sort_satisfied, self.fetch.as_ref()) { (true, Some(fetch)) => Ok(Box::pin(TopKStream { From 77152555a573aef97d7a9c88a2b941df9386187e Mon Sep 17 00:00:00 2001 From: berkaysynnada Date: Wed, 28 Aug 2024 10:07:52 +0300 Subject: [PATCH 06/11] Update sort.rs --- datafusion/physical-plan/src/sorts/sort.rs | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/datafusion/physical-plan/src/sorts/sort.rs b/datafusion/physical-plan/src/sorts/sort.rs index 3bbc0221afb9..6afaec62e6b9 100644 --- a/datafusion/physical-plan/src/sorts/sort.rs +++ b/datafusion/physical-plan/src/sorts/sort.rs @@ -742,13 +742,11 @@ impl SortExec { /// can be dropped. pub fn with_fetch(&self, fetch: Option) -> Self { let mut cache = self.cache.clone(); - if fetch.is_some() { + if fetch.is_some() && self.cache.execution_mode == ExecutionMode::Unbounded { // When a theoretically unnecessary sort becomes a top-K (which // sometimes arises as an intermediate state before full removal), // its execution mode should become `Bounded`. 
- if self.cache.execution_mode == ExecutionMode::Unbounded { - cache.execution_mode = ExecutionMode::Bounded; - } + cache.execution_mode = ExecutionMode::Bounded; } SortExec { input: Arc::clone(&self.input), From 7d713ffb485652b7206b90e08934e9f134f2afac Mon Sep 17 00:00:00 2001 From: Mehmet Ozan Kabak Date: Wed, 28 Aug 2024 11:27:21 +0300 Subject: [PATCH 07/11] Apply suggestions from code review --- datafusion/physical-plan/src/sorts/sort.rs | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/datafusion/physical-plan/src/sorts/sort.rs b/datafusion/physical-plan/src/sorts/sort.rs index 6afaec62e6b9..7bbb78297890 100644 --- a/datafusion/physical-plan/src/sorts/sort.rs +++ b/datafusion/physical-plan/src/sorts/sort.rs @@ -792,12 +792,12 @@ impl SortExec { preserve_partitioning: bool, ) -> PlanProperties { // Determine execution mode: - let input_satisfies_sort_req = + let sort_satisfied = input.equivalence_properties().ordering_satisfy_requirement( PhysicalSortRequirement::from_sort_exprs(sort_exprs.iter()).as_slice(), ); let mode = match input.execution_mode() { - ExecutionMode::Unbounded if input_satisfies_sort_req => { + ExecutionMode::Unbounded if sort_satisfied => { ExecutionMode::Unbounded } ExecutionMode::Bounded => ExecutionMode::Bounded, @@ -1035,8 +1035,7 @@ mod tests { use arrow::compute::SortOptions; use arrow::datatypes::*; use datafusion_common::cast::as_primitive_array; - use datafusion_common::Result; - use datafusion_common::{assert_batches_eq, ScalarValue}; + use datafusion_common::{assert_batches_eq, Result, ScalarValue}; use datafusion_execution::config::SessionConfig; use datafusion_execution::runtime_env::RuntimeConfig; use datafusion_execution::RecordBatchStream; From 8f43cf664747d248b6ea0678c7aa4dd2ca0a1d97 Mon Sep 17 00:00:00 2001 From: berkaysynnada Date: Wed, 28 Aug 2024 12:03:23 +0300 Subject: [PATCH 08/11] Update sort.rs --- datafusion/physical-plan/src/sorts/sort.rs | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) diff --git a/datafusion/physical-plan/src/sorts/sort.rs b/datafusion/physical-plan/src/sorts/sort.rs index 7bbb78297890..6a69714a3768 100644 --- a/datafusion/physical-plan/src/sorts/sort.rs +++ b/datafusion/physical-plan/src/sorts/sort.rs @@ -792,14 +792,11 @@ impl SortExec { preserve_partitioning: bool, ) -> PlanProperties { // Determine execution mode: - let sort_satisfied = - input.equivalence_properties().ordering_satisfy_requirement( - PhysicalSortRequirement::from_sort_exprs(sort_exprs.iter()).as_slice(), - ); + let sort_satisfied = input.equivalence_properties().ordering_satisfy_requirement( + PhysicalSortRequirement::from_sort_exprs(sort_exprs.iter()).as_slice(), + ); let mode = match input.execution_mode() { - ExecutionMode::Unbounded if sort_satisfied => { - ExecutionMode::Unbounded - } + ExecutionMode::Unbounded if sort_satisfied => ExecutionMode::Unbounded, ExecutionMode::Bounded => ExecutionMode::Bounded, _ => ExecutionMode::PipelineBreaking, }; From 23643cc086e0401c52ab456ede3edc43b5da0cb3 Mon Sep 17 00:00:00 2001 From: Mehmet Ozan Kabak Date: Wed, 28 Aug 2024 18:16:35 +0300 Subject: [PATCH 09/11] Update datafusion/physical-plan/src/sorts/sort.rs --- datafusion/physical-plan/src/sorts/sort.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/physical-plan/src/sorts/sort.rs b/datafusion/physical-plan/src/sorts/sort.rs index 7b2298ff0ada..807560f36909 100644 --- a/datafusion/physical-plan/src/sorts/sort.rs +++ b/datafusion/physical-plan/src/sorts/sort.rs @@ -1034,7 +1034,7 @@ mod 
tests { use datafusion_common::cast::as_primitive_array; use datafusion_common::{assert_batches_eq, Result, ScalarValue}; use datafusion_execution::config::SessionConfig; - use datafusion_execution::runtime_env::{RuntimeConfig, RuntimeEnvBuilder}; + use datafusion_execution::runtime_env::{RuntimeEnvBuilder}; use datafusion_execution::RecordBatchStream; use datafusion_physical_expr::expressions::{Column, Literal}; use datafusion_physical_expr::EquivalenceProperties; From 2d8954e3599e0dbcdba0f7b7c4644e3758a1b38b Mon Sep 17 00:00:00 2001 From: Mehmet Ozan Kabak Date: Wed, 28 Aug 2024 18:17:00 +0300 Subject: [PATCH 10/11] Update datafusion/physical-plan/src/sorts/sort.rs --- datafusion/physical-plan/src/sorts/sort.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/physical-plan/src/sorts/sort.rs b/datafusion/physical-plan/src/sorts/sort.rs index 807560f36909..e6f245b1b47d 100644 --- a/datafusion/physical-plan/src/sorts/sort.rs +++ b/datafusion/physical-plan/src/sorts/sort.rs @@ -1034,7 +1034,7 @@ mod tests { use datafusion_common::cast::as_primitive_array; use datafusion_common::{assert_batches_eq, Result, ScalarValue}; use datafusion_execution::config::SessionConfig; - use datafusion_execution::runtime_env::{RuntimeEnvBuilder}; + use datafusion_execution::runtime_env::RuntimeEnvBuilder; use datafusion_execution::RecordBatchStream; use datafusion_physical_expr::expressions::{Column, Literal}; use datafusion_physical_expr::EquivalenceProperties; From d2208b67cd27bb09b0a9acca530d8fa52c39ce47 Mon Sep 17 00:00:00 2001 From: Mehmet Ozan Kabak Date: Wed, 28 Aug 2024 20:58:19 +0300 Subject: [PATCH 11/11] Reuse LimitStream --- .../physical-plan/src/metrics/baseline.rs | 2 +- datafusion/physical-plan/src/sorts/sort.rs | 56 +++---------------- 2 files changed, 9 insertions(+), 49 deletions(-) diff --git a/datafusion/physical-plan/src/metrics/baseline.rs b/datafusion/physical-plan/src/metrics/baseline.rs index dc345cd8cdcd..b26a08dd0fad 100644 --- a/datafusion/physical-plan/src/metrics/baseline.rs +++ b/datafusion/physical-plan/src/metrics/baseline.rs @@ -56,7 +56,7 @@ pub struct BaselineMetrics { } impl BaselineMetrics { - /// Create a new BaselineMetric structure, and set `start_time` to now + /// Create a new BaselineMetric structure, and set `start_time` to now pub fn new(metrics: &ExecutionPlanMetricsSet, partition: usize) -> Self { let start_time = MetricBuilder::new(metrics).start_timestamp(partition); start_time.record(); diff --git a/datafusion/physical-plan/src/sorts/sort.rs b/datafusion/physical-plan/src/sorts/sort.rs index e6f245b1b47d..e0041194016c 100644 --- a/datafusion/physical-plan/src/sorts/sort.rs +++ b/datafusion/physical-plan/src/sorts/sort.rs @@ -22,12 +22,11 @@ use std::any::Any; use std::fmt; use std::fmt::{Debug, Formatter}; -use std::pin::Pin; use std::sync::Arc; -use std::task::{Context, Poll}; use crate::common::spawn_buffered; use crate::expressions::PhysicalSortExpr; +use crate::limit::LimitStream; use crate::metrics::{ BaselineMetrics, Count, ExecutionPlanMetricsSet, MetricBuilder, MetricsSet, }; @@ -51,11 +50,11 @@ use datafusion_common::{internal_err, Result}; use datafusion_execution::disk_manager::RefCountedTempFile; use datafusion_execution::memory_pool::{MemoryConsumer, MemoryReservation}; use datafusion_execution::runtime_env::RuntimeEnv; -use datafusion_execution::{RecordBatchStream, TaskContext}; +use datafusion_execution::TaskContext; use datafusion_physical_expr::LexOrdering; use 
datafusion_physical_expr_common::sort_expr::PhysicalSortRequirement; -use futures::{ready, Stream, StreamExt, TryStreamExt}; +use futures::{StreamExt, TryStreamExt}; use log::{debug, trace}; struct ExternalSorterMetrics { @@ -900,11 +899,12 @@ impl ExecutionPlan for SortExec { ); match (sort_satisfied, self.fetch.as_ref()) { - (true, Some(fetch)) => Ok(Box::pin(TopKStream { + (true, Some(fetch)) => Ok(Box::pin(LimitStream::new( input, - schema: self.schema(), - fetch: *fetch, - })), + 0, + Some(*fetch), + BaselineMetrics::new(&self.metrics_set, partition), + ))), (true, None) => Ok(input), (false, Some(fetch)) => { let mut topk = TopK::try_new( @@ -973,46 +973,6 @@ impl ExecutionPlan for SortExec { } } -struct TopKStream { - input: SendableRecordBatchStream, - schema: SchemaRef, - fetch: usize, -} - -impl Stream for TopKStream { - type Item = Result; - - fn poll_next( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll> { - match ready!(self.input.poll_next_unpin(cx)) { - Some(Ok(batch)) => { - if self.fetch > 0 { - if self.fetch >= batch.num_rows() { - self.fetch -= batch.num_rows(); - Poll::Ready(Some(Ok(batch))) - } else { - let batch = batch.slice(0, self.fetch); - self.fetch = 0; - Poll::Ready(Some(Ok(batch))) - } - } else { - debug_assert_eq!(self.fetch, 0); - Poll::Ready(None) - } - } - other => Poll::Ready(other), - } - } -} - -impl RecordBatchStream for TopKStream { - fn schema(&self) -> SchemaRef { - Arc::clone(&self.schema) - } -} - #[cfg(test)] mod tests { use std::collections::HashMap;
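
[Editor's note] A minimal, self-contained sketch of the strategy selection that this series converges on in SortExec::execute. The enum and function names below are hypothetical illustrations, not DataFusion types; only the four-way (sort_satisfied, fetch) decision mirrors the match introduced in patch 02 and kept through patch 11.

// Toy sketch of the execute() decision matrix; names here are assumptions,
// not the DataFusion API.
#[derive(Clone, Copy, PartialEq, Debug)]
enum SortStrategy {
    PassThrough, // input already sorted, no fetch: forward the input stream
    Limit,       // input already sorted, fetch set: stop after `fetch` rows
    TopK,        // unsorted input, fetch set: keep only the top `fetch` rows
    FullSort,    // unsorted input, no fetch: external sort (may spill)
}

fn choose_strategy(sort_satisfied: bool, fetch: Option<usize>) -> SortStrategy {
    match (sort_satisfied, fetch) {
        (true, Some(_)) => SortStrategy::Limit,
        (true, None) => SortStrategy::PassThrough,
        (false, Some(_)) => SortStrategy::TopK,
        (false, None) => SortStrategy::FullSort,
    }
}

fn main() {
    assert_eq!(choose_strategy(true, Some(9)), SortStrategy::Limit);
    assert_eq!(choose_strategy(false, None), SortStrategy::FullSort);
    println!("strategy selection mirrors the match in SortExec::execute");
}

After patch 11, the (true, Some(fetch)) arm delegates to the existing LimitStream instead of the custom TopKStream that patch 02 introduced, which removes duplicated row-truncation logic and reuses LimitStream's baseline metrics.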
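
[Editor's note] The execution-mode handling in patches 01 and 03 through 07 reduces to two small rules. The sketch below uses a toy Mode enum (an assumption, not the actual ExecutionMode type) to make them explicit: a sort over an unbounded input stays Unbounded only when the input already satisfies the requested ordering, and attaching a fetch to such an unbounded sort bounds its output.

// Toy mirror of how the series derives SortExec's execution mode; the
// enum and function names are illustrative, not DataFusion's types.
#[derive(Clone, Copy, PartialEq, Debug)]
enum Mode {
    Bounded,
    Unbounded,
    PipelineBreaking,
}

// compute_properties: unbounded input is only pipeline-friendly when it
// already satisfies the sort requirement.
fn sort_mode(input_mode: Mode, sort_satisfied: bool) -> Mode {
    match input_mode {
        Mode::Unbounded if sort_satisfied => Mode::Unbounded,
        Mode::Bounded => Mode::Bounded,
        _ => Mode::PipelineBreaking,
    }
}

// with_fetch: a top-K over an unbounded (already sorted) input produces a
// bounded result, so the cached mode is downgraded.
fn mode_with_fetch(current: Mode, fetch: Option<usize>) -> Mode {
    match (current, fetch) {
        (Mode::Unbounded, Some(_)) => Mode::Bounded,
        (m, _) => m,
    }
}

fn main() {
    assert_eq!(sort_mode(Mode::Unbounded, true), Mode::Unbounded);
    assert_eq!(sort_mode(Mode::Unbounded, false), Mode::PipelineBreaking);
    assert_eq!(mode_with_fetch(Mode::Unbounded, Some(9)), Mode::Bounded);
}

This ordering of the rules is why patch 06 can simply test self.cache.execution_mode == ExecutionMode::Unbounded inside with_fetch instead of re-evaluating the ordering requirement: once patch 04 lands, the cached mode already encodes whether the input satisfied the sort.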