Skip to content

Commit

Permalink
Add LimitPushdown optimization rule and CoalesceBatchesExec fetch (apache#11652)
Browse files Browse the repository at this point in the history

* Add LimitPushdown skeleton

* Transform StreamTableExec into fetching version when skip is 0

* Transform StreamTableExec into fetching version when skip is non-zero

* Fix non-zero skip test

* Add fetch field to CoalesceBatchesExec

* Tag ProjectionExec, CoalescePartitionsExec and SortPreservingMergeExec as supporting limit pushdown

* Add `with_fetch` to SortExec

* Push limit down through supporting ExecutionPlans

* Reorder LimitPushdown optimization to before SanityCheckPlan

* Refactor LimitPushdown tests

* Refactor LimitPushdown tests

* Add more LimitPushdown tests

* Add fetch support to CoalesceBatchesExec

* Fix tests that were affected

* Refactor LimitPushdown push_down_limits

* Remove unnecessary parameter from coalesce_batches_exec

* Format files

* Apply clippy fixes

* Make CoalesceBatchesExec display consistent

* Fix slt tests according to LimitPushdown rules

* Resolve linter errors

* Minor changes

* Minor changes

* Fix GlobalLimitExec sometimes replacing LocalLimitExec

* Fix unnecessary LocalLimitExec for ProjectionExec

* Rename GlobalOrLocal into LimitExec

* Clarify pushdown recursion

* Minor changes

* Minor

* Do not display when fetch is None

* .rs removal

* Clean-up tpch plans

* Clean-up comments

* Update datafusion/core/src/physical_optimizer/optimizer.rs

* Update datafusion/physical-plan/src/coalesce_batches.rs

* Update datafusion/physical-plan/src/coalesce_batches.rs

* Update datafusion/physical-plan/src/coalesce_batches.rs

* Update datafusion/core/src/physical_optimizer/limit_pushdown.rs

* Update datafusion/core/src/physical_optimizer/limit_pushdown.rs

* Update datafusion/physical-plan/src/lib.rs

* Implement with_fetch() for other source execs

* Minor

* Merge all Global/Local-LimitExec combinations in LimitPushdown

* Fix compile errors after merge

* Update datafusion/core/src/physical_optimizer/limit_pushdown.rs

Remove redundant lines in docstring

* Avoid code duplication

* Incorporate review feedback

---------

Co-authored-by: Mustafa Akur <[email protected]>
Co-authored-by: berkaysynnada <[email protected]>
Co-authored-by: Mehmet Ozan Kabak <[email protected]>
  • Loading branch information
4 people authored Jul 28, 2024
1 parent a721be1 commit 5ad6067
Show file tree
Hide file tree
Showing 28 changed files with 1,186 additions and 287 deletions.
13 changes: 13 additions & 0 deletions datafusion/core/src/datasource/physical_plan/arrow_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,19 @@ impl ExecutionPlan for ArrowExec {
/// Returns the statistics of this plan, projected to the output columns.
fn statistics(&self) -> Result<Statistics> {
    // Statistics were computed at construction time; hand back a copy.
    let projected = self.projected_statistics.clone();
    Ok(projected)
}

/// Returns a copy of this `ArrowExec` whose file scan stops after `limit`
/// rows (`None` removes any limit). All other state is carried over.
fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn ExecutionPlan>> {
    Some(Arc::new(Self {
        // Only the scan configuration changes: the row limit is applied there.
        base_config: self.base_config.clone().with_limit(limit),
        projected_statistics: self.projected_statistics.clone(),
        projected_schema: self.projected_schema.clone(),
        projected_output_ordering: self.projected_output_ordering.clone(),
        metrics: self.metrics.clone(),
        cache: self.cache.clone(),
    }))
}
}

pub struct ArrowOpener {
Expand Down
13 changes: 13 additions & 0 deletions datafusion/core/src/datasource/physical_plan/avro.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,19 @@ impl ExecutionPlan for AvroExec {
/// Returns a snapshot of the metrics collected during execution so far.
fn metrics(&self) -> Option<MetricsSet> {
    let snapshot = self.metrics.clone_inner();
    Some(snapshot)
}

/// Returns a copy of this `AvroExec` whose file scan stops after `limit`
/// rows (`None` removes any limit). All other state is carried over.
fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn ExecutionPlan>> {
    Some(Arc::new(Self {
        // Only the scan configuration changes: the row limit is applied there.
        base_config: self.base_config.clone().with_limit(limit),
        projected_statistics: self.projected_statistics.clone(),
        projected_schema: self.projected_schema.clone(),
        projected_output_ordering: self.projected_output_ordering.clone(),
        metrics: self.metrics.clone(),
        cache: self.cache.clone(),
    }))
}
}

#[cfg(feature = "avro")]
Expand Down
18 changes: 18 additions & 0 deletions datafusion/core/src/datasource/physical_plan/csv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -426,6 +426,24 @@ impl ExecutionPlan for CsvExec {
/// Returns a snapshot of the metrics collected during execution so far.
fn metrics(&self) -> Option<MetricsSet> {
    let snapshot = self.metrics.clone_inner();
    Some(snapshot)
}

/// Returns a copy of this `CsvExec` whose file scan stops after `limit`
/// rows (`None` removes any limit). CSV parsing options and all other
/// state are carried over unchanged.
fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn ExecutionPlan>> {
    Some(Arc::new(Self {
        // Only the scan configuration changes: the row limit is applied there.
        base_config: self.base_config.clone().with_limit(limit),
        projected_statistics: self.projected_statistics.clone(),
        // CSV parsing options are plain values and copy over directly.
        has_header: self.has_header,
        delimiter: self.delimiter,
        quote: self.quote,
        escape: self.escape,
        comment: self.comment,
        newlines_in_values: self.newlines_in_values,
        metrics: self.metrics.clone(),
        file_compression_type: self.file_compression_type,
        cache: self.cache.clone(),
    }))
}
}

/// A Config for [`CsvOpener`]
Expand Down
12 changes: 12 additions & 0 deletions datafusion/core/src/datasource/physical_plan/json.rs
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,18 @@ impl ExecutionPlan for NdJsonExec {
/// Returns a snapshot of the metrics collected during execution so far.
fn metrics(&self) -> Option<MetricsSet> {
    let snapshot = self.metrics.clone_inner();
    Some(snapshot)
}

/// Returns a copy of this `NdJsonExec` whose file scan stops after `limit`
/// rows (`None` removes any limit). All other state is carried over.
fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn ExecutionPlan>> {
    Some(Arc::new(Self {
        // Only the scan configuration changes: the row limit is applied there.
        base_config: self.base_config.clone().with_limit(limit),
        projected_statistics: self.projected_statistics.clone(),
        metrics: self.metrics.clone(),
        file_compression_type: self.file_compression_type,
        cache: self.cache.clone(),
    }))
}
}

/// A [`FileOpener`] that opens a JSON file and yields a [`FileOpenFuture`]
Expand Down
18 changes: 18 additions & 0 deletions datafusion/core/src/datasource/physical_plan/parquet/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -726,6 +726,24 @@ impl ExecutionPlan for ParquetExec {
/// Returns the statistics of this plan, projected to the output columns.
fn statistics(&self) -> Result<Statistics> {
    // Statistics were computed at construction time; hand back a copy.
    let projected = self.projected_statistics.clone();
    Ok(projected)
}

/// Returns a copy of this `ParquetExec` whose file scan stops after `limit`
/// rows (`None` removes any limit). Predicates, pruning state, reader
/// factory, and all other configuration are carried over unchanged.
fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn ExecutionPlan>> {
    Some(Arc::new(Self {
        // Only the scan configuration changes: the row limit is applied there.
        base_config: self.base_config.clone().with_limit(limit),
        projected_statistics: self.projected_statistics.clone(),
        metrics: self.metrics.clone(),
        // Row-group/page pruning state is reused as-is; a fetch limit does
        // not affect which data qualifies, only how much is produced.
        predicate: self.predicate.clone(),
        pruning_predicate: self.pruning_predicate.clone(),
        page_pruning_predicate: self.page_pruning_predicate.clone(),
        metadata_size_hint: self.metadata_size_hint,
        parquet_file_reader_factory: self.parquet_file_reader_factory.clone(),
        cache: self.cache.clone(),
        table_parquet_options: self.table_parquet_options.clone(),
        schema_adapter_factory: self.schema_adapter_factory.clone(),
    }))
}
}

fn should_enable_page_index(
Expand Down
Loading

0 comments on commit 5ad6067

Please sign in to comment.