Skip to content

Commit

Permalink
Implement SQL parsing for CREATE TABLE .. FROM SOURCE
Browse files Browse the repository at this point in the history
  • Loading branch information
rjobanp committed Jul 10, 2024
1 parent d487d2b commit 748330f
Show file tree
Hide file tree
Showing 8 changed files with 282 additions and 74 deletions.
49 changes: 49 additions & 0 deletions src/sql-parser/src/ast/defs/ddl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1420,6 +1420,25 @@ impl AstDisplay for KeyConstraint {
}
impl_display!(KeyConstraint);

/// The source and external reference for a table-fed source, specified in a
/// `CREATE TABLE .. FROM SOURCE .. (REFERENCE ..)` statement.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct TableFromSource<T: AstInfo> {
pub source: T::ItemName,
pub external_reference: UnresolvedItemName,
}

impl<T: AstInfo> AstDisplay for TableFromSource<T> {
fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
f.write_str(" FROM SOURCE ");
f.write_node(&self.source);
f.write_str(" (REFERENCE = ");
f.write_node(&self.external_reference);
f.write_str(")");
}
}
impl_display_t!(TableFromSource);

#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum CreateSourceOptionName {
IgnoreKeys,
Expand Down Expand Up @@ -1465,6 +1484,36 @@ pub struct CreateSourceOption<T: AstInfo> {
impl_display_for_with_option!(CreateSourceOption);
impl_display_t!(CreateSourceOption);

/// A specification for a column, either just the name or the full definition.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum ColumnSpec<T: AstInfo> {
ColumnDef(ColumnDef<T>),
ColumnName(Ident),
}

impl<T: AstInfo> ColumnSpec<T> {
pub fn is_column_name(&self) -> bool {
matches!(self, ColumnSpec::ColumnName(_))
}

pub fn name(&self) -> &Ident {
match self {
ColumnSpec::ColumnDef(def) => &def.name,
ColumnSpec::ColumnName(name) => name,
}
}
}

impl<T: AstInfo> AstDisplay for ColumnSpec<T> {
fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
match self {
ColumnSpec::ColumnDef(def) => f.write_node(def),
ColumnSpec::ColumnName(name) => f.write_node(name),
}
}
}
impl_display_t!(ColumnSpec);

/// SQL column definition
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct ColumnDef<T: AstInfo> {
Expand Down
36 changes: 25 additions & 11 deletions src/sql-parser/src/ast/defs/statement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,13 @@ use smallvec::{smallvec, SmallVec};

use crate::ast::display::{self, AstDisplay, AstFormatter, WithOptionName};
use crate::ast::{
AstInfo, ColumnDef, ConnectionOption, ConnectionOptionName, CreateConnectionOption,
AstInfo, ColumnDef, ColumnSpec, ConnectionOption, ConnectionOptionName, CreateConnectionOption,
CreateConnectionType, CreateSinkConnection, CreateSourceConnection, CreateSourceFormat,
CreateSourceOption, CreateSourceOptionName, DeferredItemName, Expr, Format, Ident,
IntervalValue, KeyConstraint, MaterializedViewOption, Query, SelectItem, SinkEnvelope,
SourceEnvelope, SourceIncludeMetadata, SubscribeOutput, TableAlias, TableConstraint,
TableWithJoins, UnresolvedDatabaseName, UnresolvedItemName, UnresolvedObjectName,
UnresolvedSchemaName, Value,
TableFromSource, TableWithJoins, UnresolvedDatabaseName, UnresolvedItemName,
UnresolvedObjectName, UnresolvedSchemaName, Value,
};

/// A top-level statement (SELECT, INSERT, CREATE, etc.)
Expand Down Expand Up @@ -1406,11 +1406,14 @@ pub struct CreateTableStatement<T: AstInfo> {
/// Table name
pub name: UnresolvedItemName,
/// Optional schema
pub columns: Vec<ColumnDef<T>>,
pub constraints: Vec<TableConstraint<T>>,
pub columns: Option<Vec<ColumnSpec<T>>>,
pub constraints: Option<Vec<TableConstraint<T>>>,
pub if_not_exists: bool,
pub temporary: bool,
pub with_options: Vec<TableOption<T>>,
/// Optionally specify that this is a read-only table
/// fed by an upstream source.
pub from_source: Option<TableFromSource<T>>,
}

impl<T: AstInfo> AstDisplay for CreateTableStatement<T> {
Expand All @@ -1422,6 +1425,7 @@ impl<T: AstInfo> AstDisplay for CreateTableStatement<T> {
if_not_exists,
temporary,
with_options,
from_source,
} = self;
f.write_str("CREATE ");
if *temporary {
Expand All @@ -1432,13 +1436,23 @@ impl<T: AstInfo> AstDisplay for CreateTableStatement<T> {
f.write_str("IF NOT EXISTS ");
}
f.write_node(name);
f.write_str(" (");
f.write_node(&display::comma_separated(columns));
if !self.constraints.is_empty() {
f.write_str(", ");
f.write_node(&display::comma_separated(constraints));
if columns.is_some() || constraints.is_some() {
f.write_str(" (");
if let Some(columns) = columns {
f.write_node(&display::comma_separated(columns));
}

if let Some(constraints) = constraints {
if !constraints.is_empty() {
f.write_str(", ");
f.write_node(&display::comma_separated(constraints));
}
}
f.write_str(")");
}
if let Some(from_source) = from_source {
f.write_node(from_source);
}
f.write_str(")");
if !with_options.is_empty() {
f.write_str(" WITH (");
f.write_node(&display::comma_separated(&self.with_options));
Expand Down
128 changes: 101 additions & 27 deletions src/sql-parser/src/parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2649,7 +2649,19 @@ impl<'a> Parser<'a> {
let if_not_exists = self.parse_if_not_exists()?;
let name = self.parse_item_name()?;

let (columns, constraints) = self.parse_columns(Mandatory)?;
let (columns, constraints) = self
.parse_columns(Mandatory)?
.expect("columns are mandatory");
let columns = columns
.into_iter()
.map(|c| match c {
ColumnSpec::ColumnName(_) => Err(ParserError::new(
self.peek_prev_pos(),
"Columns must be fully specified in CREATE SUBSOURCE",
)),
ColumnSpec::ColumnDef(c) => Ok(c),
})
.collect::<Result<Vec<_>, ParserError>>()?;

let of_source = if self.parse_keyword(OF) {
self.expect_keyword(SOURCE)?;
Expand Down Expand Up @@ -4207,8 +4219,61 @@ impl<'a> Parser<'a> {
self.expect_keyword(TABLE)?;
let if_not_exists = self.parse_if_not_exists()?;
let table_name = self.parse_item_name()?;

// parse optional column list (schema)
let (columns, constraints) = self.parse_columns(Mandatory)?;
let (columns, constraints) = match self.parse_columns(Optional)? {
Some((c, c1)) => (Some(c), Some(c1)),
None => (None, None),
};
let column_pos = self.peek_prev_pos();
let column_next_token = self.peek_token();

// If this is a read-only table from a source, parse the source and external reference
// and ensure that any columns specified do not include full definitions
let from_source = if self.parse_keywords(&[FROM, SOURCE]) {
let source = self.parse_raw_name()?;
self.expect_token(&Token::LParen)?;
self.expect_keyword(REFERENCE)?;
let _ = self.consume_token(&Token::Eq);
let external_reference = self.parse_item_name()?;
self.expect_token(&Token::RParen)?;

if columns
.as_ref()
.map_or(false, |cols| cols.iter().any(|c| !c.is_column_name()))
{
return Err(ParserError::new(
column_pos,
"full column definitions are not allowed in CREATE TABLE ... FROM SOURCE",
));
}

Some(TableFromSource {
source,
external_reference,
})
} else {
// If there is no from_source, we should assert that columns were specified (not None)
// and include full definitions
match &columns {
None => {
return self.expected(
column_pos,
"a list of columns in parentheses",
column_next_token,
)
}
Some(cols) if cols.iter().any(|c| c.is_column_name()) => {
return self.expected(
column_pos,
"a list of columns with types in parentheses",
column_next_token,
)
}
Some(_) => {}
};
None
};

let with_options = if self.parse_keyword(WITH) {
self.expect_token(&Token::LParen)?;
Expand All @@ -4230,19 +4295,17 @@ impl<'a> Parser<'a> {
if_not_exists,
temporary,
with_options,
from_source,
}))
}

fn parse_columns(
&mut self,
optional: IsOptional,
) -> Result<(Vec<ColumnDef<Raw>>, Vec<TableConstraint<Raw>>), ParserError> {
let mut columns = vec![];
let mut constraints = vec![];

) -> Result<Option<(Vec<ColumnSpec<Raw>>, Vec<TableConstraint<Raw>>)>, ParserError> {
if !self.consume_token(&Token::LParen) {
if optional == Optional {
return Ok((columns, constraints));
return Ok(None);
} else {
return self.expected(
self.peek_pos(),
Expand All @@ -4251,35 +4314,46 @@ impl<'a> Parser<'a> {
);
}
}
let mut columns = vec![];
let mut constraints = vec![];

if self.consume_token(&Token::RParen) {
// Tables with zero columns are a PostgreSQL extension.
return Ok((columns, constraints));
return Ok(Some((columns, constraints)));
}

loop {
if let Some(constraint) = self.parse_optional_table_constraint()? {
constraints.push(constraint);
} else if let Some(column_name) = self.consume_identifier()? {
let data_type = self.parse_data_type()?;
let collation = if self.parse_keyword(COLLATE) {
Some(self.parse_item_name()?)
} else {
None
};
let mut options = vec![];
loop {
match self.peek_token() {
None | Some(Token::Comma) | Some(Token::RParen) => break,
_ => options.push(self.parse_column_option_def()?),
// Check if this is just a column name or a full column definition.
let col = match self.peek_token() {
Some(Token::Comma) | Some(Token::RParen) => ColumnSpec::ColumnName(column_name),
_ => {
let data_type = self.parse_data_type()?;
let collation = if self.parse_keyword(COLLATE) {
Some(self.parse_item_name()?)
} else {
None
};
let mut options = vec![];
loop {
match self.peek_token() {
None | Some(Token::Comma) | Some(Token::RParen) => break,
_ => options.push(self.parse_column_option_def()?),
}
}

ColumnSpec::ColumnDef(ColumnDef {
name: column_name,
data_type,
collation,
options,
})
}
}
};

columns.push(ColumnDef {
name: column_name,
data_type,
collation,
options,
});
columns.push(col);
} else {
return self.expected(
self.peek_pos(),
Expand All @@ -4300,7 +4374,7 @@ impl<'a> Parser<'a> {
}
}

Ok((columns, constraints))
Ok(Some((columns, constraints)))
}

fn parse_column_option_def(&mut self) -> Result<ColumnOptionDef<Raw>, ParserError> {
Expand Down
10 changes: 5 additions & 5 deletions src/sql-parser/tests/testdata/create
Original file line number Diff line number Diff line change
Expand Up @@ -193,21 +193,21 @@ CREATE TABLE "table_name" (col_name int)
----
CREATE TABLE table_name (col_name int4)
=>
CreateTable(CreateTableStatement { name: UnresolvedItemName([Ident("table_name")]), columns: [ColumnDef { name: Ident("col_name"), data_type: Other { name: Name(UnresolvedItemName([Ident("int4")])), typ_mod: [] }, collation: None, options: [] }], constraints: [], if_not_exists: false, temporary: false, with_options: [] })
CreateTable(CreateTableStatement { name: UnresolvedItemName([Ident("table_name")]), columns: Some([ColumnDef(ColumnDef { name: Ident("col_name"), data_type: Other { name: Name(UnresolvedItemName([Ident("int4")])), typ_mod: [] }, collation: None, options: [] })]), constraints: Some([]), if_not_exists: false, temporary: false, with_options: [], from_source: None })

parse-statement
CREATE TABLE schema_name.table_name (col_name int)
----
CREATE TABLE schema_name.table_name (col_name int4)
=>
CreateTable(CreateTableStatement { name: UnresolvedItemName([Ident("schema_name"), Ident("table_name")]), columns: [ColumnDef { name: Ident("col_name"), data_type: Other { name: Name(UnresolvedItemName([Ident("int4")])), typ_mod: [] }, collation: None, options: [] }], constraints: [], if_not_exists: false, temporary: false, with_options: [] })
CreateTable(CreateTableStatement { name: UnresolvedItemName([Ident("schema_name"), Ident("table_name")]), columns: Some([ColumnDef(ColumnDef { name: Ident("col_name"), data_type: Other { name: Name(UnresolvedItemName([Ident("int4")])), typ_mod: [] }, collation: None, options: [] })]), constraints: Some([]), if_not_exists: false, temporary: false, with_options: [], from_source: None })

parse-statement
CREATE TABLE schema_name.table_name (col_name text COLLATE en)
----
CREATE TABLE schema_name.table_name (col_name text COLLATE en)
=>
CreateTable(CreateTableStatement { name: UnresolvedItemName([Ident("schema_name"), Ident("table_name")]), columns: [ColumnDef { name: Ident("col_name"), data_type: Other { name: Name(UnresolvedItemName([Ident("text")])), typ_mod: [] }, collation: Some(UnresolvedItemName([Ident("en")])), options: [] }], constraints: [], if_not_exists: false, temporary: false, with_options: [] })
CreateTable(CreateTableStatement { name: UnresolvedItemName([Ident("schema_name"), Ident("table_name")]), columns: Some([ColumnDef(ColumnDef { name: Ident("col_name"), data_type: Other { name: Name(UnresolvedItemName([Ident("text")])), typ_mod: [] }, collation: Some(UnresolvedItemName([Ident("en")])), options: [] })]), constraints: Some([]), if_not_exists: false, temporary: false, with_options: [], from_source: None })

parse-statement
CREATE TABLE "" (col_name int)
Expand Down Expand Up @@ -239,14 +239,14 @@ CREATE TABLE row (row int)
----
CREATE TABLE row (row int4)
=>
CreateTable(CreateTableStatement { name: UnresolvedItemName([Ident("row")]), columns: [ColumnDef { name: Ident("row"), data_type: Other { name: Name(UnresolvedItemName([Ident("int4")])), typ_mod: [] }, collation: None, options: [] }], constraints: [], if_not_exists: false, temporary: false, with_options: [] })
CreateTable(CreateTableStatement { name: UnresolvedItemName([Ident("row")]), columns: Some([ColumnDef(ColumnDef { name: Ident("row"), data_type: Other { name: Name(UnresolvedItemName([Ident("int4")])), typ_mod: [] }, collation: None, options: [] })]), constraints: Some([]), if_not_exists: false, temporary: false, with_options: [], from_source: None })

parse-statement
CREATE TABLE t (x int) WITH (RETAIN HISTORY = FOR '1 day')
----
CREATE TABLE t (x int4) WITH (RETAIN HISTORY = FOR '1 day')
=>
CreateTable(CreateTableStatement { name: UnresolvedItemName([Ident("t")]), columns: [ColumnDef { name: Ident("x"), data_type: Other { name: Name(UnresolvedItemName([Ident("int4")])), typ_mod: [] }, collation: None, options: [] }], constraints: [], if_not_exists: false, temporary: false, with_options: [TableOption { name: RetainHistory, value: Some(RetainHistoryFor(String("1 day"))) }] })
CreateTable(CreateTableStatement { name: UnresolvedItemName([Ident("t")]), columns: Some([ColumnDef(ColumnDef { name: Ident("x"), data_type: Other { name: Name(UnresolvedItemName([Ident("int4")])), typ_mod: [] }, collation: None, options: [] })]), constraints: Some([]), if_not_exists: false, temporary: false, with_options: [TableOption { name: RetainHistory, value: Some(RetainHistoryFor(String("1 day"))) }], from_source: None })

parse-statement
CREATE SOURCE webhook_json IN CLUSTER webhook_cluster FROM WEBHOOK BODY FORMAT JSON INCLUDE HEADERS
Expand Down
Loading

0 comments on commit 748330f

Please sign in to comment.