Skip to content

Commit

Permalink
filter with embeddded batch coalescer
Browse files Browse the repository at this point in the history
  • Loading branch information
korowa committed Jan 25, 2025
1 parent 144ebf9 commit ee583d5
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 27 deletions.
3 changes: 1 addition & 2 deletions datafusion/physical-optimizer/src/coalesce_batches.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,7 @@ impl PhysicalOptimizerRule for CoalesceBatches {
// wrap those ones with a CoalesceBatchesExec operator. An alternate approach here
// would be to build the coalescing logic directly into the operators
// See https://github.com/apache/datafusion/issues/139
let wrap_in_coalesce = plan_any.downcast_ref::<FilterExec>().is_some()
|| plan_any.downcast_ref::<HashJoinExec>().is_some()
let wrap_in_coalesce = plan_any.downcast_ref::<HashJoinExec>().is_some()
// Don't need to add CoalesceBatchesExec after a round robin RepartitionExec
|| plan_any
.downcast_ref::<RepartitionExec>()
Expand Down
85 changes: 60 additions & 25 deletions datafusion/physical-plan/src/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use super::{
ColumnStatistics, DisplayAs, ExecutionPlanProperties, PlanProperties,
RecordBatchStream, SendableRecordBatchStream, Statistics,
};
use crate::coalesce::{BatchCoalescer, CoalescerState};
use crate::common::can_project;
use crate::{
metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet},
Expand Down Expand Up @@ -372,12 +373,15 @@ impl ExecutionPlan for FilterExec {
) -> Result<SendableRecordBatchStream> {
trace!("Start FilterExec::execute for partition {} of context session_id {} and task_id {:?}", partition, context.session_id(), context.task_id());
let baseline_metrics = BaselineMetrics::new(&self.metrics, partition);
let batch_size = context.session_config().batch_size();
Ok(Box::pin(FilterExecStream {
schema: self.schema(),
predicate: Arc::clone(&self.predicate),
input: self.input.execute(partition, context)?,
baseline_metrics,
projection: self.projection.clone(),
coalescer: BatchCoalescer::new(self.schema(), batch_size, None),
completed: false,
}))
}

Expand Down Expand Up @@ -451,6 +455,61 @@ struct FilterExecStream {
baseline_metrics: BaselineMetrics,
/// The projection indices of the columns in the input schema
projection: Option<Vec<usize>>,
/// Coalescer to accumulate filtered batches up to target batch size
coalescer: BatchCoalescer,
completed: bool,
}

impl FilterExecStream {
fn poll_next_inner(
self: &mut Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Result<RecordBatch>>> {
if self.completed {
return Poll::Ready(None);
}

let poll;
loop {
match ready!(self.input.poll_next_unpin(cx)) {
Some(Ok(batch)) => {
let timer = self.baseline_metrics.elapsed_compute().timer();
let filtered_batch = filter_and_project(
&batch,
&self.predicate,
self.projection.as_ref(),
&self.schema,
)?;
timer.done();
// Skip entirely filtered batches
if filtered_batch.num_rows() == 0 {
continue;
}

let x = match self.coalescer.push_batch(filtered_batch) {
CoalescerState::LimitReached => self.coalescer.finish_batch(),
CoalescerState::TargetReached => self.coalescer.finish_batch(),
CoalescerState::Continue => continue,
};

poll = Poll::Ready(Some(x));

break;
}
None => {
self.completed = true;
poll = Poll::Ready(Some(self.coalescer.finish_batch()));
break;
}
value => {
poll = Poll::Ready(value);
break;
}
}
}

poll
}
}

pub fn batch_filter(
Expand Down Expand Up @@ -500,31 +559,7 @@ impl Stream for FilterExecStream {
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Self::Item>> {
let poll;
loop {
match ready!(self.input.poll_next_unpin(cx)) {
Some(Ok(batch)) => {
let timer = self.baseline_metrics.elapsed_compute().timer();
let filtered_batch = filter_and_project(
&batch,
&self.predicate,
self.projection.as_ref(),
&self.schema,
)?;
timer.done();
// Skip entirely filtered batches
if filtered_batch.num_rows() == 0 {
continue;
}
poll = Poll::Ready(Some(Ok(filtered_batch)));
break;
}
value => {
poll = Poll::Ready(value);
break;
}
}
}
let poll = self.poll_next_inner(cx);
self.baseline_metrics.record_poll(poll)
}

Expand Down

0 comments on commit ee583d5

Please sign in to comment.