diff --git a/crates/core/src/delta_datafusion/find_files/mod.rs b/crates/core/src/delta_datafusion/find_files/mod.rs
index 347925f31f..f237a4ac8e 100644
--- a/crates/core/src/delta_datafusion/find_files/mod.rs
+++ b/crates/core/src/delta_datafusion/find_files/mod.rs
@@ -148,7 +148,7 @@ async fn scan_table_by_files(
     // Add path column
     used_columns.push(logical_schema.index_of(scan_config.file_column_name.as_ref().unwrap())?);

-    let scan = DeltaScanBuilder::new(&snapshot, log_store, &state)
+    let scan = DeltaScanBuilder::new(&snapshot, log_store)
         .with_filter(Some(expression.clone()))
         .with_projection(Some(&used_columns))
         .with_scan_config(scan_config)
diff --git a/crates/core/src/delta_datafusion/mod.rs b/crates/core/src/delta_datafusion/mod.rs
index 860a02be56..fecc6f3f03 100644
--- a/crates/core/src/delta_datafusion/mod.rs
+++ b/crates/core/src/delta_datafusion/mod.rs
@@ -39,7 +39,7 @@ use arrow_cast::display::array_value_to_string;
 use arrow_schema::Field;
 use async_trait::async_trait;
 use chrono::{DateTime, TimeZone, Utc};
-use datafusion::datasource::file_format::{parquet::ParquetFormat, FileFormat};
+use datafusion::datasource::physical_plan::parquet::ParquetExecBuilder;
 use datafusion::datasource::physical_plan::{
     wrap_partition_type_in_dict, wrap_partition_value_in_dict, FileScanConfig,
 };
@@ -65,7 +65,6 @@ use datafusion_common::{
 use datafusion_expr::logical_plan::CreateExternalTable;
 use datafusion_expr::utils::conjunction;
 use datafusion_expr::{col, Expr, Extension, LogicalPlan, TableProviderFilterPushDown, Volatility};
-use datafusion_physical_expr::PhysicalExpr;
 use datafusion_proto::logical_plan::LogicalExtensionCodec;
 use datafusion_proto::physical_plan::PhysicalExtensionCodec;
 use datafusion_sql::planner::ParserOptions;
@@ -77,6 +76,7 @@ use serde::{Deserialize, Serialize};
 use url::Url;

 use crate::delta_datafusion::expr::parse_predicate_expression;
+use crate::delta_datafusion::schema_adapter::DeltaSchemaAdapterFactory;
 use crate::errors::{DeltaResult, DeltaTableError};
 use crate::kernel::{Add, DataCheck, EagerSnapshot, Invariant, Snapshot, StructTypeExt};
 use crate::logstore::LogStoreRef;
@@ -93,6 +93,7 @@ pub mod logical;
 pub mod physical;

 mod find_files;
+mod schema_adapter;

 impl From<DeltaTableError> for DataFusionError {
     fn from(err: DeltaTableError) -> Self {
@@ -451,7 +452,6 @@ pub(crate) struct DeltaScanBuilder<'a> {
     snapshot: &'a DeltaTableState,
     log_store: LogStoreRef,
     filter: Option<Expr>,
-    state: &'a SessionState,
     projection: Option<&'a Vec<usize>>,
     limit: Option<usize>,
     files: Option<&'a [Add]>,
@@ -460,16 +460,11 @@
 }

 impl<'a> DeltaScanBuilder<'a> {
-    pub fn new(
-        snapshot: &'a DeltaTableState,
-        log_store: LogStoreRef,
-        state: &'a SessionState,
-    ) -> Self {
+    pub fn new(snapshot: &'a DeltaTableState, log_store: LogStoreRef) -> Self {
         DeltaScanBuilder {
             snapshot,
             log_store,
             filter: None,
-            state,
             files: None,
             projection: None,
             limit: None,
@@ -517,11 +512,7 @@ impl<'a> DeltaScanBuilder<'a> {

         let schema = match self.schema {
             Some(schema) => schema,
-            None => {
-                self.snapshot
-                    .physical_arrow_schema(self.log_store.object_store())
-                    .await?
-            }
+            None => self.snapshot.arrow_schema()?,
         };

         let logical_schema = df_logical_schema(self.snapshot, &config)?;
@@ -635,32 +626,27 @@ impl<'a> DeltaScanBuilder<'a> {
             .datafusion_table_statistics()
             .unwrap_or(Statistics::new_unknown(&schema));

+        let mut exec_plan_builder = ParquetExecBuilder::new(FileScanConfig {
+            object_store_url: self.log_store.object_store_url(),
+            file_schema,
+            file_groups: file_groups.into_values().collect(),
+            statistics: stats,
+            projection: self.projection.cloned(),
+            limit: self.limit,
+            table_partition_cols,
+            output_ordering: vec![],
+        })
+        .with_schema_adapter_factory(Arc::new(DeltaSchemaAdapterFactory {}));
+
         // Sometimes (i.e Merge) we want to prune files that don't make the
         // filter and read the entire contents for files that do match the
         // filter
-        let parquet_pushdown = if config.enable_parquet_pushdown {
-            logical_filter.clone()
-        } else {
-            None
+        if let Some(predicate) = logical_filter {
+            if config.enable_parquet_pushdown {
+                exec_plan_builder = exec_plan_builder.with_predicate(predicate);
+            }
         };

-        let scan = ParquetFormat::new()
-            .create_physical_plan(
-                self.state,
-                FileScanConfig {
-                    object_store_url: self.log_store.object_store_url(),
-                    file_schema,
-                    file_groups: file_groups.into_values().collect(),
-                    statistics: stats,
-                    projection: self.projection.cloned(),
-                    limit: self.limit,
-                    table_partition_cols,
-                    output_ordering: vec![],
-                },
-                parquet_pushdown.as_ref(),
-            )
-            .await?;
-
         let metrics = ExecutionPlanMetricsSet::new();
         MetricBuilder::new(&metrics)
             .global_counter("files_scanned")
@@ -671,7 +657,7 @@ impl<'a> DeltaScanBuilder<'a> {

         Ok(DeltaScan {
             table_uri: ensure_table_uri(self.log_store.root_uri())?.as_str().into(),
-            parquet_scan: scan,
+            parquet_scan: exec_plan_builder.build_arc(),
             config,
             logical_schema,
             metrics,
@@ -712,7 +698,7 @@ impl TableProvider for DeltaTable {
         register_store(self.log_store(), session.runtime_env().clone());
         let filter_expr = conjunction(filters.iter().cloned());

-        let scan = DeltaScanBuilder::new(self.snapshot()?, self.log_store(), session)
+        let scan = DeltaScanBuilder::new(self.snapshot()?, self.log_store())
             .with_projection(projection)
             .with_limit(limit)
             .with_filter(filter_expr)
@@ -793,7 +779,7 @@ impl TableProvider for DeltaTableProvider {
         register_store(self.log_store.clone(), session.runtime_env().clone());
         let filter_expr = conjunction(filters.iter().cloned());

-        let scan = DeltaScanBuilder::new(&self.snapshot, self.log_store.clone(), session)
+        let scan = DeltaScanBuilder::new(&self.snapshot, self.log_store.clone())
             .with_projection(projection)
             .with_limit(limit)
             .with_filter(filter_expr)
@@ -1514,7 +1500,7 @@ pub(crate) async fn find_files_scan<'a>(
     // Add path column
     used_columns.push(logical_schema.index_of(scan_config.file_column_name.as_ref().unwrap())?);

-    let scan = DeltaScanBuilder::new(snapshot, log_store, state)
+    let scan = DeltaScanBuilder::new(snapshot, log_store)
         .with_filter(Some(expression.clone()))
         .with_projection(Some(&used_columns))
         .with_scan_config(scan_config)
@@ -1749,6 +1735,7 @@ impl From for DeltaColumn {

 #[cfg(test)]
 mod tests {
+    use crate::operations::write::SchemaMode;
     use crate::writer::test_utils::get_delta_schema;
     use arrow::array::StructArray;
     use arrow::datatypes::{DataType, Field, Schema};
@@ -1758,6 +1745,7 @@ mod tests {
     use datafusion::physical_plan::empty::EmptyExec;
     use datafusion::physical_plan::{visit_execution_plan, ExecutionPlanVisitor};
     use datafusion_expr::lit;
+    use datafusion_physical_expr::PhysicalExpr;
     use datafusion_proto::physical_plan::AsExecutionPlan;
     use datafusion_proto::protobuf;
     use object_store::path::Path;
@@ -2153,9 +2141,184 @@ mod tests {
         */
     }

+    #[tokio::test]
+    async fn delta_scan_supports_missing_columns() {
+        let schema1 = Arc::new(ArrowSchema::new(vec![Field::new(
+            "col_1",
+            DataType::Utf8,
+            true,
+        )]));
+
+        let batch1 = RecordBatch::try_new(
+            schema1.clone(),
+            vec![Arc::new(arrow::array::StringArray::from(vec![
+                Some("A"),
+                Some("B"),
+            ]))],
+        )
+        .unwrap();
+
+        let schema2 = Arc::new(ArrowSchema::new(vec![
+            Field::new("col_1", DataType::Utf8, true),
+            Field::new("col_2", DataType::Utf8, true),
+        ]));
+
+        let batch2 = RecordBatch::try_new(
+            schema2.clone(),
+            vec![
+                Arc::new(arrow::array::StringArray::from(vec![
+                    Some("E"),
+                    Some("F"),
+                    Some("G"),
+                ])),
+                Arc::new(arrow::array::StringArray::from(vec![
+                    Some("E2"),
+                    Some("F2"),
+                    Some("G2"),
+                ])),
+            ],
+        )
+        .unwrap();
+
+        let table = crate::DeltaOps::new_in_memory()
+            .write(vec![batch2])
+            .with_save_mode(crate::protocol::SaveMode::Append)
+            .await
+            .unwrap();
+
+        let table = crate::DeltaOps(table)
+            .write(vec![batch1])
+            .with_schema_mode(SchemaMode::Merge)
+            .with_save_mode(crate::protocol::SaveMode::Append)
+            .await
+            .unwrap();
+
+        let config = DeltaScanConfigBuilder::new()
+            .build(table.snapshot().unwrap())
+            .unwrap();
+        let log = table.log_store();
+
+        let provider =
+            DeltaTableProvider::try_new(table.snapshot().unwrap().clone(), log, config).unwrap();
+        let ctx: SessionContext = DeltaSessionContext::default().into();
+        ctx.register_table("test", Arc::new(provider)).unwrap();
+
+        let df = ctx.sql("select col_1, col_2 from test").await.unwrap();
+        let actual = df.collect().await.unwrap();
+        let expected = vec![
+            "+-------+-------+",
+            "| col_1 | col_2 |",
+            "+-------+-------+",
+            "| A     |       |",
+            "| B     |       |",
+            "| E     | E2    |",
+            "| F     | F2    |",
+            "| G     | G2    |",
+            "+-------+-------+",
+        ];
+        assert_batches_sorted_eq!(&expected, &actual);
+    }
+
+    #[tokio::test]
+    async fn delta_scan_supports_nested_missing_columns() {
+        let column1_schema1: arrow::datatypes::Fields =
+            vec![Field::new("col_1a", DataType::Utf8, true)].into();
+        let schema1 = Arc::new(ArrowSchema::new(vec![Field::new(
+            "col_1",
+            DataType::Struct(column1_schema1.clone()),
+            true,
+        )]));
+
+        let batch1 = RecordBatch::try_new(
+            schema1.clone(),
+            vec![Arc::new(StructArray::new(
+                column1_schema1,
+                vec![Arc::new(arrow::array::StringArray::from(vec![
+                    Some("A"),
+                    Some("B"),
+                ]))],
+                None,
+            ))],
+        )
+        .unwrap();
+
+        let column1_schema2: arrow::datatypes::Fields = vec![
+            Field::new("col_1a", DataType::Utf8, true),
+            Field::new("col_1b", DataType::Utf8, true),
+        ]
+        .into();
+        let schema2 = Arc::new(ArrowSchema::new(vec![Field::new(
+            "col_1",
+            DataType::Struct(column1_schema2.clone()),
+            true,
+        )]));
+
+        let batch2 = RecordBatch::try_new(
+            schema2.clone(),
+            vec![Arc::new(StructArray::new(
+                column1_schema2,
+                vec![
+                    Arc::new(arrow::array::StringArray::from(vec![
+                        Some("E"),
+                        Some("F"),
+                        Some("G"),
+                    ])),
+                    Arc::new(arrow::array::StringArray::from(vec![
+                        Some("E2"),
+                        Some("F2"),
+                        Some("G2"),
+                    ])),
+                ],
+                None,
+            ))],
+        )
+        .unwrap();
+
+        let table = crate::DeltaOps::new_in_memory()
+            .write(vec![batch1])
+            .with_save_mode(crate::protocol::SaveMode::Append)
+            .await
+            .unwrap();
+
+        let table = crate::DeltaOps(table)
+            .write(vec![batch2])
+            .with_schema_mode(SchemaMode::Merge)
+            .with_save_mode(crate::protocol::SaveMode::Append)
+            .await
+            .unwrap();
+
+        let config = DeltaScanConfigBuilder::new()
+            .build(table.snapshot().unwrap())
+            .unwrap();
+        let log = table.log_store();
+
+        let provider =
+            DeltaTableProvider::try_new(table.snapshot().unwrap().clone(), log, config).unwrap();
+        let ctx: SessionContext = DeltaSessionContext::default().into();
+        ctx.register_table("test", Arc::new(provider)).unwrap();
+
+        let df = ctx
+            .sql("select col_1.col_1a, col_1.col_1b from test")
+            .await
+            .unwrap();
+        let actual = df.collect().await.unwrap();
+        let expected = vec![
+            "+--------------------+--------------------+",
+            "| test.col_1[col_1a] | test.col_1[col_1b] |",
+            "+--------------------+--------------------+",
+            "| A                  |                    |",
+            "| B                  |                    |",
+            "| E                  | E2                 |",
+            "| F                  | F2                 |",
+            "| G                  | G2                 |",
+            "+--------------------+--------------------+",
+        ];
+        assert_batches_sorted_eq!(&expected, &actual);
+    }
+
     #[tokio::test]
     async fn test_multiple_predicate_pushdown() {
-        use crate::{datafusion::prelude::SessionContext, DeltaTableBuilder};
+        use crate::datafusion::prelude::SessionContext;
         let schema = Arc::new(ArrowSchema::new(vec![
             Field::new("moDified", DataType::Utf8, true),
             Field::new("id", DataType::Utf8, true),
@@ -2198,7 +2361,6 @@ mod tests {

     #[tokio::test]
     async fn test_delta_scan_builder_no_scan_config() {
-        use crate::datafusion::prelude::SessionContext;
         let arr: Arc<dyn Array> = Arc::new(arrow::array::StringArray::from(vec!["s"]));
         let batch = RecordBatch::try_from_iter_with_nullable(vec![("a", arr, false)]).unwrap();
         let table = crate::DeltaOps::new_in_memory()
@@ -2207,13 +2369,11 @@
             .await
             .unwrap();

-        let ctx = SessionContext::new();
-        let scan =
-            DeltaScanBuilder::new(table.snapshot().unwrap(), table.log_store(), &ctx.state())
-                .with_filter(Some(col("a").eq(lit("s"))))
-                .build()
-                .await
-                .unwrap();
+        let scan = DeltaScanBuilder::new(table.snapshot().unwrap(), table.log_store())
+            .with_filter(Some(col("a").eq(lit("s"))))
+            .build()
+            .await
+            .unwrap();

         let mut visitor = ParquetPredicateVisitor::default();
         visit_execution_plan(&scan, &mut visitor).unwrap();
@@ -2227,7 +2387,6 @@

     #[tokio::test]
     async fn test_delta_scan_builder_scan_config_disable_pushdown() {
-        use crate::datafusion::prelude::SessionContext;
         let arr: Arc<dyn Array> = Arc::new(arrow::array::StringArray::from(vec!["s"]));
         let batch = RecordBatch::try_from_iter_with_nullable(vec![("a", arr, false)]).unwrap();
         let table = crate::DeltaOps::new_in_memory()
@@ -2236,9 +2395,8 @@
             .await
             .unwrap();

-        let ctx = SessionContext::new();
         let snapshot = table.snapshot().unwrap();
-        let scan = DeltaScanBuilder::new(snapshot, table.log_store(), &ctx.state())
+        let scan = DeltaScanBuilder::new(snapshot, table.log_store())
             .with_filter(Some(col("a").eq(lit("s"))))
             .with_scan_config(
                 DeltaScanConfigBuilder::new()
diff --git a/crates/core/src/delta_datafusion/schema_adapter.rs b/crates/core/src/delta_datafusion/schema_adapter.rs
new file mode 100644
index 0000000000..ce331a7fea
--- /dev/null
+++ b/crates/core/src/delta_datafusion/schema_adapter.rs
@@ -0,0 +1,68 @@
+use crate::operations::cast::cast_record_batch;
+use arrow_array::RecordBatch;
+use arrow_schema::{Schema, SchemaRef};
+use datafusion::datasource::schema_adapter::{SchemaAdapter, SchemaAdapterFactory, SchemaMapper};
+use std::fmt::Debug;
+use std::sync::Arc;
+
+/// A Schema Adapter Factory which provides casting of record batches from parquet to meet
+/// delta lake conventions.
+#[derive(Debug)]
+pub(crate) struct DeltaSchemaAdapterFactory {}
+
+impl SchemaAdapterFactory for DeltaSchemaAdapterFactory {
+    fn create(&self, schema: SchemaRef) -> Box<dyn SchemaAdapter> {
+        Box::new(DeltaSchemaAdapter {
+            table_schema: schema,
+        })
+    }
+}
+
+pub(crate) struct DeltaSchemaAdapter {
+    /// Schema for the table
+    table_schema: SchemaRef,
+}
+
+impl SchemaAdapter for DeltaSchemaAdapter {
+    fn map_column_index(&self, index: usize, file_schema: &Schema) -> Option<usize> {
+        let field = self.table_schema.field(index);
+        Some(file_schema.fields.find(field.name())?.0)
+    }
+
+    fn map_schema(
+        &self,
+        file_schema: &Schema,
+    ) -> datafusion_common::Result<(Arc<dyn SchemaMapper>, Vec<usize>)> {
+        let mut projection = Vec::with_capacity(file_schema.fields().len());
+
+        for (file_idx, file_field) in file_schema.fields.iter().enumerate() {
+            if self.table_schema.fields().find(file_field.name()).is_some() {
+                projection.push(file_idx);
+            }
+        }
+
+        Ok((
+            Arc::new(SchemaMapping {
+                table_schema: self.table_schema.clone(),
+            }),
+            projection,
+        ))
+    }
+}
+
+#[derive(Debug)]
+pub(crate) struct SchemaMapping {
+    table_schema: SchemaRef,
+}
+
+impl SchemaMapper for SchemaMapping {
+    fn map_batch(&self, batch: RecordBatch) -> datafusion_common::Result<RecordBatch> {
+        let record_batch = cast_record_batch(&batch, self.table_schema.clone(), false, true)?;
+        Ok(record_batch)
+    }
+
+    fn map_partial_batch(&self, batch: RecordBatch) -> datafusion_common::Result<RecordBatch> {
+        let record_batch = cast_record_batch(&batch, self.table_schema.clone(), false, true)?;
+        Ok(record_batch)
+    }
+}
diff --git a/crates/core/src/operations/constraints.rs b/crates/core/src/operations/constraints.rs
index e5d356f81c..246541ccc1 100644
--- a/crates/core/src/operations/constraints.rs
+++ b/crates/core/src/operations/constraints.rs
@@ -114,7 +114,7 @@ impl std::future::IntoFuture for ConstraintBuilder {
                 session.state()
             });

-            let scan = DeltaScanBuilder::new(&this.snapshot, this.log_store.clone(), &state)
+            let scan = DeltaScanBuilder::new(&this.snapshot, this.log_store.clone())
                 .build()
                 .await?;

diff --git a/crates/core/src/operations/delete.rs b/crates/core/src/operations/delete.rs
index 56aa9ef98b..ac0b616ef3 100644
--- a/crates/core/src/operations/delete.rs
+++ b/crates/core/src/operations/delete.rs
@@ -139,7 +139,7 @@ async fn excute_non_empty_expr(

     let table_partition_cols = snapshot.metadata().partition_columns.clone();

-    let scan = DeltaScanBuilder::new(snapshot, log_store.clone(), state)
+    let scan = DeltaScanBuilder::new(snapshot, log_store.clone())
         .with_files(rewrite)
         .build()
         .await?;
diff --git a/crates/core/src/operations/update.rs b/crates/core/src/operations/update.rs
index 2f30f4aa8a..cd13698bed 100644
--- a/crates/core/src/operations/update.rs
+++ b/crates/core/src/operations/update.rs
@@ -236,7 +236,7 @@ async fn execute(

     // For each rewrite evaluate the predicate and then modify each expression
     // to either compute the new value or obtain the old one then write these batches
-    let scan = DeltaScanBuilder::new(&snapshot, log_store.clone(), &state)
+    let scan = DeltaScanBuilder::new(&snapshot, log_store.clone())
         .with_files(&candidates.candidates)
         .build()
         .await?;
diff --git a/crates/core/src/operations/write.rs b/crates/core/src/operations/write.rs
index 0606707c19..fc3dfe44c7 100644
--- a/crates/core/src/operations/write.rs
+++ b/crates/core/src/operations/write.rs
@@ -511,7 +511,7 @@ async fn execute_non_empty_expr(
     let input_schema = snapshot.input_schema()?;
     let input_dfschema: DFSchema = input_schema.clone().as_ref().clone().try_into()?;

-    let scan = DeltaScanBuilder::new(snapshot, log_store.clone(), &state)
+    let scan = DeltaScanBuilder::new(snapshot, log_store.clone())
         .with_files(rewrite)
         .build()
         .await?;
diff --git a/crates/core/tests/integration_datafusion.rs b/crates/core/tests/integration_datafusion.rs
index 27f942b581..ea83bce29e 100644
--- a/crates/core/tests/integration_datafusion.rs
+++ b/crates/core/tests/integration_datafusion.rs
@@ -941,13 +941,14 @@ mod local {

         let batches = ctx.sql("SELECT * FROM demo").await?.collect().await?;

+        // Without defining a schema for the select, the default for a timestamp is microseconds in UTC
         let expected = vec![
-            "+-------------------------------+---------------------+------------+",
-            "| BIG_DATE                      | NORMAL_DATE         | SOME_VALUE |",
-            "+-------------------------------+---------------------+------------+",
-            "| 1816-03-28T05:56:08.066277376 | 2022-02-01T00:00:00 | 2          |",
-            "| 1816-03-29T05:56:08.066277376 | 2022-01-01T00:00:00 | 1          |",
-            "+-------------------------------+---------------------+------------+",
+            "+-----------------------------+----------------------+------------+",
+            "| BIG_DATE                    | NORMAL_DATE          | SOME_VALUE |",
+            "+-----------------------------+----------------------+------------+",
+            "| 1816-03-28T05:56:08.066278Z | 2022-02-01T00:00:00Z | 2          |",
+            "| 1816-03-29T05:56:08.066278Z | 2022-01-01T00:00:00Z | 1          |",
+            "+-----------------------------+----------------------+------------+",
         ];
         assert_batches_sorted_eq!(&expected, &batches);

@@ -1154,7 +1155,7 @@ mod local {
             .unwrap();
         let batch = batches.pop().unwrap();

-        let expected_schema = Schema::new(vec![Field::new("id", ArrowDataType::Int32, true)]);
+        let expected_schema = Schema::new(vec![Field::new("id", ArrowDataType::Int64, false)]);
         assert_eq!(batch.schema().as_ref(), &expected_schema);
         Ok(())
     }