Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support primary key alternate syntax #7160

Merged
merged 5 commits into from
Aug 23, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
141 changes: 79 additions & 62 deletions datafusion/sql/src/statement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,75 +136,92 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
if_not_exists,
or_replace,
..
} if table_properties.is_empty() && with_options.is_empty() => match query {
Some(query) => {
let plan = self.query_to_plan(*query, planner_context)?;
let input_schema = plan.schema();

let plan = if !columns.is_empty() {
let schema = self.build_schema(columns)?.to_dfschema_ref()?;
if schema.fields().len() != input_schema.fields().len() {
return plan_err!(
} if table_properties.is_empty() && with_options.is_empty() => {
let mut constraints = constraints;
for column in &columns {
for option in &column.options {
if let ast::ColumnOption::Unique { is_primary } = option.option {
constraints.push(ast::TableConstraint::Unique {
name: None,
columns: vec![column.name.clone()],
is_primary,
})
}
}
}
match query {
Some(query) => {
let plan = self.query_to_plan(*query, planner_context)?;
let input_schema = plan.schema();

let plan = if !columns.is_empty() {
let schema = self.build_schema(columns)?.to_dfschema_ref()?;
if schema.fields().len() != input_schema.fields().len() {
return plan_err!(
"Mismatch: {} columns specified, but result has {} columns",
schema.fields().len(),
input_schema.fields().len()
);
}
let input_fields = input_schema.fields();
let project_exprs = schema
.fields()
.iter()
.zip(input_fields)
.map(|(field, input_field)| {
cast(col(input_field.name()), field.data_type().clone())
}
let input_fields = input_schema.fields();
let project_exprs = schema
.fields()
.iter()
.zip(input_fields)
.map(|(field, input_field)| {
cast(
col(input_field.name()),
field.data_type().clone(),
)
.alias(field.name())
})
.collect::<Vec<_>>();
LogicalPlanBuilder::from(plan.clone())
.project(project_exprs)?
.build()?
} else {
plan
};

let constraints = Constraints::new_from_table_constraints(
&constraints,
plan.schema(),
)?;

Ok(LogicalPlan::Ddl(DdlStatement::CreateMemoryTable(
CreateMemoryTable {
name: self.object_name_to_table_reference(name)?,
constraints,
input: Arc::new(plan),
if_not_exists,
or_replace,
},
)))
}
})
.collect::<Vec<_>>();
LogicalPlanBuilder::from(plan.clone())
.project(project_exprs)?
.build()?
} else {
plan
};

let constraints = Constraints::new_from_table_constraints(
&constraints,
plan.schema(),
)?;

Ok(LogicalPlan::Ddl(DdlStatement::CreateMemoryTable(
CreateMemoryTable {
name: self.object_name_to_table_reference(name)?,
constraints,
input: Arc::new(plan),
if_not_exists,
or_replace,
},
)))
}

None => {
let schema = self.build_schema(columns)?.to_dfschema_ref()?;
let plan = EmptyRelation {
produce_one_row: false,
schema,
};
let plan = LogicalPlan::EmptyRelation(plan);
let constraints = Constraints::new_from_table_constraints(
&constraints,
plan.schema(),
)?;
Ok(LogicalPlan::Ddl(DdlStatement::CreateMemoryTable(
CreateMemoryTable {
name: self.object_name_to_table_reference(name)?,
constraints,
input: Arc::new(plan),
if_not_exists,
or_replace,
},
)))
None => {
let schema = self.build_schema(columns)?.to_dfschema_ref()?;
let plan = EmptyRelation {
produce_one_row: false,
schema,
};
let plan = LogicalPlan::EmptyRelation(plan);
let constraints = Constraints::new_from_table_constraints(
&constraints,
plan.schema(),
)?;
Ok(LogicalPlan::Ddl(DdlStatement::CreateMemoryTable(
CreateMemoryTable {
name: self.object_name_to_table_reference(name)?,
constraints,
input: Arc::new(plan),
if_not_exists,
or_replace,
},
)))
}
}
},
}

Statement::CreateView {
or_replace,
Expand Down
43 changes: 43 additions & 0 deletions datafusion/sql/tests/sql_integration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,49 @@ CreateMemoryTable: Bare { table: "person" } constraints=[PrimaryKey([0])]
"#
.trim();
quick_test(sql, plan);

let sql = "create table person (id int primary key, name string)";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What do you think about adding unique constraints as well

create table person (id int unique, name string)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I add a test case about unique constraints.

let plan = r#"
CreateMemoryTable: Bare { table: "person" } constraints=[PrimaryKey([0])]
EmptyRelation
"#
.trim();
quick_test(sql, plan);

let sql =
"create table person (id int, name string unique not null, primary key(id))";
let plan = r#"
CreateMemoryTable: Bare { table: "person" } constraints=[PrimaryKey([0]), Unique([1])]
EmptyRelation
"#
.trim();
quick_test(sql, plan);

let sql = "create table person (id int, name varchar, primary key(name, id));";
let plan = r#"
CreateMemoryTable: Bare { table: "person" } constraints=[PrimaryKey([1, 0])]
EmptyRelation
"#
.trim();
quick_test(sql, plan);
}

#[test]
fn plan_create_table_with_multi_pk() {
let sql = "create table person (id int, name string primary key, primary key(id))";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not sure this means a multi part primary key -- postgres errors on this syntax

postgres=# create table person (id int, name varchar primary key, primary key(id));
ERROR:  multiple primary keys for table "person" are not allowed
LINE 1: ...e table person (id int, name varchar primary key, primary ke...

I think the way this is supposed to be expressed is like

create table person (id int, name varchar,  primary key(name,  id));

Copy link
Contributor

@mustafasrepo mustafasrepo Aug 2, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not sure this means a multi part primary key -- postgres errors on this syntax

postgres=# create table person (id int, name varchar primary key, primary key(id));
ERROR:  multiple primary keys for table "person" are not allowed
LINE 1: ...e table person (id int, name varchar primary key, primary ke...

I think the way this is supposed to be expressed is like

create table person (id int, name varchar,  primary key(name,  id));

I guess it is better to follow postgres in this case. Postgres supports multiple unique constraints. Hence following query

create table person (id int, name string primary key, primary key(id))

can be written as

create table person (id int, name string unique not null, primary key(id))

to define unique constraint. For writing composite primary key one have to use following syntax

create table person (id int, name varchar,  primary key(name,  id));

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@parkma99 is this comment still outstanding?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, I am at home this week.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no worries and no pressure! I was just trying to understand what the state of this PR was -- it looks like you are aware of the feedback and just haven't had a chance to implement it. I wanted to make sure you weren't waiting on additional feedback

I'll mark it as draft so it doesn't appear on the list of PRs needing review

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I still think two primary keys on the same column doesn't really make sense, but we can always make this check more strict as a follow on PR I think

let plan = r#"
CreateMemoryTable: Bare { table: "person" } constraints=[PrimaryKey([0]), PrimaryKey([1])]
EmptyRelation
"#
.trim();
quick_test(sql, plan);
}

#[test]
fn plan_create_table_with_unique() {
let sql = "create table person (id int unique, name string)";
let plan = "CreateMemoryTable: Bare { table: \"person\" } constraints=[Unique([0])]\n EmptyRelation";
quick_test(sql, plan);
}

#[test]
Expand Down
27 changes: 27 additions & 0 deletions datafusion/sqllogictest/test_files/groupby.slt
Original file line number Diff line number Diff line change
Expand Up @@ -3021,6 +3021,21 @@ CREATE TABLE sales_global_with_pk (zip_code INT,
(1, 'FRA', 3, '2022-01-02 12:00:00'::timestamp, 'EUR', 200.0),
(1, 'TUR', 4, '2022-01-03 10:00:00'::timestamp, 'TRY', 100.0)

# create a table for testing, with primary key alternate syntax
statement ok
CREATE TABLE sales_global_with_pk_alternate (zip_code INT,
country VARCHAR(3),
sn INT primary key,
ts TIMESTAMP,
currency VARCHAR(3),
amount FLOAT
) as VALUES
(0, 'GRC', 0, '2022-01-01 06:00:00'::timestamp, 'EUR', 30.0),
(1, 'FRA', 1, '2022-01-01 08:00:00'::timestamp, 'EUR', 50.0),
(1, 'TUR', 2, '2022-01-01 11:30:00'::timestamp, 'TRY', 75.0),
(1, 'FRA', 3, '2022-01-02 12:00:00'::timestamp, 'EUR', 200.0),
(1, 'TUR', 4, '2022-01-03 10:00:00'::timestamp, 'TRY', 100.0)

# create a table for testing, where primary key is composite
statement ok
CREATE TABLE sales_global_with_composite_pk (zip_code INT,
Expand Down Expand Up @@ -3091,6 +3106,18 @@ SELECT s.sn, s.amount, 2*s.sn
3 200 6
4 100 8

query IRI
SELECT s.sn, s.amount, 2*s.sn
FROM sales_global_with_pk_alternate AS s
GROUP BY sn
ORDER BY sn
----
0 30 0
1 50 2
2 75 4
3 200 6
4 100 8

# Join should propagate primary key successfully
query TT
EXPLAIN SELECT r.sn, SUM(l.amount), r.amount
Expand Down