From ce87bc5508964a57d3f4bb4829ecbd6f489ce06d Mon Sep 17 00:00:00 2001 From: Piotr Findeisen Date: Tue, 19 Nov 2024 11:08:56 +0100 Subject: [PATCH] Encapsulate create table/view construction --- .../core/src/catalog_common/listing_schema.rs | 25 +- .../src/datasource/listing_table_factory.rs | 52 +- datafusion/core/src/execution/context/mod.rs | 14 +- datafusion/expr/src/logical_plan/ddl.rs | 493 +++++++++++++++++- datafusion/expr/src/logical_plan/mod.rs | 8 +- datafusion/proto/src/logical_plan/mod.rs | 97 ++-- datafusion/sql/src/query.rs | 15 +- datafusion/sql/src/statement.rs | 82 +-- 8 files changed, 627 insertions(+), 159 deletions(-) diff --git a/datafusion/core/src/catalog_common/listing_schema.rs b/datafusion/core/src/catalog_common/listing_schema.rs index dc55a07ef82d4..6e74baf64f42c 100644 --- a/datafusion/core/src/catalog_common/listing_schema.rs +++ b/datafusion/core/src/catalog_common/listing_schema.rs @@ -25,9 +25,7 @@ use std::sync::{Arc, Mutex}; use crate::catalog::{SchemaProvider, TableProvider, TableProviderFactory}; use crate::execution::context::SessionState; -use datafusion_common::{ - Constraints, DFSchema, DataFusionError, HashMap, TableReference, -}; +use datafusion_common::{DFSchema, DataFusionError, HashMap, TableReference}; use datafusion_expr::CreateExternalTable; use async_trait::async_trait; @@ -131,21 +129,12 @@ impl ListingSchemaProvider { .factory .create( state, - &CreateExternalTable { - schema: Arc::new(DFSchema::empty()), - name, - location: table_url, - file_type: self.format.clone(), - table_partition_cols: vec![], - if_not_exists: false, - temporary: false, - definition: None, - order_exprs: vec![], - unbounded: false, - options: Default::default(), - constraints: Constraints::empty(), - column_defaults: Default::default(), - }, + &CreateExternalTable::builder() + .schema(Arc::new(DFSchema::empty())) + .name(name) + .location(table_url) + .file_type(self.format.clone()) + .build()?, ) .await?; let _ = diff --git a/datafusion/core/src/datasource/listing_table_factory.rs b/datafusion/core/src/datasource/listing_table_factory.rs index 636d1623c5e91..b05a3caafa7d6 100644 --- a/datafusion/core/src/datasource/listing_table_factory.rs +++ b/datafusion/core/src/datasource/listing_table_factory.rs @@ -176,10 +176,10 @@ mod tests { datasource::file_format::csv::CsvFormat, execution::context::SessionContext, }; - use datafusion_common::{Constraints, DFSchema, TableReference}; + use datafusion_common::{DFSchema, TableReference}; #[tokio::test] - async fn test_create_using_non_std_file_ext() { + async fn test_create_using_non_std_file_ext() -> Result<()> { let csv_file = tempfile::Builder::new() .prefix("foo") .suffix(".tbl") @@ -190,21 +190,13 @@ mod tests { let context = SessionContext::new(); let state = context.state(); let name = TableReference::bare("foo"); - let cmd = CreateExternalTable { - name, - location: csv_file.path().to_str().unwrap().to_string(), - file_type: "csv".to_string(), - schema: Arc::new(DFSchema::empty()), - table_partition_cols: vec![], - if_not_exists: false, - temporary: false, - definition: None, - order_exprs: vec![], - unbounded: false, - options: HashMap::from([("format.has_header".into(), "true".into())]), - constraints: Constraints::empty(), - column_defaults: HashMap::new(), - }; + let cmd = CreateExternalTable::builder() + .name(name) + .location(csv_file.path().to_str().unwrap().to_string()) + .file_type("csv".to_string()) + .schema(Arc::new(DFSchema::empty())) + .options(HashMap::from([("format.has_header".into(), "true".into())])) + .build()?; let table_provider = factory.create(&state, &cmd).await.unwrap(); let listing_table = table_provider .as_any() @@ -212,10 +204,11 @@ mod tests { .unwrap(); let listing_options = listing_table.options(); assert_eq!(".tbl", listing_options.file_extension); + Ok(()) } #[tokio::test] - async fn test_create_using_non_std_file_ext_csv_options() { + async fn test_create_using_non_std_file_ext_csv_options() -> Result<()> { let csv_file = tempfile::Builder::new() .prefix("foo") .suffix(".tbl") @@ -230,21 +223,13 @@ mod tests { let mut options = HashMap::new(); options.insert("format.schema_infer_max_rec".to_owned(), "1000".to_owned()); options.insert("format.has_header".into(), "true".into()); - let cmd = CreateExternalTable { - name, - location: csv_file.path().to_str().unwrap().to_string(), - file_type: "csv".to_string(), - schema: Arc::new(DFSchema::empty()), - table_partition_cols: vec![], - if_not_exists: false, - temporary: false, - definition: None, - order_exprs: vec![], - unbounded: false, - options, - constraints: Constraints::empty(), - column_defaults: HashMap::new(), - }; + let cmd = CreateExternalTable::builder() + .name(name) + .location(csv_file.path().to_str().unwrap().to_string()) + .file_type("csv".to_string()) + .schema(Arc::new(DFSchema::empty())) + .options(options) + .build()?; let table_provider = factory.create(&state, &cmd).await.unwrap(); let listing_table = table_provider .as_any() @@ -257,5 +242,6 @@ mod tests { assert_eq!(csv_options.schema_infer_max_rec, Some(1000)); let listing_options = listing_table.options(); assert_eq!(".tbl", listing_options.file_extension); + Ok(()) } } diff --git a/datafusion/core/src/execution/context/mod.rs b/datafusion/core/src/execution/context/mod.rs index 5f01d41c31e73..bd50d5872e1dd 100644 --- a/datafusion/core/src/execution/context/mod.rs +++ b/datafusion/core/src/execution/context/mod.rs @@ -41,9 +41,9 @@ use crate::{ logical_expr::ScalarUDF, logical_expr::{ CreateCatalog, CreateCatalogSchema, CreateExternalTable, CreateFunction, - CreateMemoryTable, CreateView, DropCatalogSchema, DropFunction, DropTable, - DropView, Execute, LogicalPlan, LogicalPlanBuilder, Prepare, SetVariable, - TableType, UNNAMED_TABLE, + CreateMemoryTable, CreateMemoryTableFields, CreateView, CreateViewFields, + DropCatalogSchema, DropFunction, DropTable, DropView, Execute, LogicalPlan, + LogicalPlanBuilder, Prepare, SetVariable, TableType, UNNAMED_TABLE, }, physical_expr::PhysicalExpr, physical_plan::ExecutionPlan, @@ -792,7 +792,7 @@ impl SessionContext { } async fn create_memory_table(&self, cmd: CreateMemoryTable) -> Result { - let CreateMemoryTable { + let CreateMemoryTableFields { name, input, if_not_exists, @@ -800,7 +800,7 @@ impl SessionContext { constraints, column_defaults, temporary, - } = cmd; + } = cmd.into_fields(); let input = Arc::unwrap_or_clone(input); let input = self.state().optimize(&input)?; @@ -852,13 +852,13 @@ impl SessionContext { } async fn create_view(&self, cmd: CreateView) -> Result { - let CreateView { + let CreateViewFields { name, input, or_replace, definition, temporary, - } = cmd; + } = cmd.into_fields(); let view = self.table(name.clone()).await; diff --git a/datafusion/expr/src/logical_plan/ddl.rs b/datafusion/expr/src/logical_plan/ddl.rs index 93e8b5fd045e7..61684945bb6cb 100644 --- a/datafusion/expr/src/logical_plan/ddl.rs +++ b/datafusion/expr/src/logical_plan/ddl.rs @@ -26,7 +26,9 @@ use std::{ use crate::expr::Sort; use arrow::datatypes::DataType; -use datafusion_common::{Constraints, DFSchemaRef, SchemaReference, TableReference}; +use datafusion_common::{ + Constraints, DFSchemaRef, Result, SchemaReference, TableReference, +}; use sqlparser::ast::Ident; /// Various types of DDL (CREATE / DROP) catalog manipulation @@ -189,11 +191,12 @@ impl DdlStatement { /// Creates an external table. #[derive(Debug, Clone, PartialEq, Eq)] +#[non_exhaustive] pub struct CreateExternalTable { - /// The table schema - pub schema: DFSchemaRef, /// The table name pub name: TableReference, + /// The table schema + pub schema: DFSchemaRef, /// The physical location pub location: String, /// The file type of physical file @@ -221,8 +224,8 @@ pub struct CreateExternalTable { // Hashing refers to a subset of fields considered in PartialEq. impl Hash for CreateExternalTable { fn hash(&self, state: &mut H) { - self.schema.hash(state); self.name.hash(state); + self.schema.hash(state); self.location.hash(state); self.file_type.hash(state); self.table_partition_cols.hash(state); @@ -285,8 +288,233 @@ impl PartialOrd for CreateExternalTable { } } +impl CreateExternalTable { + pub fn new(fields: CreateExternalTableFields) -> Result { + let CreateExternalTableFields { + name, + schema, + location, + file_type, + table_partition_cols, + if_not_exists, + temporary, + definition, + order_exprs, + unbounded, + options, + constraints, + column_defaults, + } = fields; + Ok(Self { + name, + schema, + location, + file_type, + table_partition_cols, + if_not_exists, + temporary, + definition, + order_exprs, + unbounded, + options, + constraints, + column_defaults, + }) + } + + pub fn into_fields(self) -> CreateExternalTableFields { + let Self { + name, + schema, + location, + file_type, + table_partition_cols, + if_not_exists, + temporary, + definition, + order_exprs, + unbounded, + options, + constraints, + column_defaults, + } = self; + CreateExternalTableFields { + name, + schema, + location, + file_type, + table_partition_cols, + if_not_exists, + temporary, + definition, + order_exprs, + unbounded, + options, + constraints, + column_defaults, + } + } + + pub fn builder() -> CreateExternalTableBuilder { + CreateExternalTableBuilder::new() + } +} + +/// A struct with same fields as [`CreateExternalTable`] struct so that the DDL can be conveniently +/// destructed with validation that each field is handled, while still requiring that all +/// construction goes through the [`CreateExternalTable::new`] constructor or the builder. +pub struct CreateExternalTableFields { + /// The table name + pub name: TableReference, + /// The table schema + pub schema: DFSchemaRef, + /// The physical location + pub location: String, + /// The file type of physical file + pub file_type: String, + /// Partition Columns + pub table_partition_cols: Vec, + /// Option to not error if table already exists + pub if_not_exists: bool, + /// Whether the table is a temporary table + pub temporary: bool, + /// SQL used to create the table, if available + pub definition: Option, + /// Order expressions supplied by user + pub order_exprs: Vec>, + /// Whether the table is an infinite streams + pub unbounded: bool, + /// Table(provider) specific options + pub options: HashMap, + /// The list of constraints in the schema, such as primary key, unique, etc. + pub constraints: Constraints, + /// Default values for columns + pub column_defaults: HashMap, +} + +/// A builder or [`CreateExternalTable`]. Use [`CreateExternalTable::builder`] to obtain a new builder instance. +pub struct CreateExternalTableBuilder { + name: Option, + schema: Option, + location: Option, + file_type: Option, + table_partition_cols: Vec, + if_not_exists: bool, + temporary: bool, + definition: Option, + order_exprs: Vec>, + unbounded: bool, + options: HashMap, + constraints: Constraints, + column_defaults: HashMap, +} + +impl CreateExternalTableBuilder { + fn new() -> Self { + Self { + name: None, + schema: None, + location: None, + file_type: None, + table_partition_cols: vec![], + if_not_exists: false, + temporary: false, + definition: None, + order_exprs: vec![], + unbounded: false, + options: HashMap::new(), + constraints: Constraints::empty(), + column_defaults: HashMap::new(), + } + } + + pub fn name(mut self, name: TableReference) -> Self { + self.name = Some(name); + self + } + + pub fn schema(mut self, schema: DFSchemaRef) -> Self { + self.schema = Some(schema); + self + } + + pub fn location(mut self, location: String) -> Self { + self.location = Some(location); + self + } + + pub fn file_type(mut self, file_type: String) -> Self { + self.file_type = Some(file_type); + self + } + + pub fn table_partition_cols(mut self, table_partition_cols: Vec) -> Self { + self.table_partition_cols = table_partition_cols; + self + } + + pub fn if_not_exists(mut self, if_not_exists: bool) -> Self { + self.if_not_exists = if_not_exists; + self + } + + pub fn temporary(mut self, temporary: bool) -> Self { + self.temporary = temporary; + self + } + + pub fn definition(mut self, definition: Option) -> Self { + self.definition = definition; + self + } + + pub fn order_exprs(mut self, order_exprs: Vec>) -> Self { + self.order_exprs = order_exprs; + self + } + + pub fn unbounded(mut self, unbounded: bool) -> Self { + self.unbounded = unbounded; + self + } + + pub fn options(mut self, options: HashMap) -> Self { + self.options = options; + self + } + + pub fn constraints(mut self, constraints: Constraints) -> Self { + self.constraints = constraints; + self + } + + pub fn column_defaults(mut self, column_defaults: HashMap) -> Self { + self.column_defaults = column_defaults; + self + } + + pub fn build(self) -> Result { + CreateExternalTable::new(CreateExternalTableFields { + name: self.name.expect("name is required"), + schema: self.schema.expect("schema is required"), + location: self.location.expect("location is required"), + file_type: self.file_type.expect("file_type is required"), + table_partition_cols: self.table_partition_cols, + if_not_exists: self.if_not_exists, + temporary: self.temporary, + definition: self.definition, + order_exprs: self.order_exprs, + unbounded: self.unbounded, + options: self.options, + constraints: self.constraints, + column_defaults: self.column_defaults, + }) + } +} + /// Creates an in memory table. #[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)] +#[non_exhaustive] pub struct CreateMemoryTable { /// The table name pub name: TableReference, @@ -300,12 +528,153 @@ pub struct CreateMemoryTable { pub or_replace: bool, /// Default values for columns pub column_defaults: Vec<(String, Expr)>, - /// Wheter the table is `TableType::Temporary` + /// Whether the table is `TableType::Temporary` pub temporary: bool, } +impl CreateMemoryTable { + pub fn new(fields: CreateMemoryTableFields) -> Result { + let CreateMemoryTableFields { + name, + constraints, + input, + if_not_exists, + or_replace, + column_defaults, + temporary, + } = fields; + Ok(Self { + name, + constraints, + input, + if_not_exists, + or_replace, + column_defaults, + temporary, + }) + } + + pub fn into_fields(self) -> CreateMemoryTableFields { + let Self { + name, + constraints, + input, + if_not_exists, + or_replace, + column_defaults, + temporary, + } = self; + CreateMemoryTableFields { + name, + constraints, + input, + if_not_exists, + or_replace, + column_defaults, + temporary, + } + } + + pub fn builder() -> CreateMemoryTableBuilder { + CreateMemoryTableBuilder::new() + } +} + +/// A struct with same fields as [`CreateMemoryTable`] struct so that the DDL can be conveniently +/// destructed with validation that each field is handled, while still requiring that all +/// construction goes through the [`CreateMemoryTable::new`] constructor or the builder. +pub struct CreateMemoryTableFields { + /// The table name + pub name: TableReference, + /// The list of constraints in the schema, such as primary key, unique, etc. + pub constraints: Constraints, + /// The logical plan + pub input: Arc, + /// Option to not error if table already exists + pub if_not_exists: bool, + /// Option to replace table content if table already exists + pub or_replace: bool, + /// Default values for columns + pub column_defaults: Vec<(String, Expr)>, + /// Whether the table is `TableType::Temporary` + pub temporary: bool, +} + +/// A builder or [`CreateMemoryTable`]. Use [`CreateMemoryTable::builder`] to obtain a new builder instance. +pub struct CreateMemoryTableBuilder { + name: Option, + constraints: Constraints, + input: Option>, + if_not_exists: bool, + or_replace: bool, + column_defaults: Vec<(String, Expr)>, + temporary: bool, +} + +impl CreateMemoryTableBuilder { + fn new() -> Self { + Self { + name: None, + constraints: Constraints::empty(), + input: None, + if_not_exists: false, + or_replace: false, + column_defaults: vec![], + temporary: false, + } + } + + pub fn name(mut self, name: TableReference) -> Self { + self.name = Some(name); + self + } + + pub fn constraints(mut self, constraints: Constraints) -> Self { + self.constraints = constraints; + self + } + + pub fn input(mut self, input: Arc) -> Self { + self.input = Some(input); + self + } + + pub fn if_not_exists(mut self, if_not_exists: bool) -> Self { + self.if_not_exists = if_not_exists; + self + } + + pub fn or_replace(mut self, or_replace: bool) -> Self { + self.or_replace = or_replace; + self + } + + pub fn column_defaults(mut self, column_defaults: Vec<(String, Expr)>) -> Self { + self.column_defaults = column_defaults; + self + } + + pub fn temporary(mut self, temporary: bool) -> Self { + self.temporary = temporary; + self + } + + pub fn build(self) -> Result { + CreateMemoryTable::new(CreateMemoryTableFields { + name: self.name.expect("name is required"), + constraints: self.constraints, + input: self.input.expect("input is required"), + if_not_exists: self.if_not_exists, + or_replace: self.or_replace, + column_defaults: self.column_defaults, + temporary: self.temporary, + }) + } +} + /// Creates a view. #[derive(Debug, Clone, PartialEq, PartialOrd, Eq, Hash)] +#[non_exhaustive] pub struct CreateView { /// The table name pub name: TableReference, @@ -315,10 +684,122 @@ pub struct CreateView { pub or_replace: bool, /// SQL used to create the view, if available pub definition: Option, - /// Wheter the view is ephemeral + /// Whether the view is ephemeral + pub temporary: bool, +} + +impl CreateView { + pub fn new(fields: CreateViewFields) -> Result { + let CreateViewFields { + name, + input, + or_replace, + definition, + temporary, + } = fields; + Ok(Self { + name, + input, + or_replace, + definition, + temporary, + }) + } + + pub fn into_fields(self) -> CreateViewFields { + let Self { + name, + input, + or_replace, + definition, + temporary, + } = self; + CreateViewFields { + name, + input, + or_replace, + definition, + temporary, + } + } + + pub fn builder() -> CreateViewBuilder { + CreateViewBuilder::new() + } +} + +/// A struct with same fields as [`CreateView`] struct so that the DDL can be conveniently +/// destructed with validation that each field is handled, while still requiring that all +/// construction goes through the [`CreateView::new`] constructor or the builder. +pub struct CreateViewFields { + /// The table name + pub name: TableReference, + /// The logical plan + pub input: Arc, + /// Option to not error if table already exists + pub or_replace: bool, + /// SQL used to create the view, if available + pub definition: Option, + /// Whether the view is ephemeral pub temporary: bool, } +/// A builder or [`CreateView`]. Use [`CreateView::builder`] to obtain a new builder instance. +pub struct CreateViewBuilder { + name: Option, + input: Option>, + or_replace: bool, + definition: Option, + temporary: bool, +} + +impl CreateViewBuilder { + fn new() -> Self { + Self { + name: None, + input: None, + or_replace: false, + definition: None, + temporary: false, + } + } + + pub fn name(mut self, name: TableReference) -> Self { + self.name = Some(name); + self + } + + pub fn input(mut self, input: Arc) -> Self { + self.input = Some(input); + self + } + + pub fn or_replace(mut self, or_replace: bool) -> Self { + self.or_replace = or_replace; + self + } + + pub fn definition(mut self, definition: Option) -> Self { + self.definition = definition; + self + } + + pub fn temporary(mut self, temporary: bool) -> Self { + self.temporary = temporary; + self + } + + pub fn build(self) -> Result { + CreateView::new(CreateViewFields { + name: self.name.expect("name is required"), + input: self.input.expect("input is required"), + or_replace: self.or_replace, + definition: self.definition, + temporary: self.temporary, + }) + } +} + /// Creates a catalog (aka "Database"). #[derive(Debug, Clone, PartialEq, Eq, Hash)] pub struct CreateCatalog { diff --git a/datafusion/expr/src/logical_plan/mod.rs b/datafusion/expr/src/logical_plan/mod.rs index 5d613d4e80dbf..6be7fc2e25c1d 100644 --- a/datafusion/expr/src/logical_plan/mod.rs +++ b/datafusion/expr/src/logical_plan/mod.rs @@ -29,9 +29,11 @@ pub use builder::{ LogicalPlanBuilder, LogicalTableSource, UNNAMED_TABLE, }; pub use ddl::{ - CreateCatalog, CreateCatalogSchema, CreateExternalTable, CreateFunction, - CreateFunctionBody, CreateIndex, CreateMemoryTable, CreateView, DdlStatement, - DropCatalogSchema, DropFunction, DropTable, DropView, OperateFunctionArg, + CreateCatalog, CreateCatalogSchema, CreateExternalTable, CreateExternalTableBuilder, + CreateExternalTableFields, CreateFunction, CreateFunctionBody, CreateIndex, + CreateMemoryTable, CreateMemoryTableBuilder, CreateMemoryTableFields, CreateView, + CreateViewBuilder, CreateViewFields, DdlStatement, DropCatalogSchema, DropFunction, + DropTable, DropView, OperateFunctionArg, }; pub use dml::{DmlStatement, WriteOp}; pub use plan::{ diff --git a/datafusion/proto/src/logical_plan/mod.rs b/datafusion/proto/src/logical_plan/mod.rs index 50636048ebc96..49e20540fb3d3 100644 --- a/datafusion/proto/src/logical_plan/mod.rs +++ b/datafusion/proto/src/logical_plan/mod.rs @@ -62,9 +62,9 @@ use datafusion_expr::{ dml, logical_plan::{ builder::project, Aggregate, CreateCatalog, CreateCatalogSchema, - CreateExternalTable, CreateView, DdlStatement, Distinct, EmptyRelation, - Extension, Join, JoinConstraint, Prepare, Projection, Repartition, Sort, - SubqueryAlias, TableScan, Values, Window, + CreateExternalTable, CreateExternalTableFields, CreateView, CreateViewFields, + DdlStatement, Distinct, EmptyRelation, Extension, Join, JoinConstraint, Prepare, + Projection, Repartition, Sort, SubqueryAlias, TableScan, Values, Window, }, DistinctOn, DropView, Expr, LogicalPlan, LogicalPlanBuilder, ScalarUDF, SortExpr, Statement, WindowUDF, @@ -568,7 +568,7 @@ impl AsLogicalPlan for LogicalPlanNode { } Ok(LogicalPlan::Ddl(DdlStatement::CreateExternalTable( - CreateExternalTable { + CreateExternalTable::new(CreateExternalTableFields { schema: pb_schema.try_into()?, name: from_table_reference( create_extern_table.name.as_ref(), @@ -587,7 +587,7 @@ impl AsLogicalPlan for LogicalPlanNode { options: create_extern_table.options.clone(), constraints: constraints.into(), column_defaults, - }, + })?, ))) } LogicalPlanType::CreateView(create_view) => { @@ -602,13 +602,18 @@ impl AsLogicalPlan for LogicalPlanNode { None }; - Ok(LogicalPlan::Ddl(DdlStatement::CreateView(CreateView { - name: from_table_reference(create_view.name.as_ref(), "CreateView")?, - temporary: create_view.temporary, - input: Arc::new(plan), - or_replace: create_view.or_replace, - definition, - }))) + Ok(LogicalPlan::Ddl(DdlStatement::CreateView(CreateView::new( + CreateViewFields { + name: from_table_reference( + create_view.name.as_ref(), + "CreateView", + )?, + temporary: create_view.temporary, + input: Arc::new(plan), + or_replace: create_view.or_replace, + definition, + }, + )?))) } LogicalPlanType::CreateCatalogSchema(create_catalog_schema) => { let pb_schema = (create_catalog_schema.schema.clone()).ok_or_else(|| { @@ -1398,7 +1403,9 @@ impl AsLogicalPlan for LogicalPlanNode { )), }), LogicalPlan::Ddl(DdlStatement::CreateExternalTable( - CreateExternalTable { + create_external_table, + )) => { + let CreateExternalTableFields { name, location, file_type, @@ -1412,12 +1419,11 @@ impl AsLogicalPlan for LogicalPlanNode { constraints, column_defaults, temporary, - }, - )) => { + } = create_external_table.clone().into_fields(); let mut converted_order_exprs: Vec = vec![]; for order in order_exprs { let temp = SortExprNodeCollection { - sort_expr_nodes: serialize_sorts(order, extension_codec)?, + sort_expr_nodes: serialize_sorts(&order, extension_codec)?, }; converted_order_exprs.push(temp); } @@ -1425,8 +1431,10 @@ impl AsLogicalPlan for LogicalPlanNode { let mut converted_column_defaults = HashMap::with_capacity(column_defaults.len()); for (col_name, expr) in column_defaults { - converted_column_defaults - .insert(col_name.clone(), serialize_expr(expr, extension_codec)?); + converted_column_defaults.insert( + col_name.clone(), + serialize_expr(&expr, extension_codec)?, + ); } Ok(LogicalPlanNode { @@ -1435,13 +1443,13 @@ impl AsLogicalPlan for LogicalPlanNode { name: Some(name.clone().into()), location: location.clone(), file_type: file_type.clone(), - schema: Some(df_schema.try_into()?), + schema: Some(df_schema.as_ref().try_into()?), table_partition_cols: table_partition_cols.clone(), - if_not_exists: *if_not_exists, - temporary: *temporary, + if_not_exists, + temporary, order_exprs: converted_order_exprs, definition: definition.clone().unwrap_or_default(), - unbounded: *unbounded, + unbounded, options: options.clone(), constraints: Some(constraints.clone().into()), column_defaults: converted_column_defaults, @@ -1449,26 +1457,31 @@ impl AsLogicalPlan for LogicalPlanNode { )), }) } - LogicalPlan::Ddl(DdlStatement::CreateView(CreateView { - name, - input, - or_replace, - definition, - temporary, - })) => Ok(LogicalPlanNode { - logical_plan_type: Some(LogicalPlanType::CreateView(Box::new( - protobuf::CreateViewNode { - name: Some(name.clone().into()), - input: Some(Box::new(LogicalPlanNode::try_from_logical_plan( - input, - extension_codec, - )?)), - or_replace: *or_replace, - temporary: *temporary, - definition: definition.clone().unwrap_or_default(), - }, - ))), - }), + LogicalPlan::Ddl(DdlStatement::CreateView(create_view)) => { + let CreateViewFields { + name, + input, + or_replace, + definition, + temporary, + } = create_view.clone().into_fields(); + Ok(LogicalPlanNode { + logical_plan_type: Some(LogicalPlanType::CreateView(Box::new( + protobuf::CreateViewNode { + name: Some(name.clone().into()), + input: Some(Box::new( + LogicalPlanNode::try_from_logical_plan( + &input, + extension_codec, + )?, + )), + or_replace, + temporary, + definition: definition.clone().unwrap_or_default(), + }, + ))), + }) + } LogicalPlan::Ddl(DdlStatement::CreateCatalogSchema( CreateCatalogSchema { schema_name, diff --git a/datafusion/sql/src/query.rs b/datafusion/sql/src/query.rs index 740f9ad3b42c3..851d583afec15 100644 --- a/datafusion/sql/src/query.rs +++ b/datafusion/sql/src/query.rs @@ -19,7 +19,7 @@ use std::sync::Arc; use crate::planner::{ContextProvider, PlannerContext, SqlToRel}; -use datafusion_common::{not_impl_err, Constraints, DFSchema, Result}; +use datafusion_common::{not_impl_err, DFSchema, Result}; use datafusion_expr::expr::Sort; use datafusion_expr::{ CreateMemoryTable, DdlStatement, Distinct, LogicalPlan, LogicalPlanBuilder, @@ -134,15 +134,10 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { ) -> Result { match select_into { Some(into) => Ok(LogicalPlan::Ddl(DdlStatement::CreateMemoryTable( - CreateMemoryTable { - name: self.object_name_to_table_reference(into.name)?, - constraints: Constraints::empty(), - input: Arc::new(plan), - if_not_exists: false, - or_replace: false, - temporary: false, - column_defaults: vec![], - }, + CreateMemoryTable::builder() + .name(self.object_name_to_table_reference(into.name)?) + .input(Arc::new(plan)) + .build()?, ))), _ => Ok(plan), } diff --git a/datafusion/sql/src/statement.rs b/datafusion/sql/src/statement.rs index 31b836f32b242..be26a832be177 100644 --- a/datafusion/sql/src/statement.rs +++ b/datafusion/sql/src/statement.rs @@ -444,15 +444,15 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { )?; Ok(LogicalPlan::Ddl(DdlStatement::CreateMemoryTable( - CreateMemoryTable { - name: self.object_name_to_table_reference(name)?, - constraints, - input: Arc::new(plan), - if_not_exists, - or_replace, - column_defaults, - temporary, - }, + CreateMemoryTable::builder() + .name(self.object_name_to_table_reference(name)?) + .constraints(constraints) + .input(Arc::new(plan)) + .if_not_exists(if_not_exists) + .or_replace(or_replace) + .column_defaults(column_defaults) + .temporary(temporary) + .build()?, ))) } @@ -467,15 +467,15 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { plan.schema(), )?; Ok(LogicalPlan::Ddl(DdlStatement::CreateMemoryTable( - CreateMemoryTable { - name: self.object_name_to_table_reference(name)?, - constraints, - input: Arc::new(plan), - if_not_exists, - or_replace, - column_defaults, - temporary, - }, + CreateMemoryTable::builder() + .name(self.object_name_to_table_reference(name)?) + .constraints(constraints) + .input(Arc::new(plan)) + .if_not_exists(if_not_exists) + .or_replace(or_replace) + .column_defaults(column_defaults) + .temporary(temporary) + .build()?, ))) } } @@ -530,13 +530,15 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { let mut plan = self.query_to_plan(*query, &mut PlannerContext::new())?; plan = self.apply_expr_alias(plan, columns)?; - Ok(LogicalPlan::Ddl(DdlStatement::CreateView(CreateView { - name: self.object_name_to_table_reference(name)?, - input: Arc::new(plan), - or_replace, - definition: sql, - temporary, - }))) + Ok(LogicalPlan::Ddl(DdlStatement::CreateView( + CreateView::builder() + .name(self.object_name_to_table_reference(name)?) + .input(Arc::new(plan)) + .or_replace(or_replace) + .definition(sql) + .temporary(temporary) + .build()?, + ))) } Statement::ShowCreate { obj_type, obj_name } => match obj_type { ShowCreateObject::Table => self.show_create_table_to_plan(obj_name), @@ -1289,21 +1291,21 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { let constraints = Self::new_constraint_from_table_constraints(&all_constraints, &df_schema)?; Ok(LogicalPlan::Ddl(DdlStatement::CreateExternalTable( - PlanCreateExternalTable { - schema: df_schema, - name, - location, - file_type, - table_partition_cols, - if_not_exists, - temporary, - definition, - order_exprs: ordered_exprs, - unbounded, - options: options_map, - constraints, - column_defaults, - }, + PlanCreateExternalTable::builder() + .schema(df_schema) + .name(name) + .location(location) + .file_type(file_type) + .table_partition_cols(table_partition_cols) + .if_not_exists(if_not_exists) + .temporary(temporary) + .definition(definition) + .order_exprs(ordered_exprs) + .unbounded(unbounded) + .options(options_map) + .constraints(constraints) + .column_defaults(column_defaults) + .build()?, ))) }