Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Minor: Rename predicate_builder --> pruning_predicate for consistency #1481

Merged
merged 2 commits into from
Dec 23, 2021
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 27 additions & 27 deletions datafusion/src/physical_plan/file_format/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,8 @@ pub struct ParquetExec {
projected_schema: SchemaRef,
/// Execution metrics
metrics: ExecutionPlanMetricsSet,
/// Optional predicate builder
predicate_builder: Option<PruningPredicate>,
/// Optional predicate for pruning row groups
pruning_predicate: Option<PruningPredicate>,
}

/// Stores metrics about the parquet execution for a particular parquet file
Expand All @@ -95,12 +95,12 @@ impl ParquetExec {
let predicate_creation_errors =
MetricBuilder::new(&metrics).global_counter("num_predicate_creation_errors");

let predicate_builder = predicate.and_then(|predicate_expr| {
let pruning_predicate = predicate.and_then(|predicate_expr| {
match PruningPredicate::try_new(
&predicate_expr,
base_config.file_schema.clone(),
) {
Ok(predicate_builder) => Some(predicate_builder),
Ok(pruning_predicate) => Some(pruning_predicate),
Err(e) => {
debug!(
"Could not create pruning predicate for {:?}: {}",
Expand All @@ -119,7 +119,7 @@ impl ParquetExec {
projected_schema,
projected_statistics,
metrics,
predicate_builder,
pruning_predicate,
}
}

Expand Down Expand Up @@ -200,7 +200,7 @@ impl ExecutionPlan for ParquetExec {
Some(proj) => proj,
None => (0..self.base_config.file_schema.fields().len()).collect(),
};
let predicate_builder = self.predicate_builder.clone();
let pruning_predicate = self.pruning_predicate.clone();
let batch_size = self.base_config.batch_size;
let limit = self.base_config.limit;
let object_store = Arc::clone(&self.base_config.object_store);
Expand All @@ -216,7 +216,7 @@ impl ExecutionPlan for ParquetExec {
partition,
metrics,
&projection,
&predicate_builder,
&pruning_predicate,
batch_size,
response_tx,
limit,
Expand Down Expand Up @@ -356,17 +356,17 @@ impl<'a> PruningStatistics for RowGroupPruningStatistics<'a> {
}

fn build_row_group_predicate(
predicate_builder: &PruningPredicate,
pruning_predicate: &PruningPredicate,
metrics: ParquetFileMetrics,
row_group_metadata: &[RowGroupMetaData],
) -> Box<dyn Fn(&RowGroupMetaData, usize) -> bool> {
let parquet_schema = predicate_builder.schema().as_ref();
let parquet_schema = pruning_predicate.schema().as_ref();

let pruning_stats = RowGroupPruningStatistics {
row_group_metadata,
parquet_schema,
};
let predicate_values = predicate_builder.prune(&pruning_stats);
let predicate_values = pruning_predicate.prune(&pruning_stats);

match predicate_values {
Ok(values) => {
Expand All @@ -392,7 +392,7 @@ fn read_partition(
partition: Vec<PartitionedFile>,
metrics: ExecutionPlanMetricsSet,
projection: &[usize],
predicate_builder: &Option<PruningPredicate>,
pruning_predicate: &Option<PruningPredicate>,
batch_size: usize,
response_tx: Sender<ArrowResult<RecordBatch>>,
limit: Option<usize>,
Expand All @@ -409,9 +409,9 @@ fn read_partition(
object_store.file_reader(partitioned_file.file_meta.sized_file.clone())?;
let mut file_reader =
SerializedFileReader::new(ChunkObjectReader(object_reader))?;
if let Some(predicate_builder) = predicate_builder {
if let Some(pruning_predicate) = pruning_predicate {
let row_group_predicate = build_row_group_predicate(
predicate_builder,
pruning_predicate,
file_metrics,
file_reader.metadata().row_groups(),
);
Expand Down Expand Up @@ -582,12 +582,12 @@ mod tests {
}

#[test]
fn row_group_predicate_builder_simple_expr() -> Result<()> {
fn row_group_pruning_predicate_simple_expr() -> Result<()> {
use crate::logical_plan::{col, lit};
// int > 15 => c1_max > 15
let expr = col("c1").gt(lit(15));
let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]);
let predicate_builder = PruningPredicate::try_new(&expr, Arc::new(schema))?;
let pruning_predicate = PruningPredicate::try_new(&expr, Arc::new(schema))?;

let schema_descr = get_test_schema_descr(vec![("c1", PhysicalType::INT32)]);
let rgm1 = get_row_group_meta_data(
Expand All @@ -600,7 +600,7 @@ mod tests {
);
let row_group_metadata = vec![rgm1, rgm2];
let row_group_predicate = build_row_group_predicate(
&predicate_builder,
&pruning_predicate,
parquet_file_metrics(),
&row_group_metadata,
);
Expand All @@ -615,12 +615,12 @@ mod tests {
}

#[test]
fn row_group_predicate_builder_missing_stats() -> Result<()> {
fn row_group_pruning_predicate_missing_stats() -> Result<()> {
use crate::logical_plan::{col, lit};
// int > 15 => c1_max > 15
let expr = col("c1").gt(lit(15));
let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]);
let predicate_builder = PruningPredicate::try_new(&expr, Arc::new(schema))?;
let pruning_predicate = PruningPredicate::try_new(&expr, Arc::new(schema))?;

let schema_descr = get_test_schema_descr(vec![("c1", PhysicalType::INT32)]);
let rgm1 = get_row_group_meta_data(
Expand All @@ -633,7 +633,7 @@ mod tests {
);
let row_group_metadata = vec![rgm1, rgm2];
let row_group_predicate = build_row_group_predicate(
&predicate_builder,
&pruning_predicate,
parquet_file_metrics(),
&row_group_metadata,
);
Expand All @@ -650,7 +650,7 @@ mod tests {
}

#[test]
fn row_group_predicate_builder_partial_expr() -> Result<()> {
fn row_group_pruning_predicate_partial_expr() -> Result<()> {
use crate::logical_plan::{col, lit};
// test row group predicate with partially supported expression
// int > 15 and int % 2 => c1_max > 15 and true
Expand All @@ -659,7 +659,7 @@ mod tests {
Field::new("c1", DataType::Int32, false),
Field::new("c2", DataType::Int32, false),
]));
let predicate_builder = PruningPredicate::try_new(&expr, schema.clone())?;
let pruning_predicate = PruningPredicate::try_new(&expr, schema.clone())?;

let schema_descr = get_test_schema_descr(vec![
("c1", PhysicalType::INT32),
Expand All @@ -681,7 +681,7 @@ mod tests {
);
let row_group_metadata = vec![rgm1, rgm2];
let row_group_predicate = build_row_group_predicate(
&predicate_builder,
&pruning_predicate,
parquet_file_metrics(),
&row_group_metadata,
);
Expand All @@ -697,9 +697,9 @@ mod tests {
// if conditions in predicate are joined with OR and an unsupported expression is used
// this bypasses the entire predicate expression and no row groups are filtered out
let expr = col("c1").gt(lit(15)).or(col("c2").modulus(lit(2)));
let predicate_builder = PruningPredicate::try_new(&expr, schema)?;
let pruning_predicate = PruningPredicate::try_new(&expr, schema)?;
let row_group_predicate = build_row_group_predicate(
&predicate_builder,
&pruning_predicate,
parquet_file_metrics(),
&row_group_metadata,
);
Expand All @@ -714,7 +714,7 @@ mod tests {
}

#[test]
fn row_group_predicate_builder_null_expr() -> Result<()> {
fn row_group_pruning_predicate_null_expr() -> Result<()> {
use crate::logical_plan::{col, lit};
// test row group predicate with an unknown (Null) expr
//
Expand All @@ -726,7 +726,7 @@ mod tests {
Field::new("c1", DataType::Int32, false),
Field::new("c2", DataType::Boolean, false),
]));
let predicate_builder = PruningPredicate::try_new(&expr, schema)?;
let pruning_predicate = PruningPredicate::try_new(&expr, schema)?;

let schema_descr = get_test_schema_descr(vec![
("c1", PhysicalType::INT32),
Expand All @@ -748,7 +748,7 @@ mod tests {
);
let row_group_metadata = vec![rgm1, rgm2];
let row_group_predicate = build_row_group_predicate(
&predicate_builder,
&pruning_predicate,
parquet_file_metrics(),
&row_group_metadata,
);
Expand Down