Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add primary key information to CreateMemoryTable LogicalPlan node #5835

Merged
merged 4 commits into from
Apr 3, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions datafusion/core/src/execution/context.rs
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -344,7 +344,15 @@ impl SessionContext {
input, input,
if_not_exists, if_not_exists,
or_replace, or_replace,
primary_key,
}) => { }) => {
if !primary_key.is_empty() {
Err(DataFusionError::Execution(
"Primary keys on MemoryTables are not currently supported!"
.to_string(),
))?;
}

let input = Arc::try_unwrap(input).unwrap_or_else(|e| e.as_ref().clone()); let input = Arc::try_unwrap(input).unwrap_or_else(|e| e.as_ref().clone());
let table = self.table(&name).await; let table = self.table(&name).await;


Expand Down
14 changes: 12 additions & 2 deletions datafusion/expr/src/logical_plan/plan.rs
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -1046,9 +1046,17 @@ impl LogicalPlan {
write!(f, "CreateExternalTable: {name:?}") write!(f, "CreateExternalTable: {name:?}")
} }
LogicalPlan::CreateMemoryTable(CreateMemoryTable { LogicalPlan::CreateMemoryTable(CreateMemoryTable {
name, .. name,
primary_key,
..
}) => { }) => {
write!(f, "CreateMemoryTable: {name:?}") let pk: Vec<String> =
primary_key.iter().map(|c| c.name.to_string()).collect();
let mut pk = pk.join(", ");
if !pk.is_empty() {
pk = format!(" primary_key=[{pk}]");
}
write!(f, "CreateMemoryTable: {name:?}{pk}")
} }
LogicalPlan::CreateView(CreateView { name, .. }) => { LogicalPlan::CreateView(CreateView { name, .. }) => {
write!(f, "CreateView: {name:?}") write!(f, "CreateView: {name:?}")
Expand Down Expand Up @@ -1490,6 +1498,8 @@ pub struct Union {
pub struct CreateMemoryTable { pub struct CreateMemoryTable {
/// The table name /// The table name
pub name: OwnedTableReference, pub name: OwnedTableReference,
/// The ordered list of columns in the primary key, or an empty vector if none
pub primary_key: Vec<Column>,
avantgardnerio marked this conversation as resolved.
Show resolved Hide resolved
/// The logical plan /// The logical plan
pub input: Arc<LogicalPlan>, pub input: Arc<LogicalPlan>,
/// Option to not error if table already exists /// Option to not error if table already exists
Expand Down
1 change: 1 addition & 0 deletions datafusion/expr/src/utils.rs
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -843,6 +843,7 @@ pub fn from_plan(
.. ..
}) => Ok(LogicalPlan::CreateMemoryTable(CreateMemoryTable { }) => Ok(LogicalPlan::CreateMemoryTable(CreateMemoryTable {
input: Arc::new(inputs[0].clone()), input: Arc::new(inputs[0].clone()),
primary_key: vec![],
name: name.clone(), name: name.clone(),
if_not_exists: *if_not_exists, if_not_exists: *if_not_exists,
or_replace: *or_replace, or_replace: *or_replace,
Expand Down
74 changes: 61 additions & 13 deletions datafusion/sql/src/statement.rs
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -45,8 +45,8 @@ use datafusion_expr::{
use sqlparser::ast; use sqlparser::ast;
use sqlparser::ast::{ use sqlparser::ast::{
Assignment, Expr as SQLExpr, Expr, Ident, ObjectName, ObjectType, OrderByExpr, Query, Assignment, Expr as SQLExpr, Expr, Ident, ObjectName, ObjectType, OrderByExpr, Query,
SchemaName, SetExpr, ShowCreateObject, ShowStatementFilter, Statement, TableFactor, SchemaName, SetExpr, ShowCreateObject, ShowStatementFilter, Statement,
TableWithJoins, TransactionMode, UnaryOperator, Value, TableConstraint, TableFactor, TableWithJoins, TransactionMode, UnaryOperator, Value,
}; };


use sqlparser::parser::ParserError::ParserError; use sqlparser::parser::ParserError::ParserError;
Expand Down Expand Up @@ -128,12 +128,10 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
if_not_exists, if_not_exists,
or_replace, or_replace,
.. ..
} if constraints.is_empty() } if table_properties.is_empty() && with_options.is_empty() => match query {
&& table_properties.is_empty()
&& with_options.is_empty() =>
{
match query {
Some(query) => { Some(query) => {
let primary_key = Self::primary_key_from_constraints(&constraints)?;

let plan = self.query_to_plan(*query, planner_context)?; let plan = self.query_to_plan(*query, planner_context)?;
let input_schema = plan.schema(); let input_schema = plan.schema();


Expand All @@ -152,10 +150,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
.iter() .iter()
.zip(input_fields) .zip(input_fields)
.map(|(field, input_field)| { .map(|(field, input_field)| {
cast( cast(col(input_field.name()), field.data_type().clone())
col(input_field.name()),
field.data_type().clone(),
)
.alias(field.name()) .alias(field.name())
}) })
.collect::<Vec<_>>(); .collect::<Vec<_>>();
Expand All @@ -168,13 +163,16 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {


Ok(LogicalPlan::CreateMemoryTable(CreateMemoryTable { Ok(LogicalPlan::CreateMemoryTable(CreateMemoryTable {
avantgardnerio marked this conversation as resolved.
Show resolved Hide resolved
name: self.object_name_to_table_reference(name)?, name: self.object_name_to_table_reference(name)?,
primary_key,
input: Arc::new(plan), input: Arc::new(plan),
if_not_exists, if_not_exists,
or_replace, or_replace,
})) }))
} }


None => { None => {
let primary_key = Self::primary_key_from_constraints(&constraints)?;

let schema = self.build_schema(columns)?.to_dfschema_ref()?; let schema = self.build_schema(columns)?.to_dfschema_ref()?;
let plan = EmptyRelation { let plan = EmptyRelation {
produce_one_row: false, produce_one_row: false,
Expand All @@ -184,13 +182,13 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {


Ok(LogicalPlan::CreateMemoryTable(CreateMemoryTable { Ok(LogicalPlan::CreateMemoryTable(CreateMemoryTable {
name: self.object_name_to_table_reference(name)?, name: self.object_name_to_table_reference(name)?,
primary_key,
input: Arc::new(plan), input: Arc::new(plan),
if_not_exists, if_not_exists,
or_replace, or_replace,
})) }))
} }
} },
}


Statement::CreateView { Statement::CreateView {
or_replace, or_replace,
Expand Down Expand Up @@ -1076,4 +1074,54 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
.get_table_provider(tables_reference) .get_table_provider(tables_reference)
.is_ok() .is_ok()
} }

fn primary_key_from_constraints(
constraints: &[TableConstraint],
) -> Result<Vec<Column>> {
let pk: Result<Vec<&Vec<Ident>>> = constraints
.iter()
.map(|c: &TableConstraint| match c {
TableConstraint::Unique {
columns,
is_primary,
..
} => match is_primary {
true => Ok(columns),
false => Err(DataFusionError::Plan(
"Non-primary unique constraints are not supported".to_string(),
)),
},
TableConstraint::ForeignKey { .. } => Err(DataFusionError::Plan(
"Foreign key constraints are not currently supported".to_string(),
)),
TableConstraint::Check { .. } => Err(DataFusionError::Plan(
"Check constraints are not currently supported".to_string(),
)),
TableConstraint::Index { .. } => Err(DataFusionError::Plan(
"Indexes are not currently supported".to_string(),
)),
TableConstraint::FulltextOrSpatial { .. } => Err(DataFusionError::Plan(
"Indexes are not currently supported".to_string(),
)),
})
.collect();
let pk = pk?;
let pk = match pk.as_slice() {
[] => return Ok(vec![]),
[pk] => pk,
_ => {
return Err(DataFusionError::Plan(
"Only one primary key is supported!".to_string(),
))?
}
};
let primary_key: Vec<Column> = pk
.iter()
.map(|c| Column {
relation: None,
name: c.value.clone(),
})
.collect();
Ok(primary_key)
}
} }
30 changes: 30 additions & 0 deletions datafusion/sql/tests/integration_test.rs
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -199,6 +199,36 @@ fn cast_to_invalid_decimal_type() {
} }
} }


#[test]
fn plan_create_table_with_pk() {
let sql = "create table person (id int, name string, primary key(id))";
let plan = r#"
CreateMemoryTable: Bare { table: "person" } primary_key=[id]
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should show PK

EmptyRelation
"#
.trim();
quick_test(sql, plan);
}

#[test]
fn plan_create_table_no_pk() {
let sql = "create table person (id int, name string)";
let plan = r#"
CreateMemoryTable: Bare { table: "person" }
avantgardnerio marked this conversation as resolved.
Show resolved Hide resolved
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does not change existing behavior if no PK

EmptyRelation
"#
.trim();
quick_test(sql, plan);
}

#[test]
#[should_panic(expected = "Non-primary unique constraints are not supported")]
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unhanded syntax causes errors.

fn plan_create_table_check_constraint() {
let sql = "create table person (id int, name string, unique(id))";
let plan = "";
quick_test(sql, plan);
}

#[test] #[test]
fn plan_start_transaction() { fn plan_start_transaction() {
let sql = "start transaction"; let sql = "start transaction";
Expand Down