Skip to content

Commit

Permalink
revert5
Browse files Browse the repository at this point in the history
  • Loading branch information
peter-toth committed Jan 12, 2024
1 parent d88f21b commit 1793ca8
Showing 1 changed file with 9 additions and 12 deletions.
21 changes: 9 additions & 12 deletions datafusion/core/src/datasource/physical_plan/parquet/row_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use arrow::datatypes::{DataType, Schema};
use arrow::error::{ArrowError, Result as ArrowResult};
use arrow::record_batch::RecordBatch;
use datafusion_common::cast::as_boolean_array;
use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion, TreeNodeRewriter};
use datafusion_common::tree_node::{RewriteRecursion, TreeNode, TreeNodeRewriterOld};
use datafusion_common::{arrow_err, DataFusionError, Result, ScalarValue};
use datafusion_physical_expr::expressions::{Column, Literal};
use datafusion_physical_expr::utils::reassign_predicate_columns;
Expand Down Expand Up @@ -189,7 +189,7 @@ impl<'a> FilterCandidateBuilder<'a> {
metadata: &ParquetMetaData,
) -> Result<Option<FilterCandidate>> {
let expr = self.expr.clone();
let expr = expr.rewrite(&mut self)?;
let expr = expr.rewrite_old(&mut self)?;

if self.non_primitive_columns || self.projected_columns {
Ok(None)
Expand All @@ -208,33 +208,30 @@ impl<'a> FilterCandidateBuilder<'a> {
}
}

impl<'a> TreeNodeRewriter for FilterCandidateBuilder<'a> {
type Node = Arc<dyn PhysicalExpr>;
impl<'a> TreeNodeRewriterOld for FilterCandidateBuilder<'a> {
type N = Arc<dyn PhysicalExpr>;

fn f_down(
&mut self,
node: Arc<dyn PhysicalExpr>,
) -> Result<(Arc<dyn PhysicalExpr>, TreeNodeRecursion)> {
fn pre_visit(&mut self, node: &Arc<dyn PhysicalExpr>) -> Result<RewriteRecursion> {
if let Some(column) = node.as_any().downcast_ref::<Column>() {
if let Ok(idx) = self.file_schema.index_of(column.name()) {
self.required_column_indices.insert(idx);

if DataType::is_nested(self.file_schema.field(idx).data_type()) {
self.non_primitive_columns = true;
return Ok((node, TreeNodeRecursion::Skip));
return Ok(RewriteRecursion::Stop);
}
} else if self.table_schema.index_of(column.name()).is_err() {
// If the column does not exist in the (un-projected) table schema then
// it must be a projected column.
self.projected_columns = true;
return Ok((node, TreeNodeRecursion::Skip));
return Ok(RewriteRecursion::Stop);
}
}

Ok((node, TreeNodeRecursion::Continue))
Ok(RewriteRecursion::Continue)
}

fn f_up(&mut self, expr: Arc<dyn PhysicalExpr>) -> Result<Arc<dyn PhysicalExpr>> {
fn mutate(&mut self, expr: Arc<dyn PhysicalExpr>) -> Result<Arc<dyn PhysicalExpr>> {
if let Some(column) = expr.as_any().downcast_ref::<Column>() {
if self.file_schema.field_with_name(column.name()).is_err() {
// the column expr must be in the table schema
Expand Down

0 comments on commit 1793ca8

Please sign in to comment.