diff --git a/datafusion/core/src/catalog/listing_schema.rs b/datafusion/core/src/catalog/listing_schema.rs index 0d5c49f377d0..c3c682689542 100644 --- a/datafusion/core/src/catalog/listing_schema.rs +++ b/datafusion/core/src/catalog/listing_schema.rs @@ -149,6 +149,7 @@ impl ListingSchemaProvider { unbounded: false, options: Default::default(), constraints: Constraints::empty(), + column_defaults: Default::default(), }, ) .await?; diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs index a3be57db3a83..effeacc4804f 100644 --- a/datafusion/core/src/datasource/listing/table.rs +++ b/datafusion/core/src/datasource/listing/table.rs @@ -17,6 +17,7 @@ //! The table implementation. +use std::collections::HashMap; use std::str::FromStr; use std::{any::Any, sync::Arc}; @@ -558,6 +559,7 @@ pub struct ListingTable { collected_statistics: FileStatisticsCache, infinite_source: bool, constraints: Constraints, + column_defaults: HashMap, } impl ListingTable { @@ -596,6 +598,7 @@ impl ListingTable { collected_statistics: Arc::new(DefaultFileStatisticsCache::default()), infinite_source, constraints: Constraints::empty(), + column_defaults: HashMap::new(), }; Ok(table) @@ -607,6 +610,15 @@ impl ListingTable { self } + /// Assign column defaults + pub fn with_column_defaults( + mut self, + column_defaults: HashMap, + ) -> Self { + self.column_defaults = column_defaults; + self + } + /// Set the [`FileStatisticsCache`] used to cache parquet file statistics. /// /// Setting a statistics cache on the `SessionContext` can avoid refetching statistics @@ -844,6 +856,10 @@ impl TableProvider for ListingTable { .create_writer_physical_plan(input, state, config, order_requirements) .await } + + fn get_column_default(&self, column: &str) -> Option<&Expr> { + self.column_defaults.get(column) + } } impl ListingTable { diff --git a/datafusion/core/src/datasource/listing_table_factory.rs b/datafusion/core/src/datasource/listing_table_factory.rs index f70a82035108..96436306c641 100644 --- a/datafusion/core/src/datasource/listing_table_factory.rs +++ b/datafusion/core/src/datasource/listing_table_factory.rs @@ -228,7 +228,8 @@ impl TableProviderFactory for ListingTableFactory { .with_cache(state.runtime_env().cache_manager.get_file_statistic_cache()); let table = provider .with_definition(cmd.definition.clone()) - .with_constraints(cmd.constraints.clone()); + .with_constraints(cmd.constraints.clone()) + .with_column_defaults(cmd.column_defaults.clone()); Ok(Arc::new(table)) } } @@ -279,6 +280,7 @@ mod tests { unbounded: false, options: HashMap::new(), constraints: Constraints::empty(), + column_defaults: HashMap::new(), }; let table_provider = factory.create(&state, &cmd).await.unwrap(); let listing_table = table_provider diff --git a/datafusion/core/src/datasource/memory.rs b/datafusion/core/src/datasource/memory.rs index a841518d9c8f..7c044b29366d 100644 --- a/datafusion/core/src/datasource/memory.rs +++ b/datafusion/core/src/datasource/memory.rs @@ -19,9 +19,9 @@ use datafusion_physical_plan::metrics::MetricsSet; use futures::StreamExt; -use hashbrown::HashMap; use log::debug; use std::any::Any; +use std::collections::HashMap; use std::fmt::{self, Debug}; use std::sync::Arc; diff --git a/datafusion/expr/src/logical_plan/ddl.rs b/datafusion/expr/src/logical_plan/ddl.rs index 97551a941abf..e74992d99373 100644 --- a/datafusion/expr/src/logical_plan/ddl.rs +++ b/datafusion/expr/src/logical_plan/ddl.rs @@ -194,6 +194,8 @@ pub struct CreateExternalTable { pub options: HashMap, /// The list of constraints in the schema, such as primary key, unique, etc. pub constraints: Constraints, + /// Default values for columns + pub column_defaults: HashMap, } // Hashing refers to a subset of fields considered in PartialEq. diff --git a/datafusion/proto/proto/datafusion.proto b/datafusion/proto/proto/datafusion.proto index e46e70a1396b..64b8e2807476 100644 --- a/datafusion/proto/proto/datafusion.proto +++ b/datafusion/proto/proto/datafusion.proto @@ -216,6 +216,7 @@ message CreateExternalTableNode { bool unbounded = 14; map options = 11; Constraints constraints = 15; + map column_defaults = 16; } message PrepareNode { diff --git a/datafusion/proto/src/generated/pbjson.rs b/datafusion/proto/src/generated/pbjson.rs index a1c177541981..34ad63d819e5 100644 --- a/datafusion/proto/src/generated/pbjson.rs +++ b/datafusion/proto/src/generated/pbjson.rs @@ -4026,6 +4026,9 @@ impl serde::Serialize for CreateExternalTableNode { if self.constraints.is_some() { len += 1; } + if !self.column_defaults.is_empty() { + len += 1; + } let mut struct_ser = serializer.serialize_struct("datafusion.CreateExternalTableNode", len)?; if let Some(v) = self.name.as_ref() { struct_ser.serialize_field("name", v)?; @@ -4069,6 +4072,9 @@ impl serde::Serialize for CreateExternalTableNode { if let Some(v) = self.constraints.as_ref() { struct_ser.serialize_field("constraints", v)?; } + if !self.column_defaults.is_empty() { + struct_ser.serialize_field("columnDefaults", &self.column_defaults)?; + } struct_ser.end() } } @@ -4099,6 +4105,8 @@ impl<'de> serde::Deserialize<'de> for CreateExternalTableNode { "unbounded", "options", "constraints", + "column_defaults", + "columnDefaults", ]; #[allow(clippy::enum_variant_names)] @@ -4117,6 +4125,7 @@ impl<'de> serde::Deserialize<'de> for CreateExternalTableNode { Unbounded, Options, Constraints, + ColumnDefaults, } impl<'de> serde::Deserialize<'de> for GeneratedField { fn deserialize(deserializer: D) -> std::result::Result @@ -4152,6 +4161,7 @@ impl<'de> serde::Deserialize<'de> for CreateExternalTableNode { "unbounded" => Ok(GeneratedField::Unbounded), "options" => Ok(GeneratedField::Options), "constraints" => Ok(GeneratedField::Constraints), + "columnDefaults" | "column_defaults" => Ok(GeneratedField::ColumnDefaults), _ => Err(serde::de::Error::unknown_field(value, FIELDS)), } } @@ -4185,6 +4195,7 @@ impl<'de> serde::Deserialize<'de> for CreateExternalTableNode { let mut unbounded__ = None; let mut options__ = None; let mut constraints__ = None; + let mut column_defaults__ = None; while let Some(k) = map_.next_key()? { match k { GeneratedField::Name => { @@ -4273,6 +4284,14 @@ impl<'de> serde::Deserialize<'de> for CreateExternalTableNode { } constraints__ = map_.next_value()?; } + GeneratedField::ColumnDefaults => { + if column_defaults__.is_some() { + return Err(serde::de::Error::duplicate_field("columnDefaults")); + } + column_defaults__ = Some( + map_.next_value::>()? + ); + } } } Ok(CreateExternalTableNode { @@ -4290,6 +4309,7 @@ impl<'de> serde::Deserialize<'de> for CreateExternalTableNode { unbounded: unbounded__.unwrap_or_default(), options: options__.unwrap_or_default(), constraints: constraints__, + column_defaults: column_defaults__.unwrap_or_default(), }) } } diff --git a/datafusion/proto/src/generated/prost.rs b/datafusion/proto/src/generated/prost.rs index b9fb616b3133..8b4dd1b759d6 100644 --- a/datafusion/proto/src/generated/prost.rs +++ b/datafusion/proto/src/generated/prost.rs @@ -360,6 +360,11 @@ pub struct CreateExternalTableNode { >, #[prost(message, optional, tag = "15")] pub constraints: ::core::option::Option, + #[prost(map = "string, message", tag = "16")] + pub column_defaults: ::std::collections::HashMap< + ::prost::alloc::string::String, + LogicalExprNode, + >, } #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] diff --git a/datafusion/proto/src/logical_plan/mod.rs b/datafusion/proto/src/logical_plan/mod.rs index 851f062bd51f..50bca0295def 100644 --- a/datafusion/proto/src/logical_plan/mod.rs +++ b/datafusion/proto/src/logical_plan/mod.rs @@ -15,6 +15,7 @@ // specific language governing permissions and limitations // under the License. +use std::collections::HashMap; use std::fmt::Debug; use std::str::FromStr; use std::sync::Arc; @@ -521,6 +522,13 @@ impl AsLogicalPlan for LogicalPlanNode { order_exprs.push(order_expr) } + let mut column_defaults = + HashMap::with_capacity(create_extern_table.column_defaults.len()); + for (col_name, expr) in &create_extern_table.column_defaults { + let expr = from_proto::parse_expr(expr, ctx)?; + column_defaults.insert(col_name.clone(), expr); + } + Ok(LogicalPlan::Ddl(DdlStatement::CreateExternalTable(CreateExternalTable { schema: pb_schema.try_into()?, name: from_owned_table_reference(create_extern_table.name.as_ref(), "CreateExternalTable")?, @@ -540,6 +548,7 @@ impl AsLogicalPlan for LogicalPlanNode { unbounded: create_extern_table.unbounded, options: create_extern_table.options.clone(), constraints: constraints.into(), + column_defaults, }))) } LogicalPlanType::CreateView(create_view) => { @@ -1298,6 +1307,7 @@ impl AsLogicalPlan for LogicalPlanNode { unbounded, options, constraints, + column_defaults, }, )) => { let mut converted_order_exprs: Vec = vec![]; @@ -1312,6 +1322,12 @@ impl AsLogicalPlan for LogicalPlanNode { converted_order_exprs.push(temp); } + let mut converted_column_defaults = + HashMap::with_capacity(column_defaults.len()); + for (col_name, expr) in column_defaults { + converted_column_defaults.insert(col_name.clone(), expr.try_into()?); + } + Ok(protobuf::LogicalPlanNode { logical_plan_type: Some(LogicalPlanType::CreateExternalTable( protobuf::CreateExternalTableNode { @@ -1329,6 +1345,7 @@ impl AsLogicalPlan for LogicalPlanNode { unbounded: *unbounded, options: options.clone(), constraints: Some(constraints.clone().into()), + column_defaults: converted_column_defaults, }, )), }) diff --git a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs index e04a7a9c9d03..5e36a838f311 100644 --- a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs +++ b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs @@ -217,11 +217,10 @@ async fn roundtrip_custom_memory_tables() -> Result<()> { async fn roundtrip_custom_listing_tables() -> Result<()> { let ctx = SessionContext::new(); - // Make sure during round-trip, constraint information is preserved let query = "CREATE EXTERNAL TABLE multiple_ordered_table_with_pk ( a0 INTEGER, - a INTEGER, - b INTEGER, + a INTEGER DEFAULT 1*2 + 3, + b INTEGER DEFAULT NULL, c INTEGER, d INTEGER, primary key(c) @@ -232,11 +231,13 @@ async fn roundtrip_custom_listing_tables() -> Result<()> { WITH ORDER (c ASC) LOCATION '../core/tests/data/window_2.csv';"; - let plan = ctx.sql(query).await?.into_optimized_plan()?; + let plan = ctx.state().create_logical_plan(query).await?; let bytes = logical_plan_to_bytes(&plan)?; let logical_round_trip = logical_plan_from_bytes(&bytes, &ctx)?; - assert_eq!(format!("{plan:?}"), format!("{logical_round_trip:?}")); + // Use exact matching to verify everything. Make sure during round-trip, + // information like constraints, column defaults, and other aspects of the plan are preserved. + assert_eq!(plan, logical_round_trip); Ok(()) } diff --git a/datafusion/sql/src/statement.rs b/datafusion/sql/src/statement.rs index 4220e83316d8..12083554f093 100644 --- a/datafusion/sql/src/statement.rs +++ b/datafusion/sql/src/statement.rs @@ -762,11 +762,18 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { )?; } + let mut planner_context = PlannerContext::new(); + + let column_defaults = self + .build_column_defaults(&columns, &mut planner_context)? + .into_iter() + .collect(); + let schema = self.build_schema(columns)?; let df_schema = schema.to_dfschema_ref()?; let ordered_exprs = - self.build_order_by(order_exprs, &df_schema, &mut PlannerContext::new())?; + self.build_order_by(order_exprs, &df_schema, &mut planner_context)?; // External tables do not support schemas at the moment, so the name is just a table name let name = OwnedTableReference::bare(name); @@ -788,6 +795,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { unbounded, options, constraints, + column_defaults, }, ))) } diff --git a/datafusion/sqllogictest/test_files/insert.slt b/datafusion/sqllogictest/test_files/insert.slt index 75252b3b7c35..e20b3779459b 100644 --- a/datafusion/sqllogictest/test_files/insert.slt +++ b/datafusion/sqllogictest/test_files/insert.slt @@ -382,6 +382,27 @@ select a,b,c,d from test_column_defaults 1 10 100 ABC NULL 20 500 default_text +# fill the timestamp column with default value `now()` again, it should be different from the previous one +query IIITP +insert into test_column_defaults(a, b, c, d) values(2, 20, 200, 'DEF') +---- +1 + +# Ensure that the default expression `now()` is evaluated during insertion, not optimized away. +# Rows are inserted during different time, so their timestamp values should be different. +query I rowsort +select count(distinct e) from test_column_defaults +---- +3 + +# Expect all rows to be true as now() was inserted into the table +query B rowsort +select e < now() from test_column_defaults +---- +true +true +true + statement ok drop table test_column_defaults diff --git a/datafusion/sqllogictest/test_files/insert_to_external.slt b/datafusion/sqllogictest/test_files/insert_to_external.slt index 39323479ff74..85c2db7faaf6 100644 --- a/datafusion/sqllogictest/test_files/insert_to_external.slt +++ b/datafusion/sqllogictest/test_files/insert_to_external.slt @@ -543,3 +543,70 @@ select * from table_without_values; statement ok drop table table_without_values; + + +### Test for specifying column's default value + +statement ok +CREATE EXTERNAL TABLE test_column_defaults( + a int, + b int not null default null, + c int default 100*2+300, + d text default lower('DEFAULT_TEXT'), + e timestamp default now() +) STORED AS parquet +LOCATION 'test_files/scratch/insert_to_external/external_parquet_table_q6' +OPTIONS (create_local_path 'true'); + +# fill in all column values +query IIITP +insert into test_column_defaults values(1, 10, 100, 'ABC', now()) +---- +1 + +statement error DataFusion error: Execution error: Invalid batch column at '1' has null but schema specifies non-nullable +insert into test_column_defaults(a) values(2) + +query IIITP +insert into test_column_defaults(b) values(20) +---- +1 + +query IIIT rowsort +select a,b,c,d from test_column_defaults +---- +1 10 100 ABC +NULL 20 500 default_text + +# fill the timestamp column with default value `now()` again, it should be different from the previous one +query IIITP +insert into test_column_defaults(a, b, c, d) values(2, 20, 200, 'DEF') +---- +1 + +# Ensure that the default expression `now()` is evaluated during insertion, not optimized away. +# Rows are inserted during different time, so their timestamp values should be different. +query I rowsort +select count(distinct e) from test_column_defaults +---- +3 + +# Expect all rows to be true as now() was inserted into the table +query B rowsort +select e < now() from test_column_defaults +---- +true +true +true + +statement ok +drop table test_column_defaults + +# test invalid default value +statement error DataFusion error: Error during planning: Column reference is not allowed in the DEFAULT expression : Schema error: No field named a. +CREATE EXTERNAL TABLE test_column_defaults( + a int, + b int default a+1 +) STORED AS parquet +LOCATION 'test_files/scratch/insert_to_external/external_parquet_table_q7' +OPTIONS (create_local_path 'true');