diff --git a/crates/core/src/delta_datafusion/find_files/mod.rs b/crates/core/src/delta_datafusion/find_files/mod.rs index f237a4ac8e..347925f31f 100644 --- a/crates/core/src/delta_datafusion/find_files/mod.rs +++ b/crates/core/src/delta_datafusion/find_files/mod.rs @@ -148,7 +148,7 @@ async fn scan_table_by_files( // Add path column used_columns.push(logical_schema.index_of(scan_config.file_column_name.as_ref().unwrap())?); - let scan = DeltaScanBuilder::new(&snapshot, log_store) + let scan = DeltaScanBuilder::new(&snapshot, log_store, &state) .with_filter(Some(expression.clone())) .with_projection(Some(&used_columns)) .with_scan_config(scan_config) diff --git a/crates/core/src/delta_datafusion/mod.rs b/crates/core/src/delta_datafusion/mod.rs index fecc6f3f03..bff1210fea 100644 --- a/crates/core/src/delta_datafusion/mod.rs +++ b/crates/core/src/delta_datafusion/mod.rs @@ -39,6 +39,7 @@ use arrow_cast::display::array_value_to_string; use arrow_schema::Field; use async_trait::async_trait; use chrono::{DateTime, TimeZone, Utc}; +use datafusion::config::TableParquetOptions; use datafusion::datasource::physical_plan::parquet::ParquetExecBuilder; use datafusion::datasource::physical_plan::{ wrap_partition_type_in_dict, wrap_partition_value_in_dict, FileScanConfig, @@ -452,6 +453,7 @@ pub(crate) struct DeltaScanBuilder<'a> { snapshot: &'a DeltaTableState, log_store: LogStoreRef, filter: Option, + state: &'a SessionState, projection: Option<&'a Vec>, limit: Option, files: Option<&'a [Add]>, @@ -460,14 +462,19 @@ pub(crate) struct DeltaScanBuilder<'a> { } impl<'a> DeltaScanBuilder<'a> { - pub fn new(snapshot: &'a DeltaTableState, log_store: LogStoreRef) -> Self { + pub fn new( + snapshot: &'a DeltaTableState, + log_store: LogStoreRef, + state: &'a SessionState, + ) -> Self { DeltaScanBuilder { snapshot, log_store, filter: None, - files: None, + state, projection: None, limit: None, + files: None, config: None, schema: None, } @@ -626,6 +633,11 @@ impl<'a> DeltaScanBuilder<'a> { .datafusion_table_statistics() .unwrap_or(Statistics::new_unknown(&schema)); + let parquet_options = TableParquetOptions { + global: self.state.config().options().execution.parquet.clone(), + ..Default::default() + }; + let mut exec_plan_builder = ParquetExecBuilder::new(FileScanConfig { object_store_url: self.log_store.object_store_url(), file_schema, @@ -636,7 +648,8 @@ impl<'a> DeltaScanBuilder<'a> { table_partition_cols, output_ordering: vec![], }) - .with_schema_adapter_factory(Arc::new(DeltaSchemaAdapterFactory {})); + .with_schema_adapter_factory(Arc::new(DeltaSchemaAdapterFactory {})) + .with_table_parquet_options(parquet_options); // Sometimes (i.e Merge) we want to prune files that don't make the // filter and read the entire contents for files that do match the @@ -698,7 +711,7 @@ impl TableProvider for DeltaTable { register_store(self.log_store(), session.runtime_env().clone()); let filter_expr = conjunction(filters.iter().cloned()); - let scan = DeltaScanBuilder::new(self.snapshot()?, self.log_store()) + let scan = DeltaScanBuilder::new(self.snapshot()?, self.log_store(), session) .with_projection(projection) .with_limit(limit) .with_filter(filter_expr) @@ -779,7 +792,7 @@ impl TableProvider for DeltaTableProvider { register_store(self.log_store.clone(), session.runtime_env().clone()); let filter_expr = conjunction(filters.iter().cloned()); - let scan = DeltaScanBuilder::new(&self.snapshot, self.log_store.clone()) + let scan = DeltaScanBuilder::new(&self.snapshot, self.log_store.clone(), session) .with_projection(projection) .with_limit(limit) .with_filter(filter_expr) @@ -1500,7 +1513,7 @@ pub(crate) async fn find_files_scan<'a>( // Add path column used_columns.push(logical_schema.index_of(scan_config.file_column_name.as_ref().unwrap())?); - let scan = DeltaScanBuilder::new(snapshot, log_store) + let scan = DeltaScanBuilder::new(snapshot, log_store, state) .with_filter(Some(expression.clone())) .with_projection(Some(&used_columns)) .with_scan_config(scan_config) @@ -2369,7 +2382,9 @@ mod tests { .await .unwrap(); - let scan = DeltaScanBuilder::new(table.snapshot().unwrap(), table.log_store()) + let ctx = SessionContext::new(); + let state = ctx.state(); + let scan = DeltaScanBuilder::new(table.snapshot().unwrap(), table.log_store(), &state) .with_filter(Some(col("a").eq(lit("s")))) .build() .await @@ -2396,7 +2411,9 @@ mod tests { .unwrap(); let snapshot = table.snapshot().unwrap(); - let scan = DeltaScanBuilder::new(snapshot, table.log_store()) + let ctx = SessionContext::new(); + let state = ctx.state(); + let scan = DeltaScanBuilder::new(snapshot, table.log_store(), &state) .with_filter(Some(col("a").eq(lit("s")))) .with_scan_config( DeltaScanConfigBuilder::new() @@ -2415,6 +2432,34 @@ mod tests { assert!(visitor.pruning_predicate.is_none()); } + #[tokio::test] + async fn test_delta_scan_applies_parquet_options() { + let arr: Arc = Arc::new(arrow::array::StringArray::from(vec!["s"])); + let batch = RecordBatch::try_from_iter_with_nullable(vec![("a", arr, false)]).unwrap(); + let table = crate::DeltaOps::new_in_memory() + .write(vec![batch]) + .with_save_mode(crate::protocol::SaveMode::Append) + .await + .unwrap(); + + let snapshot = table.snapshot().unwrap(); + + let mut config = SessionConfig::default(); + config.options_mut().execution.parquet.pushdown_filters = true; + let ctx = SessionContext::new_with_config(config); + let state = ctx.state(); + + let scan = DeltaScanBuilder::new(snapshot, table.log_store(), &state) + .build() + .await + .unwrap(); + + let mut visitor = ParquetOptionsVisitor::default(); + visit_execution_plan(&scan, &mut visitor).unwrap(); + + assert_eq!(ctx.copied_table_options().parquet, visitor.options.unwrap()); + } + #[derive(Default)] struct ParquetPredicateVisitor { predicate: Option>, @@ -2432,4 +2477,20 @@ mod tests { Ok(true) } } + + #[derive(Default)] + struct ParquetOptionsVisitor { + options: Option, + } + + impl ExecutionPlanVisitor for ParquetOptionsVisitor { + type Error = DataFusionError; + + fn pre_visit(&mut self, plan: &dyn ExecutionPlan) -> Result { + if let Some(parquet_exec) = plan.as_any().downcast_ref::() { + self.options = Some(parquet_exec.table_parquet_options().clone()) + } + Ok(true) + } + } } diff --git a/crates/core/src/operations/constraints.rs b/crates/core/src/operations/constraints.rs index 246541ccc1..e5d356f81c 100644 --- a/crates/core/src/operations/constraints.rs +++ b/crates/core/src/operations/constraints.rs @@ -114,7 +114,7 @@ impl std::future::IntoFuture for ConstraintBuilder { session.state() }); - let scan = DeltaScanBuilder::new(&this.snapshot, this.log_store.clone()) + let scan = DeltaScanBuilder::new(&this.snapshot, this.log_store.clone(), &state) .build() .await?; diff --git a/crates/core/src/operations/delete.rs b/crates/core/src/operations/delete.rs index ac0b616ef3..79a7062f50 100644 --- a/crates/core/src/operations/delete.rs +++ b/crates/core/src/operations/delete.rs @@ -139,7 +139,7 @@ async fn excute_non_empty_expr( let table_partition_cols = snapshot.metadata().partition_columns.clone(); - let scan = DeltaScanBuilder::new(snapshot, log_store.clone()) + let scan = DeltaScanBuilder::new(snapshot, log_store.clone(), &state) .with_files(rewrite) .build() .await?; diff --git a/crates/core/src/operations/update.rs b/crates/core/src/operations/update.rs index cd13698bed..2f30f4aa8a 100644 --- a/crates/core/src/operations/update.rs +++ b/crates/core/src/operations/update.rs @@ -236,7 +236,7 @@ async fn execute( // For each rewrite evaluate the predicate and then modify each expression // to either compute the new value or obtain the old one then write these batches - let scan = DeltaScanBuilder::new(&snapshot, log_store.clone()) + let scan = DeltaScanBuilder::new(&snapshot, log_store.clone(), &state) .with_files(&candidates.candidates) .build() .await?; diff --git a/crates/core/src/operations/write.rs b/crates/core/src/operations/write.rs index fc3dfe44c7..0606707c19 100644 --- a/crates/core/src/operations/write.rs +++ b/crates/core/src/operations/write.rs @@ -511,7 +511,7 @@ async fn execute_non_empty_expr( let input_schema = snapshot.input_schema()?; let input_dfschema: DFSchema = input_schema.clone().as_ref().clone().try_into()?; - let scan = DeltaScanBuilder::new(snapshot, log_store.clone()) + let scan = DeltaScanBuilder::new(snapshot, log_store.clone(), &state) .with_files(rewrite) .build() .await?;