Improve documentation about ParquetExec / Parquet predicate pushdown #11994

Merged (6 commits) on Aug 16, 2024
Changes from 1 commit
3 changes: 3 additions & 0 deletions datafusion/common/src/tree_node.rs
@@ -486,6 +486,9 @@ pub trait TreeNodeVisitor<'n>: Sized {
/// A [Visitor](https://en.wikipedia.org/wiki/Visitor_pattern) for recursively
/// rewriting [`TreeNode`]s via [`TreeNode::rewrite`].
///
/// For example you can implement this trait on a struct to rewrite `Expr` or
Contributor: should we add an example of it? 🤔

/// `LogicalPlan` that needs to track state during the rewrite.
///
/// See [`TreeNode`] for more details on available APIs
///
/// When passed to [`TreeNode::rewrite`], [`TreeNodeRewriter::f_down`] and
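Picking up the reviewer's question above: a minimal sketch of what such an example could look like (not part of this PR; the rewriter name and the rewritten column are hypothetical), assuming the `TreeNodeRewriter` / `Transformed` API in `datafusion_common::tree_node`:

```rust
use datafusion_common::tree_node::{Transformed, TreeNodeRewriter};
use datafusion_common::Result;
use datafusion_expr::{lit, Expr};

/// Hypothetical rewriter that replaces references to column "b" with the
/// literal 1 while tracking how many replacements were made (the "state"
/// mentioned in the doc comment above).
struct ReplaceColumnB {
    replacements: usize,
}

impl TreeNodeRewriter for ReplaceColumnB {
    type Node = Expr;

    /// Called after all children have been visited (bottom-up).
    fn f_up(&mut self, expr: Expr) -> Result<Transformed<Expr>> {
        match expr {
            Expr::Column(ref c) if c.name == "b" => {
                self.replacements += 1; // state tracked during the rewrite
                Ok(Transformed::yes(lit(1i64)))
            }
            other => Ok(Transformed::no(other)),
        }
    }
}

// Usage sketch (requires the `TreeNode` trait in scope for `.rewrite`):
//   let expr = col("a") + col("b");                       // a + b
//   let mut rewriter = ReplaceColumnB { replacements: 0 };
//   let rewritten = expr.rewrite(&mut rewriter)?.data;    // a + Int64(1)
//   assert_eq!(rewriter.replacements, 1);
```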
60 changes: 38 additions & 22 deletions datafusion/core/src/datasource/physical_plan/parquet/mod.rs
@@ -116,13 +116,12 @@ pub use writer::plan_to_parquet;
///
/// Supports the following optimizations:
///
/// * Concurrent reads: Can read from one or more files in parallel as multiple
/// * Concurrent reads: reads from one or more files in parallel as multiple
/// partitions, including concurrently reading multiple row groups from a single
/// file.
///
/// * Predicate push down: skips row groups and pages based on
/// min/max/null_counts in the row group metadata, the page index and bloom
/// filters.
/// * Predicate push down: skips row groups and pages and rows based on metadata
/// and late materialization. See "Predicate Pushdown" below.
///
/// * Projection pushdown: reads and decodes only the columns required.
///
@@ -132,9 +131,8 @@ pub use writer::plan_to_parquet;
/// coalesce I/O operations, etc. See [`ParquetFileReaderFactory`] for more
/// details.
///
/// * Schema adapters: read parquet files with different schemas into a unified
/// table schema. This can be used to implement "schema evolution". See
/// [`SchemaAdapterFactory`] for more details.
/// * Schema evolution: read parquet files with different schemas into a unified
/// table schema. See [`SchemaAdapterFactory`] for more details.
///
/// * metadata_size_hint: controls the number of bytes read from the end of the
/// file in the initial I/O when the default [`ParquetFileReaderFactory`] is used. If a
@@ -144,6 +142,29 @@ pub use writer::plan_to_parquet;
/// * User provided [`ParquetAccessPlan`]s to skip row groups and/or pages
/// based on external information. See "Implementing External Indexes" below
///
/// # Predicate Pushdown
Contributor Author: I tried to consolidate the description of what predicate pushdown is done in the ParquetExec

///
/// `ParquetExec` uses the provided [`PhysicalExpr`] predicate as a filter to
/// skip reading data and improve query performance using several techniques:
///
/// * Row group pruning: skips entire row groups based on min/max statistics
/// found in [`ParquetMetaData`] and any Bloom filters that are present.
///
/// * Page pruning: skips individual pages within a ColumnChunk using the
/// [Parquet PageIndex], if present.
///
/// * Row filtering: skips rows within a page using a form of late
/// materialization. When possible, predicates are applied by the parquet
/// decoder *during* decode (see [`ArrowPredicate`] and [`RowFilter`] for more
/// details). This is only enabled if `pushdown_filters` is set to true.
///
/// Note: If the predicate can not be used to accelerate the scan, it is ignored
/// (no error is raised on predicate evaluation errors).
///
/// [`ArrowPredicate`]: parquet::arrow::arrow_reader::ArrowPredicate
/// [`RowFilter`]: parquet::arrow::arrow_reader::RowFilter
/// [Parquet PageIndex]: https://github.com/apache/parquet-format/blob/master/PageIndex.md
///
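Not part of the diff, but to make the pruning techniques above concrete, here is a rough sketch of attaching a predicate when building a `ParquetExec`. The file path, size, and schema are hypothetical, and it assumes the `ParquetExecBuilder` / `FileScanConfig` APIs of this DataFusion version:

```rust
use std::sync::Arc;

use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::datasource::listing::PartitionedFile;
use datafusion::datasource::physical_plan::{FileScanConfig, ParquetExec};
use datafusion::error::Result;
use datafusion::execution::object_store::ObjectStoreUrl;
use datafusion::logical_expr::Operator;
use datafusion::physical_expr::expressions::{binary, col, lit};

fn parquet_exec_with_predicate() -> Result<Arc<ParquetExec>> {
    // Hypothetical single-column file schema.
    let file_schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));

    // Physical predicate `a = 5`. It is only used to prune row groups, pages
    // and (optionally) rows; it does not change query results.
    let predicate = binary(col("a", &file_schema)?, Operator::Eq, lit(5), &file_schema)?;

    // Hypothetical file path and size.
    let scan_config = FileScanConfig::new(ObjectStoreUrl::local_filesystem(), file_schema)
        .with_file(PartitionedFile::new("data/example.parquet".to_string(), 1234));

    // Row group and page pruning happen automatically; row filtering
    // additionally requires pushdown_filters (see the end of this page).
    let exec = ParquetExec::builder(scan_config)
        .with_predicate(predicate)
        .build();

    Ok(Arc::new(exec))
}
```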
/// # Implementing External Indexes
///
/// It is possible to restrict the row groups and selections within those row
@@ -199,10 +220,11 @@ pub use writer::plan_to_parquet;
/// applying predicates to metadata. The plan and projections are used to
/// determine what pages must be read.
///
/// * Step 4: The stream begins reading data, fetching the required pages
/// and incrementally decoding them.
/// * Step 4: The stream begins reading data, fetching the required parquet
/// pages, incrementally decoding them, and applying any row filters (see
/// [`Self::with_pushdown_filters`]).
///
/// * Step 5: As each [`RecordBatch]` is read, it may be adapted by a
/// * Step 5: As each [`RecordBatch`] is read, it may be adapted by a
/// [`SchemaAdapter`] to match the table schema. By default missing columns are
/// filled with nulls, but this can be customized via [`SchemaAdapterFactory`].
///
@@ -268,13 +290,10 @@ impl ParquetExecBuilder {
}
}

/// Set the predicate for the scan.
///
/// The ParquetExec uses this predicate to filter row groups and data pages
/// using the Parquet statistics and bloom filters.
/// Set the filter predicate when reading.
///
/// If the predicate can not be used to prune the scan, it is ignored (no
/// error is raised).
/// See the "Predicate Pushdown" section of the [`ParquetExec`] documentation
/// for more details.
pub fn with_predicate(mut self, predicate: Arc<dyn PhysicalExpr>) -> Self {
self.predicate = Some(predicate);
self
@@ -291,7 +310,7 @@ impl ParquetExecBuilder {
self
}

/// Set the table parquet options that control how the ParquetExec reads.
/// Set the options for controlling how the ParquetExec reads parquet files.
///
/// See also [`Self::new_with_options`]
pub fn with_table_parquet_options(
@@ -480,11 +499,8 @@ impl ParquetExec {
self
}

/// If true, any filter [`Expr`]s on the scan will converted to a
/// [`RowFilter`](parquet::arrow::arrow_reader::RowFilter) in the
/// `ParquetRecordBatchStream`. These filters are applied by the
/// parquet decoder to skip unecessairly decoding other columns
/// which would not pass the predicate. Defaults to false
/// If true, the predicate will be used during the parquet scan.
/// Defaults to false
///
/// [`Expr`]: datafusion_expr::Expr
pub fn with_pushdown_filters(mut self, pushdown_filters: bool) -> Self {
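Finally, while `with_pushdown_filters` can be called on a `ParquetExec` directly as above, users more commonly enable it through the session configuration. A small sketch, assuming the `datafusion` crate's `SessionConfig` / `SessionContext` API (this option corresponds to the `datafusion.execution.parquet.pushdown_filters` config key):

```rust
use datafusion::prelude::{SessionConfig, SessionContext};

fn main() {
    // Enable row-level filter pushdown (late materialization) for parquet scans.
    let mut config = SessionConfig::new();
    config.options_mut().execution.parquet.pushdown_filters = true;
    // Optionally evaluate cheaper predicates first when row filtering.
    config.options_mut().execution.parquet.reorder_filters = true;

    // Scans planned through this context build ParquetExec instances with
    // pushdown_filters = true, so eligible predicates are applied by the
    // parquet decoder during decode rather than afterwards.
    let _ctx = SessionContext::new_with_config(config);
}
```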