Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: ensure DataFusion SessionState Parquet options are applied to DeltaScan #2702

Merged
merged 5 commits into from
Jul 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion crates/core/src/delta_datafusion/find_files/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ async fn scan_table_by_files(
// Add path column
used_columns.push(logical_schema.index_of(scan_config.file_column_name.as_ref().unwrap())?);

let scan = DeltaScanBuilder::new(&snapshot, log_store)
let scan = DeltaScanBuilder::new(&snapshot, log_store, &state)
.with_filter(Some(expression.clone()))
.with_projection(Some(&used_columns))
.with_scan_config(scan_config)
Expand Down
77 changes: 69 additions & 8 deletions crates/core/src/delta_datafusion/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ use arrow_cast::display::array_value_to_string;
use arrow_schema::Field;
use async_trait::async_trait;
use chrono::{DateTime, TimeZone, Utc};
use datafusion::config::TableParquetOptions;
use datafusion::datasource::physical_plan::parquet::ParquetExecBuilder;
use datafusion::datasource::physical_plan::{
wrap_partition_type_in_dict, wrap_partition_value_in_dict, FileScanConfig,
Expand Down Expand Up @@ -452,6 +453,7 @@ pub(crate) struct DeltaScanBuilder<'a> {
snapshot: &'a DeltaTableState,
log_store: LogStoreRef,
filter: Option<Expr>,
state: &'a SessionState,
projection: Option<&'a Vec<usize>>,
limit: Option<usize>,
files: Option<&'a [Add]>,
Expand All @@ -460,14 +462,19 @@ pub(crate) struct DeltaScanBuilder<'a> {
}

impl<'a> DeltaScanBuilder<'a> {
pub fn new(snapshot: &'a DeltaTableState, log_store: LogStoreRef) -> Self {
pub fn new(
snapshot: &'a DeltaTableState,
log_store: LogStoreRef,
state: &'a SessionState,
) -> Self {
DeltaScanBuilder {
snapshot,
log_store,
filter: None,
files: None,
state,
projection: None,
limit: None,
files: None,
config: None,
schema: None,
}
Expand Down Expand Up @@ -626,6 +633,11 @@ impl<'a> DeltaScanBuilder<'a> {
.datafusion_table_statistics()
.unwrap_or(Statistics::new_unknown(&schema));

let parquet_options = TableParquetOptions {
global: self.state.config().options().execution.parquet.clone(),
..Default::default()
};

let mut exec_plan_builder = ParquetExecBuilder::new(FileScanConfig {
object_store_url: self.log_store.object_store_url(),
file_schema,
Expand All @@ -636,7 +648,8 @@ impl<'a> DeltaScanBuilder<'a> {
table_partition_cols,
output_ordering: vec![],
})
.with_schema_adapter_factory(Arc::new(DeltaSchemaAdapterFactory {}));
.with_schema_adapter_factory(Arc::new(DeltaSchemaAdapterFactory {}))
.with_table_parquet_options(parquet_options);

// Sometimes (i.e Merge) we want to prune files that don't make the
// filter and read the entire contents for files that do match the
Expand Down Expand Up @@ -698,7 +711,7 @@ impl TableProvider for DeltaTable {
register_store(self.log_store(), session.runtime_env().clone());
let filter_expr = conjunction(filters.iter().cloned());

let scan = DeltaScanBuilder::new(self.snapshot()?, self.log_store())
let scan = DeltaScanBuilder::new(self.snapshot()?, self.log_store(), session)
.with_projection(projection)
.with_limit(limit)
.with_filter(filter_expr)
Expand Down Expand Up @@ -779,7 +792,7 @@ impl TableProvider for DeltaTableProvider {
register_store(self.log_store.clone(), session.runtime_env().clone());
let filter_expr = conjunction(filters.iter().cloned());

let scan = DeltaScanBuilder::new(&self.snapshot, self.log_store.clone())
let scan = DeltaScanBuilder::new(&self.snapshot, self.log_store.clone(), session)
.with_projection(projection)
.with_limit(limit)
.with_filter(filter_expr)
Expand Down Expand Up @@ -1500,7 +1513,7 @@ pub(crate) async fn find_files_scan<'a>(
// Add path column
used_columns.push(logical_schema.index_of(scan_config.file_column_name.as_ref().unwrap())?);

let scan = DeltaScanBuilder::new(snapshot, log_store)
let scan = DeltaScanBuilder::new(snapshot, log_store, state)
.with_filter(Some(expression.clone()))
.with_projection(Some(&used_columns))
.with_scan_config(scan_config)
Expand Down Expand Up @@ -2369,7 +2382,9 @@ mod tests {
.await
.unwrap();

let scan = DeltaScanBuilder::new(table.snapshot().unwrap(), table.log_store())
let ctx = SessionContext::new();
let state = ctx.state();
let scan = DeltaScanBuilder::new(table.snapshot().unwrap(), table.log_store(), &state)
.with_filter(Some(col("a").eq(lit("s"))))
.build()
.await
Expand All @@ -2396,7 +2411,9 @@ mod tests {
.unwrap();

let snapshot = table.snapshot().unwrap();
let scan = DeltaScanBuilder::new(snapshot, table.log_store())
let ctx = SessionContext::new();
let state = ctx.state();
let scan = DeltaScanBuilder::new(snapshot, table.log_store(), &state)
.with_filter(Some(col("a").eq(lit("s"))))
.with_scan_config(
DeltaScanConfigBuilder::new()
Expand All @@ -2415,6 +2432,34 @@ mod tests {
assert!(visitor.pruning_predicate.is_none());
}

#[tokio::test]
async fn test_delta_scan_applies_parquet_options() {
let arr: Arc<dyn Array> = Arc::new(arrow::array::StringArray::from(vec!["s"]));
let batch = RecordBatch::try_from_iter_with_nullable(vec![("a", arr, false)]).unwrap();
let table = crate::DeltaOps::new_in_memory()
.write(vec![batch])
.with_save_mode(crate::protocol::SaveMode::Append)
.await
.unwrap();

let snapshot = table.snapshot().unwrap();

let mut config = SessionConfig::default();
config.options_mut().execution.parquet.pushdown_filters = true;
let ctx = SessionContext::new_with_config(config);
let state = ctx.state();

let scan = DeltaScanBuilder::new(snapshot, table.log_store(), &state)
.build()
.await
.unwrap();

let mut visitor = ParquetOptionsVisitor::default();
visit_execution_plan(&scan, &mut visitor).unwrap();

assert_eq!(ctx.copied_table_options().parquet, visitor.options.unwrap());
}

#[derive(Default)]
struct ParquetPredicateVisitor {
predicate: Option<Arc<dyn PhysicalExpr>>,
Expand All @@ -2432,4 +2477,20 @@ mod tests {
Ok(true)
}
}

#[derive(Default)]
struct ParquetOptionsVisitor {
options: Option<TableParquetOptions>,
}

impl ExecutionPlanVisitor for ParquetOptionsVisitor {
type Error = DataFusionError;

fn pre_visit(&mut self, plan: &dyn ExecutionPlan) -> Result<bool, Self::Error> {
if let Some(parquet_exec) = plan.as_any().downcast_ref::<ParquetExec>() {
self.options = Some(parquet_exec.table_parquet_options().clone())
}
Ok(true)
}
}
}
2 changes: 1 addition & 1 deletion crates/core/src/operations/constraints.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ impl std::future::IntoFuture for ConstraintBuilder {
session.state()
});

let scan = DeltaScanBuilder::new(&this.snapshot, this.log_store.clone())
let scan = DeltaScanBuilder::new(&this.snapshot, this.log_store.clone(), &state)
.build()
.await?;

Expand Down
2 changes: 1 addition & 1 deletion crates/core/src/operations/delete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ async fn excute_non_empty_expr(

let table_partition_cols = snapshot.metadata().partition_columns.clone();

let scan = DeltaScanBuilder::new(snapshot, log_store.clone())
let scan = DeltaScanBuilder::new(snapshot, log_store.clone(), &state)
.with_files(rewrite)
.build()
.await?;
Expand Down
2 changes: 1 addition & 1 deletion crates/core/src/operations/update.rs
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ async fn execute(

// For each rewrite evaluate the predicate and then modify each expression
// to either compute the new value or obtain the old one then write these batches
let scan = DeltaScanBuilder::new(&snapshot, log_store.clone())
let scan = DeltaScanBuilder::new(&snapshot, log_store.clone(), &state)
.with_files(&candidates.candidates)
.build()
.await?;
Expand Down
2 changes: 1 addition & 1 deletion crates/core/src/operations/write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -511,7 +511,7 @@ async fn execute_non_empty_expr(
let input_schema = snapshot.input_schema()?;
let input_dfschema: DFSchema = input_schema.clone().as_ref().clone().try_into()?;

let scan = DeltaScanBuilder::new(snapshot, log_store.clone())
let scan = DeltaScanBuilder::new(snapshot, log_store.clone(), &state)
.with_files(rewrite)
.build()
.await?;
Expand Down
Loading