Skip to content

Commit

Permalink
feat: support primary key alternate syntax (#7160)
Browse files Browse the repository at this point in the history
* support another primary key syntax

* add test case

* add unique test case

* add tests and fix fmt
  • Loading branch information
parkma99 authored Aug 23, 2023
1 parent 304cb02 commit 727e8c6
Show file tree
Hide file tree
Showing 3 changed files with 149 additions and 62 deletions.
141 changes: 79 additions & 62 deletions datafusion/sql/src/statement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,75 +154,92 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
if_not_exists,
or_replace,
..
} if table_properties.is_empty() && with_options.is_empty() => match query {
Some(query) => {
let plan = self.query_to_plan(*query, planner_context)?;
let input_schema = plan.schema();

let plan = if !columns.is_empty() {
let schema = self.build_schema(columns)?.to_dfschema_ref()?;
if schema.fields().len() != input_schema.fields().len() {
return plan_err!(
} if table_properties.is_empty() && with_options.is_empty() => {
let mut constraints = constraints;
for column in &columns {
for option in &column.options {
if let ast::ColumnOption::Unique { is_primary } = option.option {
constraints.push(ast::TableConstraint::Unique {
name: None,
columns: vec![column.name.clone()],
is_primary,
})
}
}
}
match query {
Some(query) => {
let plan = self.query_to_plan(*query, planner_context)?;
let input_schema = plan.schema();

let plan = if !columns.is_empty() {
let schema = self.build_schema(columns)?.to_dfschema_ref()?;
if schema.fields().len() != input_schema.fields().len() {
return plan_err!(
"Mismatch: {} columns specified, but result has {} columns",
schema.fields().len(),
input_schema.fields().len()
);
}
let input_fields = input_schema.fields();
let project_exprs = schema
.fields()
.iter()
.zip(input_fields)
.map(|(field, input_field)| {
cast(col(input_field.name()), field.data_type().clone())
}
let input_fields = input_schema.fields();
let project_exprs = schema
.fields()
.iter()
.zip(input_fields)
.map(|(field, input_field)| {
cast(
col(input_field.name()),
field.data_type().clone(),
)
.alias(field.name())
})
.collect::<Vec<_>>();
LogicalPlanBuilder::from(plan.clone())
.project(project_exprs)?
.build()?
} else {
plan
};

let constraints = Constraints::new_from_table_constraints(
&constraints,
plan.schema(),
)?;

Ok(LogicalPlan::Ddl(DdlStatement::CreateMemoryTable(
CreateMemoryTable {
name: self.object_name_to_table_reference(name)?,
constraints,
input: Arc::new(plan),
if_not_exists,
or_replace,
},
)))
}
})
.collect::<Vec<_>>();
LogicalPlanBuilder::from(plan.clone())
.project(project_exprs)?
.build()?
} else {
plan
};

let constraints = Constraints::new_from_table_constraints(
&constraints,
plan.schema(),
)?;

Ok(LogicalPlan::Ddl(DdlStatement::CreateMemoryTable(
CreateMemoryTable {
name: self.object_name_to_table_reference(name)?,
constraints,
input: Arc::new(plan),
if_not_exists,
or_replace,
},
)))
}

None => {
let schema = self.build_schema(columns)?.to_dfschema_ref()?;
let plan = EmptyRelation {
produce_one_row: false,
schema,
};
let plan = LogicalPlan::EmptyRelation(plan);
let constraints = Constraints::new_from_table_constraints(
&constraints,
plan.schema(),
)?;
Ok(LogicalPlan::Ddl(DdlStatement::CreateMemoryTable(
CreateMemoryTable {
name: self.object_name_to_table_reference(name)?,
constraints,
input: Arc::new(plan),
if_not_exists,
or_replace,
},
)))
None => {
let schema = self.build_schema(columns)?.to_dfschema_ref()?;
let plan = EmptyRelation {
produce_one_row: false,
schema,
};
let plan = LogicalPlan::EmptyRelation(plan);
let constraints = Constraints::new_from_table_constraints(
&constraints,
plan.schema(),
)?;
Ok(LogicalPlan::Ddl(DdlStatement::CreateMemoryTable(
CreateMemoryTable {
name: self.object_name_to_table_reference(name)?,
constraints,
input: Arc::new(plan),
if_not_exists,
or_replace,
},
)))
}
}
},
}

Statement::CreateView {
or_replace,
Expand Down
43 changes: 43 additions & 0 deletions datafusion/sql/tests/sql_integration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,49 @@ CreateMemoryTable: Bare { table: "person" } constraints=[PrimaryKey([0])]
"#
.trim();
quick_test(sql, plan);

let sql = "create table person (id int primary key, name string)";
let plan = r#"
CreateMemoryTable: Bare { table: "person" } constraints=[PrimaryKey([0])]
EmptyRelation
"#
.trim();
quick_test(sql, plan);

let sql =
"create table person (id int, name string unique not null, primary key(id))";
let plan = r#"
CreateMemoryTable: Bare { table: "person" } constraints=[PrimaryKey([0]), Unique([1])]
EmptyRelation
"#
.trim();
quick_test(sql, plan);

let sql = "create table person (id int, name varchar, primary key(name, id));";
let plan = r#"
CreateMemoryTable: Bare { table: "person" } constraints=[PrimaryKey([1, 0])]
EmptyRelation
"#
.trim();
quick_test(sql, plan);
}

/// Plans a `CREATE TABLE` that declares a primary key twice: once inline on
/// the `name` column and once as a table-level `primary key(id)` clause.
///
/// The expected plan text lists both constraints, with the table-level one
/// (column index 0) ahead of the column-level one (column index 1).
#[test]
fn plan_create_table_with_multi_pk() {
    let statement = "create table person (id int, name string primary key, primary key(id))";
    // The leading/trailing newlines of the raw string are removed by `trim()`.
    let expected_plan = r#"
CreateMemoryTable: Bare { table: "person" } constraints=[PrimaryKey([0]), PrimaryKey([1])]
EmptyRelation
"#
    .trim();
    // NOTE(review): `quick_test` is defined outside this view; presumably it
    // plans `statement` and compares the result against `expected_plan`.
    quick_test(statement, expected_plan);
}

/// Plans a `CREATE TABLE` whose `id` column carries an inline `unique`
/// option and expects a single `Unique` constraint on column index 0.
#[test]
fn plan_create_table_with_unique() {
    let statement = "create table person (id int unique, name string)";
    let expected_plan = "CreateMemoryTable: Bare { table: \"person\" } constraints=[Unique([0])]\n EmptyRelation";
    // NOTE(review): `quick_test` is defined outside this view; presumably it
    // plans `statement` and compares the result against `expected_plan`.
    quick_test(statement, expected_plan);
}

#[test]
Expand Down
27 changes: 27 additions & 0 deletions datafusion/sqllogictest/test_files/groupby.slt
Original file line number Diff line number Diff line change
Expand Up @@ -3021,6 +3021,21 @@ CREATE TABLE sales_global_with_pk (zip_code INT,
(1, 'FRA', 3, '2022-01-02 12:00:00'::timestamp, 'EUR', 200.0),
(1, 'TUR', 4, '2022-01-03 10:00:00'::timestamp, 'TRY', 100.0)

# create a table for testing, with primary key alternate syntax
statement ok
CREATE TABLE sales_global_with_pk_alternate (zip_code INT,
country VARCHAR(3),
sn INT primary key,
ts TIMESTAMP,
currency VARCHAR(3),
amount FLOAT
) as VALUES
(0, 'GRC', 0, '2022-01-01 06:00:00'::timestamp, 'EUR', 30.0),
(1, 'FRA', 1, '2022-01-01 08:00:00'::timestamp, 'EUR', 50.0),
(1, 'TUR', 2, '2022-01-01 11:30:00'::timestamp, 'TRY', 75.0),
(1, 'FRA', 3, '2022-01-02 12:00:00'::timestamp, 'EUR', 200.0),
(1, 'TUR', 4, '2022-01-03 10:00:00'::timestamp, 'TRY', 100.0)

# create a table for testing, where primary key is composite
statement ok
CREATE TABLE sales_global_with_composite_pk (zip_code INT,
Expand Down Expand Up @@ -3091,6 +3106,18 @@ SELECT s.sn, s.amount, 2*s.sn
3 200 6
4 100 8

query IRI
SELECT s.sn, s.amount, 2*s.sn
FROM sales_global_with_pk_alternate AS s
GROUP BY sn
ORDER BY sn
----
0 30 0
1 50 2
2 75 4
3 200 6
4 100 8

# Join should propagate primary key successfully
query TT
EXPLAIN SELECT r.sn, SUM(l.amount), r.amount
Expand Down

0 comments on commit 727e8c6

Please sign in to comment.