diff --git a/datafusion/physical-optimizer/src/coalesce_batches.rs b/datafusion/physical-optimizer/src/coalesce_batches.rs
index 5cf2c877c61a..e6d9835ec6bd 100644
--- a/datafusion/physical-optimizer/src/coalesce_batches.rs
+++ b/datafusion/physical-optimizer/src/coalesce_batches.rs
@@ -60,8 +60,7 @@ impl PhysicalOptimizerRule for CoalesceBatches {
             // wrap those ones with a CoalesceBatchesExec operator. An alternate approach here
             // would be to build the coalescing logic directly into the operators
             // See https://github.com/apache/datafusion/issues/139
-            let wrap_in_coalesce = plan_any.downcast_ref::<FilterExec>().is_some()
-                || plan_any.downcast_ref::<HashJoinExec>().is_some()
+            let wrap_in_coalesce = plan_any.downcast_ref::<HashJoinExec>().is_some()
                 // Don't need to add CoalesceBatchesExec after a round robin RepartitionExec
                 || plan_any
                     .downcast_ref::<RepartitionExec>()
diff --git a/datafusion/physical-plan/src/filter.rs b/datafusion/physical-plan/src/filter.rs
index 8e7c14f0baed..efa9643f7f04 100644
--- a/datafusion/physical-plan/src/filter.rs
+++ b/datafusion/physical-plan/src/filter.rs
@@ -24,6 +24,7 @@ use super::{
     ColumnStatistics, DisplayAs, ExecutionPlanProperties, PlanProperties,
     RecordBatchStream, SendableRecordBatchStream, Statistics,
 };
+use crate::coalesce::{BatchCoalescer, CoalescerState};
 use crate::common::can_project;
 use crate::{
     metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet},
@@ -372,12 +373,15 @@ impl ExecutionPlan for FilterExec {
     ) -> Result<SendableRecordBatchStream> {
         trace!("Start FilterExec::execute for partition {} of context session_id {} and task_id {:?}", partition, context.session_id(), context.task_id());
         let baseline_metrics = BaselineMetrics::new(&self.metrics, partition);
+        let batch_size = context.session_config().batch_size();
         Ok(Box::pin(FilterExecStream {
             schema: self.schema(),
             predicate: Arc::clone(&self.predicate),
             input: self.input.execute(partition, context)?,
             baseline_metrics,
             projection: self.projection.clone(),
+            coalescer: BatchCoalescer::new(self.schema(), batch_size, None),
+            completed: false,
         }))
     }
 
@@ -451,6 +455,61 @@ struct FilterExecStream {
     baseline_metrics: BaselineMetrics,
     /// The projection indices of the columns in the input schema
     projection: Option<Vec<usize>>,
+    /// Coalescer to accumulate filtered batches up to target batch size
+    coalescer: BatchCoalescer,
+    completed: bool,
+}
+
+impl FilterExecStream {
+    fn poll_next_inner(
+        self: &mut Pin<&mut Self>,
+        cx: &mut Context<'_>,
+    ) -> Poll<Option<Result<RecordBatch>>> {
+        if self.completed {
+            return Poll::Ready(None);
+        }
+
+        let poll;
+        loop {
+            match ready!(self.input.poll_next_unpin(cx)) {
+                Some(Ok(batch)) => {
+                    let timer = self.baseline_metrics.elapsed_compute().timer();
+                    let filtered_batch = filter_and_project(
+                        &batch,
+                        &self.predicate,
+                        self.projection.as_ref(),
+                        &self.schema,
+                    )?;
+                    timer.done();
+                    // Skip entirely filtered batches
+                    if filtered_batch.num_rows() == 0 {
+                        continue;
+                    }
+
+                    let x = match self.coalescer.push_batch(filtered_batch) {
+                        CoalescerState::LimitReached => self.coalescer.finish_batch(),
+                        CoalescerState::TargetReached => self.coalescer.finish_batch(),
+                        CoalescerState::Continue => continue,
+                    };
+
+                    poll = Poll::Ready(Some(x));
+
+                    break;
+                }
+                None => {
+                    self.completed = true;
+                    poll = Poll::Ready(Some(self.coalescer.finish_batch()));
+                    break;
+                }
+                value => {
+                    poll = Poll::Ready(value);
+                    break;
+                }
+            }
+        }
+
+        poll
+    }
 }
 
 pub fn batch_filter(
@@ -500,31 +559,7 @@ impl Stream for FilterExecStream {
         mut self: Pin<&mut Self>,
         cx: &mut Context<'_>,
     ) -> Poll<Option<Self::Item>> {
-        let poll;
-        loop {
-            match ready!(self.input.poll_next_unpin(cx)) {
-                Some(Ok(batch)) => {
-                    let timer = self.baseline_metrics.elapsed_compute().timer();
-                    let filtered_batch = filter_and_project(
-                        &batch,
-                        &self.predicate,
-                        self.projection.as_ref(),
-                        &self.schema,
-                    )?;
-                    timer.done();
-                    // Skip entirely filtered batches
-                    if filtered_batch.num_rows() == 0 {
-                        continue;
-                    }
-                    poll = Poll::Ready(Some(Ok(filtered_batch)));
-                    break;
-                }
-                value => {
-                    poll = Poll::Ready(value);
-                    break;
-                }
-            }
-        }
+        let poll = self.poll_next_inner(cx);
         self.baseline_metrics.record_poll(poll)
     }