Extract drive-by fixes from PR 12135 for easier reviewing (#12240)
* Extract drive-by fixes from PR 12135 for easier reviewing

* Add a few more cfgs to silence warnings with different feature sets

* fmt
itsjunetime authored Sep 2, 2024
1 parent b583591 commit 93fb715
Showing 10 changed files with 93 additions and 95 deletions.
2 changes: 2 additions & 0 deletions datafusion/common/src/hash_utils.rs
@@ -245,6 +245,8 @@ fn hash_struct_array(
Ok(())
}

// only adding this `cfg` b/c this function is only used with this `cfg`
#[cfg(not(feature = "force_hash_collisions"))]
fn hash_map_array(
array: &MapArray,
random_state: &RandomState,
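For context on this hash_utils.rs change: gating a helper behind the same `cfg` as its only caller is what keeps the build warning-free across feature combinations. A minimal sketch of that pattern, separate from the commit itself (the `hash_one`/`hash_all` names are illustrative, not DataFusion APIs):

// Sketch of the cfg-gating pattern: the helper is compiled only when its sole
// caller is compiled, so no dead-code warning fires when the feature is enabled.
#[cfg(not(feature = "force_hash_collisions"))]
fn hash_one(value: u64, seed: u64) -> u64 {
    // stand-in for real per-value hashing
    value.wrapping_mul(31).wrapping_add(seed)
}

#[cfg(not(feature = "force_hash_collisions"))]
pub fn hash_all(values: &[u64], seed: u64) -> Vec<u64> {
    values.iter().map(|v| hash_one(*v, seed)).collect()
}

#[cfg(feature = "force_hash_collisions")]
pub fn hash_all(values: &[u64], _seed: u64) -> Vec<u64> {
    // when collisions are forced, every value maps to the same bucket,
    // so the helper above is never needed
    vec![0; values.len()]
}

fn main() {
    println!("{:?}", hash_all(&[1, 2, 3], 42));
}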
16 changes: 8 additions & 8 deletions datafusion/core/src/datasource/listing/helpers.rs
@@ -51,12 +51,12 @@ use object_store::{ObjectMeta, ObjectStore};
/// - the table provider can filter the table partition values with this expression
/// - the expression can be marked as `TableProviderFilterPushDown::Exact` once this filtering
/// was performed
pub fn expr_applicable_for_cols(col_names: &[String], expr: &Expr) -> bool {
pub fn expr_applicable_for_cols(col_names: &[&str], expr: &Expr) -> bool {
let mut is_applicable = true;
expr.apply(|expr| {
match expr {
Expr::Column(Column { ref name, .. }) => {
is_applicable &= col_names.contains(name);
is_applicable &= col_names.contains(&name.as_str());
if is_applicable {
Ok(TreeNodeRecursion::Jump)
} else {
@@ -745,27 +745,27 @@ mod tests {
#[test]
fn test_expr_applicable_for_cols() {
assert!(expr_applicable_for_cols(
&[String::from("c1")],
&["c1"],
&Expr::eq(col("c1"), lit("value"))
));
assert!(!expr_applicable_for_cols(
&[String::from("c1")],
&["c1"],
&Expr::eq(col("c2"), lit("value"))
));
assert!(!expr_applicable_for_cols(
&[String::from("c1")],
&["c1"],
&Expr::eq(col("c1"), col("c2"))
));
assert!(expr_applicable_for_cols(
&[String::from("c1"), String::from("c2")],
&["c1", "c2"],
&Expr::eq(col("c1"), col("c2"))
));
assert!(expr_applicable_for_cols(
&[String::from("c1"), String::from("c2")],
&["c1", "c2"],
&(Expr::eq(col("c1"), col("c2").alias("c2_alias"))).not()
));
assert!(expr_applicable_for_cols(
&[String::from("c1"), String::from("c2")],
&["c1", "c2"],
&(case(col("c1"))
.when(lit("v1"), lit(true))
.otherwise(lit(false))
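Callers that still own `String` column names can borrow them down to `&[&str]` at the call site; a minimal sketch of that adaptation, not from the commit (the `owned_cols` variable is illustrative):

fn main() {
    let owned_cols: Vec<String> = vec!["c1".to_string(), "c2".to_string()];

    // Borrow each String as &str; this mirrors the `.map(|x| x.0.as_str())`
    // adjustment made in ListingTable::supports_filters_pushdown below.
    let col_refs: Vec<&str> = owned_cols.iter().map(|s| s.as_str()).collect();

    // expr_applicable_for_cols(&col_refs, &filter_expr) would be called here.
    assert_eq!(col_refs, ["c1", "c2"]);
}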
7 changes: 3 additions & 4 deletions datafusion/core/src/datasource/listing/table.rs
@@ -826,15 +826,15 @@ impl TableProvider for ListingTable {
&self,
filters: &[&Expr],
) -> Result<Vec<TableProviderFilterPushDown>> {
let support: Vec<_> = filters
Ok(filters
.iter()
.map(|filter| {
if expr_applicable_for_cols(
&self
.options
.table_partition_cols
.iter()
.map(|x| x.0.clone())
.map(|x| x.0.as_str())
.collect::<Vec<_>>(),
filter,
) {
@@ -846,8 +846,7 @@
TableProviderFilterPushDown::Inexact
}
})
.collect();
Ok(support)
.collect())
}

fn get_table_definition(&self) -> Option<&str> {
13 changes: 7 additions & 6 deletions datafusion/core/src/datasource/physical_plan/parquet/mod.rs
@@ -685,10 +685,12 @@ impl ExecutionPlan for ParquetExec {
partition_index: usize,
ctx: Arc<TaskContext>,
) -> Result<SendableRecordBatchStream> {
let projection = match self.base_config.file_column_projection_indices() {
Some(proj) => proj,
None => (0..self.base_config.file_schema.fields().len()).collect(),
};
let projection = self
.base_config
.file_column_projection_indices()
.unwrap_or_else(|| {
(0..self.base_config.file_schema.fields().len()).collect()
});

let parquet_file_reader_factory = self
.parquet_file_reader_factory
@@ -698,8 +700,7 @@
ctx.runtime_env()
.object_store(&self.base_config.object_store_url)
.map(|store| {
Arc::new(DefaultParquetFileReaderFactory::new(store))
as Arc<dyn ParquetFileReaderFactory>
Arc::new(DefaultParquetFileReaderFactory::new(store)) as _
})
})?;

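Both edits in this hunk are small idiom changes: `Option::unwrap_or_else` replaces the explicit `match`, and `as _` lets the compiler infer a trait-object type that is already fixed by the surrounding code. A minimal sketch under those assumptions (the `Reader` trait and helper names are illustrative, not DataFusion APIs):

use std::sync::Arc;

trait Reader {}
struct DefaultReader;
impl Reader for DefaultReader {}

// Equivalent to `match proj { Some(p) => p, None => (0..n_fields).collect() }`,
// but the fallback vector is only built when it is actually needed.
fn projection_or_default(proj: Option<Vec<usize>>, n_fields: usize) -> Vec<usize> {
    proj.unwrap_or_else(|| (0..n_fields).collect())
}

// `as _` lets inference fill in `Arc<dyn Reader>` from the return type
// instead of repeating `as Arc<dyn Reader>` at the cast site.
fn make_reader() -> Arc<dyn Reader> {
    Arc::new(DefaultReader) as _
}

fn main() {
    assert_eq!(projection_or_default(None, 3), vec![0, 1, 2]);
    let _reader = make_reader();
}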
95 changes: 33 additions & 62 deletions datafusion/core/src/datasource/physical_plan/parquet/row_filter.rs
@@ -59,6 +59,7 @@
//! the unsorted predicates. Within each partition, predicates are
//! still sorted by size.
use std::cmp::Ordering;
use std::collections::BTreeSet;
use std::sync::Arc;

@@ -129,7 +130,7 @@ impl DatafusionArrowPredicate {
// on the order they appear in the file
let projection = match candidate.projection.len() {
0 | 1 => vec![],
_ => remap_projection(&candidate.projection),
2.. => remap_projection(&candidate.projection),
};

Ok(Self {
@@ -151,32 +152,31 @@ impl ArrowPredicate for DatafusionArrowPredicate {
&self.projection_mask
}

fn evaluate(&mut self, batch: RecordBatch) -> ArrowResult<BooleanArray> {
let batch = match self.projection.is_empty() {
true => batch,
false => batch.project(&self.projection)?,
fn evaluate(&mut self, mut batch: RecordBatch) -> ArrowResult<BooleanArray> {
if !self.projection.is_empty() {
batch = batch.project(&self.projection)?;
};

let batch = self.schema_mapping.map_partial_batch(batch)?;

// scoped timer updates on drop
let mut timer = self.time.timer();
match self
.physical_expr

self.physical_expr
.evaluate(&batch)
.and_then(|v| v.into_array(batch.num_rows()))
{
Ok(array) => {
.and_then(|array| {
let bool_arr = as_boolean_array(&array)?.clone();
let num_filtered = bool_arr.len() - bool_arr.true_count();
self.rows_filtered.add(num_filtered);
timer.stop();
Ok(bool_arr)
}
Err(e) => Err(ArrowError::ComputeError(format!(
"Error evaluating filter predicate: {e:?}"
))),
}
})
.map_err(|e| {
ArrowError::ComputeError(format!(
"Error evaluating filter predicate: {e:?}"
))
})
}
}

@@ -453,62 +453,33 @@ pub fn build_row_filter(

// no candidates
if candidates.is_empty() {
Ok(None)
} else if reorder_predicates {
// attempt to reorder the predicates by size and whether they are sorted
candidates.sort_by_key(|c| c.required_bytes);

let (indexed_candidates, other_candidates): (Vec<_>, Vec<_>) =
candidates.into_iter().partition(|c| c.can_use_index);

let mut filters: Vec<Box<dyn ArrowPredicate>> = vec![];

for candidate in indexed_candidates {
let filter = DatafusionArrowPredicate::try_new(
candidate,
file_schema,
metadata,
rows_filtered.clone(),
time.clone(),
Arc::clone(&schema_mapping),
)?;

filters.push(Box::new(filter));
}

for candidate in other_candidates {
let filter = DatafusionArrowPredicate::try_new(
candidate,
file_schema,
metadata,
rows_filtered.clone(),
time.clone(),
Arc::clone(&schema_mapping),
)?;
return Ok(None);
}

filters.push(Box::new(filter));
}
if reorder_predicates {
candidates.sort_unstable_by(|c1, c2| {
match c1.can_use_index.cmp(&c2.can_use_index) {
Ordering::Equal => c1.required_bytes.cmp(&c2.required_bytes),
ord => ord,
}
});
}

Ok(Some(RowFilter::new(filters)))
} else {
// otherwise evaluate the predicates in the order the appeared in the
// original expressions
let mut filters: Vec<Box<dyn ArrowPredicate>> = vec![];
for candidate in candidates {
let filter = DatafusionArrowPredicate::try_new(
candidates
.into_iter()
.map(|candidate| {
DatafusionArrowPredicate::try_new(
candidate,
file_schema,
metadata,
rows_filtered.clone(),
time.clone(),
Arc::clone(&schema_mapping),
)?;

filters.push(Box::new(filter));
}

Ok(Some(RowFilter::new(filters)))
}
)
.map(|pred| Box::new(pred) as _)
})
.collect::<Result<Vec<_>, _>>()
.map(|filters| Some(RowFilter::new(filters)))
}

#[cfg(test)]
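The new `sort_unstable_by` closure is equivalent to sorting by the tuple key `(can_use_index, required_bytes)`, since tuples compare lexicographically. A minimal sketch with a stand-in `Candidate` type (hypothetical, not the real `FilterCandidate`):

use std::cmp::Ordering;

#[derive(Debug, Clone, PartialEq)]
struct Candidate {
    can_use_index: bool,
    required_bytes: usize,
}

fn main() {
    let mut a = vec![
        Candidate { can_use_index: true, required_bytes: 10 },
        Candidate { can_use_index: false, required_bytes: 5 },
        Candidate { can_use_index: true, required_bytes: 1 },
    ];
    let mut b = a.clone();

    // Form used in the commit: compare the index flag first, then the size.
    a.sort_unstable_by(|c1, c2| match c1.can_use_index.cmp(&c2.can_use_index) {
        Ordering::Equal => c1.required_bytes.cmp(&c2.required_bytes),
        ord => ord,
    });

    // Equivalent tuple key: lexicographic ordering gives the same result.
    b.sort_unstable_by_key(|c| (c.can_use_index, c.required_bytes));

    assert_eq!(a, b);
}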
26 changes: 20 additions & 6 deletions datafusion/core/src/datasource/statistics.rs
@@ -18,16 +18,21 @@
use std::mem;
use std::sync::Arc;

use arrow_schema::DataType;
use futures::{Stream, StreamExt};

use datafusion_common::stats::Precision;
use datafusion_common::ScalarValue;

use crate::arrow::datatypes::{Schema, SchemaRef};
use crate::arrow::datatypes::SchemaRef;
use crate::error::Result;
use crate::functions_aggregate::min_max::{MaxAccumulator, MinAccumulator};
use crate::physical_plan::{Accumulator, ColumnStatistics, Statistics};
use crate::physical_plan::{ColumnStatistics, Statistics};

#[cfg(feature = "parquet")]
use crate::{
arrow::datatypes::Schema,
functions_aggregate::min_max::{MaxAccumulator, MinAccumulator},
physical_plan::Accumulator,
};

use super::listing::PartitionedFile;

@@ -144,6 +149,8 @@ pub async fn get_statistics_with_limit(
Ok((result_files, statistics))
}

// only adding this cfg b/c this is the only feature it's used with currently
#[cfg(feature = "parquet")]
pub(crate) fn create_max_min_accs(
schema: &Schema,
) -> (Vec<Option<MaxAccumulator>>, Vec<Option<MinAccumulator>>) {
@@ -175,6 +182,8 @@ fn add_row_stats(
}
}

// only adding this cfg b/c this is the only feature it's used with currently
#[cfg(feature = "parquet")]
pub(crate) fn get_col_stats(
schema: &Schema,
null_counts: Vec<Precision<usize>>,
@@ -205,8 +214,13 @@ pub(crate) fn get_col_stats(
// (aka non Dictionary) output. We need to adjust the output data type to reflect this.
// The reason the min/max aggregate produces unpacked output is that there is only one
// min/max value per group; there is no need to keep them Dictionary encoded.
fn min_max_aggregate_data_type(input_type: &DataType) -> &DataType {
if let DataType::Dictionary(_, value_type) = input_type {
//
// only adding this cfg b/c this is the only feature it's used with currently
#[cfg(feature = "parquet")]
fn min_max_aggregate_data_type(
input_type: &arrow_schema::DataType,
) -> &arrow_schema::DataType {
if let arrow_schema::DataType::Dictionary(_, value_type) = input_type {
value_type.as_ref()
} else {
input_type
3 changes: 3 additions & 0 deletions datafusion/core/src/execution/session_state_defaults.rs
@@ -100,7 +100,9 @@ impl SessionStateDefaults {

/// returns the list of default [`ScalarUDF`]s
pub fn default_scalar_functions() -> Vec<Arc<ScalarUDF>> {
#[cfg_attr(not(feature = "nested_expressions"), allow(unused_mut))]
let mut functions: Vec<Arc<ScalarUDF>> = functions::all_default_functions();

#[cfg(feature = "nested_expressions")]
functions.append(&mut functions_nested::all_default_nested_functions());

@@ -144,6 +146,7 @@
}

/// registers all the builtin array functions
#[cfg_attr(not(feature = "nested_expressions"), allow(unused_variables))]
pub fn register_array_functions(state: &mut SessionState) {
// register crate of array expressions (if enabled)
#[cfg(feature = "nested_expressions")]
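`cfg_attr` attaches an attribute only when its predicate holds, which is how the `mut` binding and the unused `state` parameter stay warning-free when `nested_expressions` is disabled. A minimal sketch of the same pattern (the `extra_functions` feature name is illustrative, not a DataFusion feature):

// With the feature off, `functions` is never mutated, so `allow(unused_mut)`
// is attached only in that configuration rather than unconditionally.
#[cfg_attr(not(feature = "extra_functions"), allow(unused_mut))]
fn default_functions() -> Vec<&'static str> {
    let mut functions = vec!["abs", "length"];

    #[cfg(feature = "extra_functions")]
    functions.push("array_sort");

    functions
}

fn main() {
    println!("{:?}", default_functions());
}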
4 changes: 4 additions & 0 deletions datafusion/core/src/physical_optimizer/pruning.rs
@@ -615,6 +615,8 @@ impl PruningPredicate {
is_always_true(&self.predicate_expr) && self.literal_guarantees.is_empty()
}

// this is only used by `parquet` feature right now
#[allow(dead_code)]
pub(crate) fn required_columns(&self) -> &RequiredColumns {
&self.required_columns
}
@@ -746,6 +748,8 @@ impl RequiredColumns {
/// * `a > 5 OR a < 10` returns `Some(a)`
/// * `a > 5 OR b < 10` returns `None`
/// * `true` returns None
#[allow(dead_code)]
// this fn is only used by `parquet` feature right now, thus the `allow(dead_code)`
pub(crate) fn single_column(&self) -> Option<&phys_expr::Column> {
if self.columns.windows(2).all(|w| {
// check if all columns are the same (ignoring statistics and field)