diff --git a/datafusion-examples/examples/parquet_sql_multiple_files.rs b/datafusion-examples/examples/parquet_sql_multiple_files.rs index b948f16835ac..2b7a5f6fe268 100644 --- a/datafusion-examples/examples/parquet_sql_multiple_files.rs +++ b/datafusion-examples/examples/parquet_sql_multiple_files.rs @@ -49,6 +49,7 @@ async fn main() -> Result<()> { &format!("file://{}", testdata), listing_options, None, + None, ) .await .unwrap(); diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs index abf5394fb643..e77773a822cd 100644 --- a/datafusion/core/src/datasource/listing/table.rs +++ b/datafusion/core/src/datasource/listing/table.rs @@ -48,7 +48,7 @@ use super::PartitionedFile; use super::helpers::{expr_applicable_for_cols, pruned_partition_list, split_files}; -/// Configuration for creating a 'ListingTable' +/// Configuration for creating a 'ListingTable' pub struct ListingTableConfig { /// Paths on the `ObjectStore` for creating `ListingTable`. /// They should share the same schema and object store. 
@@ -246,6 +246,7 @@ pub struct ListingTable { /// File fields + partition columns table_schema: SchemaRef, options: ListingOptions, + definition: Option<String>, } impl ListingTable { @@ -280,11 +281,18 @@ impl ListingTable { file_schema, table_schema: Arc::new(Schema::new(table_fields)), options, + definition: None, }; Ok(table) } + /// Specify the SQL definition for this table, if any + pub fn with_definition(mut self, definition: Option<String>) -> Self { + self.definition = definition; + self + } + /// Get paths ref pub fn table_paths(&self) -> &Vec<ListingTableUrl> { &self.table_paths @@ -358,6 +366,10 @@ impl TableProvider for ListingTable { Ok(TableProviderFilterPushDown::Inexact) } } + + fn get_table_definition(&self) -> Option<&str> { + self.definition.as_deref() + } } impl ListingTable { diff --git a/datafusion/core/src/execution/context.rs b/datafusion/core/src/execution/context.rs index a6a508d36843..43a29108000f 100644 --- a/datafusion/core/src/execution/context.rs +++ b/datafusion/core/src/execution/context.rs @@ -502,6 +502,7 @@ impl SessionContext { cmd.location.clone(), options, provided_schema, + cmd.definition.clone(), ) .await?; self.return_empty_dataframe() @@ -720,6 +721,7 @@ impl SessionContext { table_path: impl AsRef<str>, options: ListingOptions, provided_schema: Option<SchemaRef>, + sql: Option<String>, ) -> Result<()> { let table_path = ListingTableUrl::parse(table_path)?; let resolved_schema = match provided_schema { @@ -729,7 +731,7 @@ impl SessionContext { let config = ListingTableConfig::new(table_path) .with_listing_options(options) .with_schema(resolved_schema); - let table = ListingTable::try_new(config)?; + let table = ListingTable::try_new(config)?.with_definition(sql); self.register_table(name, Arc::new(table))?; Ok(()) } @@ -750,6 +752,7 @@ impl SessionContext { table_path, listing_options, options.schema.map(|s| Arc::new(s.to_owned())), + None, ) .await?; @@ -767,8 +770,14 @@ impl SessionContext { let listing_options =
options.to_listing_options(self.copied_config().target_partitions); - self.register_listing_table(name, table_path, listing_options, options.schema) - .await?; + self.register_listing_table( + name, + table_path, + listing_options, + options.schema, + None, + ) + .await?; Ok(()) } @@ -788,7 +797,7 @@ impl SessionContext { .parquet_pruning(parquet_pruning) .to_listing_options(target_partitions); - self.register_listing_table(name, table_path, listing_options, None) + self.register_listing_table(name, table_path, listing_options, None, None) .await?; Ok(()) } @@ -804,8 +813,14 @@ impl SessionContext { let listing_options = options.to_listing_options(self.copied_config().target_partitions); - self.register_listing_table(name, table_path, listing_options, options.schema) - .await?; + self.register_listing_table( + name, + table_path, + listing_options, + options.schema, + None, + ) + .await?; Ok(()) } diff --git a/datafusion/core/tests/sql/information_schema.rs b/datafusion/core/tests/sql/information_schema.rs index 5a4aa297f808..017330e44f33 100644 --- a/datafusion/core/tests/sql/information_schema.rs +++ b/datafusion/core/tests/sql/information_schema.rs @@ -605,6 +605,29 @@ async fn show_create_table() { assert_batches_eq!(expected, &results); } +#[tokio::test] +async fn show_external_create_table() { + let ctx = + SessionContext::with_config(SessionConfig::new().with_information_schema(true)); + + let table_sql = + "CREATE EXTERNAL TABLE abc STORED AS CSV WITH HEADER ROW LOCATION '../../testing/data/csv/aggregate_test_100.csv'"; + plan_and_collect(&ctx, table_sql).await.unwrap(); + + let result_sql = "SHOW CREATE TABLE abc"; + let results = plan_and_collect(&ctx, result_sql).await.unwrap(); + + let expected = vec![ + "+---------------+--------------+------------+-------------------------------------------------------------------------------------------------+", + "| table_catalog | table_schema | table_name | definition |", +
"+---------------+--------------+------------+-------------------------------------------------------------------------------------------------+", + "| datafusion | public | abc | CREATE EXTERNAL TABLE abc STORED AS CSV LOCATION ../../testing/data/csv/aggregate_test_100.csv |", + "+---------------+--------------+------------+-------------------------------------------------------------------------------------------------+", + ]; + + assert_batches_eq!(expected, &results); +} + /// Execute SQL and return results async fn plan_and_collect(ctx: &SessionContext, sql: &str) -> Result<Vec<RecordBatch>> { ctx.sql(sql).await?.collect().await diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs index 38074f046333..4255e62ec40f 100644 --- a/datafusion/expr/src/logical_plan/plan.rs +++ b/datafusion/expr/src/logical_plan/plan.rs @@ -1255,6 +1255,8 @@ pub struct CreateExternalTable { pub table_partition_cols: Vec<String>, /// Option to not error if table already exists pub if_not_exists: bool, + /// SQL used to create the table, if available + pub definition: Option<String>, } /// Produces a relation with string representations of diff --git a/datafusion/proto/proto/datafusion.proto b/datafusion/proto/proto/datafusion.proto index 69e1617f39f9..8d4da0250b99 100644 --- a/datafusion/proto/proto/datafusion.proto +++ b/datafusion/proto/proto/datafusion.proto @@ -153,6 +153,7 @@ message CreateExternalTableNode { repeated string table_partition_cols = 6; bool if_not_exists = 7; string delimiter = 8; + string definition = 9; } message CreateCatalogSchemaNode { diff --git a/datafusion/proto/src/logical_plan.rs b/datafusion/proto/src/logical_plan.rs index 0e1ed38cbfa5..f15aee8bbd6c 100644 --- a/datafusion/proto/src/logical_plan.rs +++ b/datafusion/proto/src/logical_plan.rs @@ -461,6 +461,12 @@ impl AsLogicalPlan for LogicalPlanNode { )) })?; + let definition = if !create_extern_table.definition.is_empty() { + Some(create_extern_table.definition.clone()) + } else { + None + };
+ match create_extern_table.file_type.as_str() { "CSV" | "JSON" | "PARQUET" | "AVRO" => {} it => { @@ -486,6 +492,7 @@ impl AsLogicalPlan for LogicalPlanNode { .table_partition_cols .clone(), if_not_exists: create_extern_table.if_not_exists, + definition, })) } LogicalPlanType::CreateView(create_view) => { @@ -1034,6 +1041,7 @@ impl AsLogicalPlan for LogicalPlanNode { schema: df_schema, table_partition_cols, if_not_exists, + definition, }) => Ok(protobuf::LogicalPlanNode { logical_plan_type: Some(LogicalPlanType::CreateExternalTable( protobuf::CreateExternalTableNode { @@ -1045,6 +1053,7 @@ impl AsLogicalPlan for LogicalPlanNode { table_partition_cols: table_partition_cols.clone(), if_not_exists: *if_not_exists, delimiter: String::from(*delimiter), + definition: definition.clone().unwrap_or_else(|| "".to_string()), }, )), }), diff --git a/datafusion/sql/src/parser.rs b/datafusion/sql/src/parser.rs index 6f057b1515ae..b9254159ba52 100644 --- a/datafusion/sql/src/parser.rs +++ b/datafusion/sql/src/parser.rs @@ -25,7 +25,7 @@ use sqlparser::{ parser::{Parser, ParserError}, tokenizer::{Token, Tokenizer}, }; -use std::collections::VecDeque; +use std::{collections::VecDeque, fmt}; // Use `Parser::expected` instead, if possible macro_rules! 
parser_err { @@ -59,6 +59,18 @@ pub struct CreateExternalTable { pub if_not_exists: bool, } +impl fmt::Display for CreateExternalTable { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "CREATE EXTERNAL TABLE ")?; + if self.if_not_exists { + write!(f, "IF NOT EXISTS ")?; + } + write!(f, "{} ", self.name)?; + write!(f, "STORED AS {} ", self.file_type)?; + write!(f, "LOCATION {} ", self.location) + } +} + /// DataFusion extension DDL for `DESCRIBE TABLE` #[derive(Debug, Clone, PartialEq, Eq)] pub struct DescribeTable { diff --git a/datafusion/sql/src/planner.rs b/datafusion/sql/src/planner.rs index 5cc388ba01a4..f32597b2f2a2 100644 --- a/datafusion/sql/src/planner.rs +++ b/datafusion/sql/src/planner.rs @@ -452,6 +452,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { &self, statement: CreateExternalTable, ) -> Result<LogicalPlan> { + let definition = Some(statement.to_string()); let CreateExternalTable { name, columns, @@ -481,6 +482,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { delimiter, table_partition_cols, if_not_exists, + definition, })) }