Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add show external tables #3279

Merged
merged 4 commits into from
Sep 7, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion benchmarks/src/bin/h2o.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ async fn group_by(opt: &GroupBy) -> Result<()> {
let listing_config = ListingTableConfig::new(ListingTableUrl::parse(path)?)
.with_listing_options(ListingOptions::new(Arc::new(CsvFormat::default())))
.with_schema(Arc::new(schema));
let csv = ListingTable::try_new(listing_config)?;
let csv = ListingTable::try_new(listing_config, None)?;
let partition_size = num_cpus::get();
let memtable =
MemTable::load(Arc::new(csv), Some(partition_size), &ctx.state()).await?;
Expand Down
2 changes: 1 addition & 1 deletion benchmarks/src/bin/tpch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -429,7 +429,7 @@ fn get_table(
.with_listing_options(options)
.with_schema(schema);

Ok(Arc::new(ListingTable::try_new(config)?))
Ok(Arc::new(ListingTable::try_new(config, None)?))
}

fn get_schema(table: &str) -> Schema {
Expand Down
1 change: 1 addition & 0 deletions datafusion-examples/examples/parquet_sql_multiple_files.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ async fn main() -> Result<()> {
&format!("file://{}", testdata),
listing_options,
None,
None,
)
.await
.unwrap();
Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/benches/sort_limit_query_sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ fn create_context() -> Arc<Mutex<SessionContext>> {
.with_listing_options(listing_options)
.with_schema(schema);

let csv = async { ListingTable::try_new(config).unwrap() };
let csv = async { ListingTable::try_new(config, None).unwrap() };

let rt = Runtime::new().unwrap();

Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/src/catalog/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ mod tests {
.infer(&ctx.state())
.await
.unwrap();
let table = ListingTable::try_new(config).unwrap();
let table = ListingTable::try_new(config, None).unwrap();

schema
.register_table("alltypes_plain".to_string(), Arc::new(table))
Expand Down
21 changes: 15 additions & 6 deletions datafusion/core/src/datasource/listing/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,7 @@ pub struct ListingTable {
/// File fields + partition columns
table_schema: SchemaRef,
options: ListingOptions,
definition: Option<String>,
}

impl ListingTable {
Expand All @@ -256,7 +257,10 @@ impl ListingTable {
/// If the schema is provided then it must be resolved before creating the table
/// and should contain the fields of the file without the table
/// partitioning columns.
pub fn try_new(config: ListingTableConfig) -> Result<Self> {
pub fn try_new(
config: ListingTableConfig,
definition: Option<String>,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the only thing I would prefer to change in this PR. Specifically, rather than change the API of try_new I think we could add a function like

pub fn with_definition(mut self, definition: Option<String>) -> Self {
...
}

This change I think would reduce the size of this PR significantly

Since I realize this PR has been waiting for a week for feedback I will propose this change via PR

) -> Result<Self> {
let file_schema = config
.file_schema
.ok_or_else(|| DataFusionError::Internal("No schema provided.".into()))?;
Expand All @@ -280,6 +284,7 @@ impl ListingTable {
file_schema,
table_schema: Arc::new(Schema::new(table_fields)),
options,
definition,
};

Ok(table)
Expand Down Expand Up @@ -358,6 +363,10 @@ impl TableProvider for ListingTable {
Ok(TableProviderFilterPushDown::Inexact)
}
}

/// Returns the SQL text this table was created from, when one was recorded.
fn get_table_definition(&self) -> Option<&str> {
    // Borrow the stored definition (if any) as a &str without cloning.
    self.definition.as_ref().map(String::as_str)
}
}

impl ListingTable {
Expand Down Expand Up @@ -460,7 +469,7 @@ mod tests {
let config = ListingTableConfig::new(table_path)
.with_listing_options(opt)
.with_schema(schema);
let table = ListingTable::try_new(config)?;
let table = ListingTable::try_new(config, None)?;

let exec = table.scan(&state, &None, &[], None).await?;
assert_eq!(exec.statistics().num_rows, Some(8));
Expand Down Expand Up @@ -489,7 +498,7 @@ mod tests {
let config = ListingTableConfig::new(table_path)
.with_listing_options(opt)
.with_schema(file_schema);
let table = ListingTable::try_new(config)?;
let table = ListingTable::try_new(config, None)?;

assert_eq!(
columns(&table.schema()),
Expand Down Expand Up @@ -660,7 +669,7 @@ mod tests {
let config = ListingTableConfig::new(table_path)
.infer(&ctx.state())
.await?;
let table = ListingTable::try_new(config)?;
let table = ListingTable::try_new(config, None)?;
Ok(Arc::new(table))
}

Expand Down Expand Up @@ -692,7 +701,7 @@ mod tests {
.with_listing_options(opt)
.with_schema(Arc::new(schema));

let table = ListingTable::try_new(config)?;
let table = ListingTable::try_new(config, None)?;

let (file_list, _) = table.list_files_for_scan(&ctx.state(), &[], None).await?;

Expand Down Expand Up @@ -732,7 +741,7 @@ mod tests {
.with_listing_options(opt)
.with_schema(Arc::new(schema));

let table = ListingTable::try_new(config)?;
let table = ListingTable::try_new(config, None)?;

let (file_list, _) = table.list_files_for_scan(&ctx.state(), &[], None).await?;

Expand Down
36 changes: 26 additions & 10 deletions datafusion/core/src/execution/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,7 @@ impl SessionContext {
ref delimiter,
ref table_partition_cols,
ref if_not_exists,
definition,
}) => {
let (file_format, file_extension) = match file_type {
FileType::CSV => (
Expand Down Expand Up @@ -292,6 +293,7 @@ impl SessionContext {
location,
options,
provided_schema,
definition,
)
.await?;
let plan = LogicalPlanBuilder::empty(false).build()?;
Expand Down Expand Up @@ -558,7 +560,7 @@ impl SessionContext {
let config = ListingTableConfig::new(table_path)
.with_listing_options(listing_options)
.with_schema(resolved_schema);
let provider = ListingTable::try_new(config)?;
let provider = ListingTable::try_new(config, None)?;
self.read_table(Arc::new(provider))
}

Expand All @@ -584,7 +586,7 @@ impl SessionContext {
let config = ListingTableConfig::new(table_path)
.with_listing_options(listing_options)
.with_schema(resolved_schema);
let provider = ListingTable::try_new(config)?;
let provider = ListingTable::try_new(config, None)?;

self.read_table(Arc::new(provider))
}
Expand Down Expand Up @@ -618,7 +620,7 @@ impl SessionContext {
.with_listing_options(listing_options)
.with_schema(resolved_schema);

let provider = ListingTable::try_new(config)?;
let provider = ListingTable::try_new(config, None)?;
self.read_table(Arc::new(provider))
}

Expand All @@ -642,7 +644,7 @@ impl SessionContext {
.with_listing_options(listing_options)
.with_schema(resolved_schema);

let provider = ListingTable::try_new(config)?;
let provider = ListingTable::try_new(config, None)?;
self.read_table(Arc::new(provider))
}

Expand All @@ -664,6 +666,7 @@ impl SessionContext {
table_path: impl AsRef<str>,
options: ListingOptions,
provided_schema: Option<SchemaRef>,
sql: Option<String>,
) -> Result<()> {
let table_path = ListingTableUrl::parse(table_path)?;
let resolved_schema = match provided_schema {
Expand All @@ -673,7 +676,7 @@ impl SessionContext {
let config = ListingTableConfig::new(table_path)
.with_listing_options(options)
.with_schema(resolved_schema);
let table = ListingTable::try_new(config)?;
let table = ListingTable::try_new(config, sql)?;
self.register_table(name, Arc::new(table))?;
Ok(())
}
Expand All @@ -694,6 +697,7 @@ impl SessionContext {
table_path,
listing_options,
options.schema.map(|s| Arc::new(s.to_owned())),
None,
)
.await?;

Expand All @@ -711,8 +715,14 @@ impl SessionContext {
let listing_options =
options.to_listing_options(self.copied_config().target_partitions);

self.register_listing_table(name, table_path, listing_options, options.schema)
.await?;
self.register_listing_table(
name,
table_path,
listing_options,
options.schema,
None,
)
.await?;
Ok(())
}

Expand All @@ -732,7 +742,7 @@ impl SessionContext {
.parquet_pruning(parquet_pruning)
.to_listing_options(target_partitions);

self.register_listing_table(name, table_path, listing_options, None)
self.register_listing_table(name, table_path, listing_options, None, None)
.await?;
Ok(())
}
Expand All @@ -748,8 +758,14 @@ impl SessionContext {
let listing_options =
options.to_listing_options(self.copied_config().target_partitions);

self.register_listing_table(name, table_path, listing_options, options.schema)
.await?;
self.register_listing_table(
name,
table_path,
listing_options,
options.schema,
None,
)
.await?;
Ok(())
}

Expand Down
4 changes: 2 additions & 2 deletions datafusion/core/tests/path_partition.rs
Original file line number Diff line number Diff line change
Expand Up @@ -441,7 +441,7 @@ fn register_partitioned_aggregate_csv(
let config = ListingTableConfig::new(table_path)
.with_listing_options(options)
.with_schema(file_schema);
let table = ListingTable::try_new(config).unwrap();
let table = ListingTable::try_new(config, None).unwrap();

ctx.register_table("t", Arc::new(table))
.expect("registering listing table failed");
Expand Down Expand Up @@ -479,7 +479,7 @@ async fn register_partitioned_alltypes_parquet(
.with_listing_options(options)
.with_schema(file_schema);

let table = ListingTable::try_new(config).unwrap();
let table = ListingTable::try_new(config, None).unwrap();

ctx.register_table("t", Arc::new(table))
.expect("registering listing table failed");
Expand Down
23 changes: 23 additions & 0 deletions datafusion/core/tests/sql/information_schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -605,6 +605,29 @@ async fn show_create_table() {
assert_batches_eq!(expected, &results);
}

#[tokio::test]
async fn show_external_create_table() {
let ctx =
SessionContext::with_config(SessionConfig::new().with_information_schema(true));

let table_sql =
"CREATE EXTERNAL TABLE abc STORED AS CSV WITH HEADER ROW LOCATION '../../testing/data/csv/aggregate_test_100.csv'";
plan_and_collect(&ctx, table_sql).await.unwrap();

let result_sql = "SHOW CREATE TABLE abc";
let results = plan_and_collect(&ctx, result_sql).await.unwrap();

let expected = vec![
"+---------------+--------------+------------+-------------------------------------------------------------------------------------------------+",
"| table_catalog | table_schema | table_name | definition |",
"+---------------+--------------+------------+-------------------------------------------------------------------------------------------------+",
"| datafusion | public | abc | CREATE EXTERNAL TABLE abc STORED AS CSV LOCATION ../../testing/data/csv/aggregate_test_100.csv |",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

❤️ this is great

"+---------------+--------------+------------+-------------------------------------------------------------------------------------------------+",
];

assert_batches_eq!(expected, &results);
}

/// Execute SQL and return results
async fn plan_and_collect(ctx: &SessionContext, sql: &str) -> Result<Vec<RecordBatch>> {
ctx.sql(sql).await?.collect().await
Expand Down
17 changes: 17 additions & 0 deletions datafusion/expr/src/logical_plan/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1227,6 +1227,21 @@ pub enum FileType {
Avro,
}

impl fmt::Display for FileType {
    /// Renders the file type as its canonical upper-case keyword
    /// (e.g. `CSV`, `PARQUET`), as used in `CREATE EXTERNAL TABLE` SQL.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let name = match self {
            FileType::NdJson => "NDJSON",
            FileType::Parquet => "PARQUET",
            FileType::CSV => "CSV",
            FileType::Avro => "AVRO",
        };
        f.write_str(name)
    }
}

/// Creates an external table.
#[derive(Clone)]
pub struct CreateExternalTable {
Expand All @@ -1246,6 +1261,8 @@ pub struct CreateExternalTable {
pub table_partition_cols: Vec<String>,
/// Option to not error if table already exists
pub if_not_exists: bool,
/// SQL used to create the table, if available
pub definition: Option<String>,
}

/// Produces a relation with string representations of
Expand Down
1 change: 1 addition & 0 deletions datafusion/proto/proto/datafusion.proto
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,7 @@ message CreateExternalTableNode {
repeated string table_partition_cols = 6;
bool if_not_exists = 7;
string delimiter = 8;
string definition = 9;
}

message CreateCatalogSchemaNode {
Expand Down
12 changes: 11 additions & 1 deletion datafusion/proto/src/logical_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -430,7 +430,7 @@ impl AsLogicalPlan for LogicalPlanNode {
.with_listing_options(options)
.with_schema(Arc::new(schema));

let provider = ListingTable::try_new(config)?;
let provider = ListingTable::try_new(config, None)?;

LogicalPlanBuilder::scan_with_filters(
&scan.table_name,
Expand Down Expand Up @@ -493,6 +493,11 @@ impl AsLogicalPlan for LogicalPlanNode {

let pb_file_type: protobuf::FileType =
create_extern_table.file_type.try_into()?;
let definition = if !create_extern_table.definition.is_empty() {
Some(create_extern_table.definition.clone())
} else {
None
};

Ok(LogicalPlan::CreateExternalTable(CreateExternalTable {
schema: pb_schema.try_into()?,
Expand All @@ -507,6 +512,7 @@ impl AsLogicalPlan for LogicalPlanNode {
.table_partition_cols
.clone(),
if_not_exists: create_extern_table.if_not_exists,
definition,
}))
}
LogicalPlanType::CreateView(create_view) => {
Expand Down Expand Up @@ -1055,6 +1061,7 @@ impl AsLogicalPlan for LogicalPlanNode {
schema: df_schema,
table_partition_cols,
if_not_exists,
definition,
}) => {
use datafusion::logical_plan::FileType;

Expand All @@ -1076,6 +1083,9 @@ impl AsLogicalPlan for LogicalPlanNode {
table_partition_cols: table_partition_cols.clone(),
if_not_exists: *if_not_exists,
delimiter: String::from(*delimiter),
definition: definition
.clone()
.unwrap_or_else(|| "".to_string()),
},
)),
})
Expand Down
Loading