Skip to content

Commit

Permalink
Merge pull request #28125 from rjobanp/create-table-from-source
Browse files Browse the repository at this point in the history
Implement SQL parsing for CREATE TABLE .. FROM SOURCE
  • Loading branch information
rjobanp authored Jul 16, 2024
2 parents dd6aeae + b8fc853 commit 9c5a747
Show file tree
Hide file tree
Showing 7 changed files with 165 additions and 17 deletions.
1 change: 1 addition & 0 deletions src/adapter/src/coord/command_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -621,6 +621,7 @@ impl Coordinator {
| Statement::CreateSource(_)
| Statement::CreateSubsource(_)
| Statement::CreateTable(_)
| Statement::CreateTableFromSource(_)
| Statement::CreateType(_)
| Statement::CreateView(_)
| Statement::CreateWebhookSource(_)
Expand Down
44 changes: 44 additions & 0 deletions src/sql-parser/src/ast/defs/statement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ pub enum Statement<T: AstInfo> {
CreateView(CreateViewStatement<T>),
CreateMaterializedView(CreateMaterializedViewStatement<T>),
CreateTable(CreateTableStatement<T>),
CreateTableFromSource(CreateTableFromSourceStatement<T>),
CreateIndex(CreateIndexStatement<T>),
CreateType(CreateTypeStatement<T>),
CreateRole(CreateRoleStatement),
Expand Down Expand Up @@ -127,6 +128,7 @@ impl<T: AstInfo> AstDisplay for Statement<T> {
Statement::CreateView(stmt) => f.write_node(stmt),
Statement::CreateMaterializedView(stmt) => f.write_node(stmt),
Statement::CreateTable(stmt) => f.write_node(stmt),
Statement::CreateTableFromSource(stmt) => f.write_node(stmt),
Statement::CreateIndex(stmt) => f.write_node(stmt),
Statement::CreateRole(stmt) => f.write_node(stmt),
Statement::CreateSecret(stmt) => f.write_node(stmt),
Expand Down Expand Up @@ -202,6 +204,7 @@ pub fn statement_kind_label_value(kind: StatementKind) -> &'static str {
StatementKind::CreateView => "create_view",
StatementKind::CreateMaterializedView => "create_materialized_view",
StatementKind::CreateTable => "create_table",
StatementKind::CreateTableFromSource => "create_table_from_source",
StatementKind::CreateIndex => "create_index",
StatementKind::CreateType => "create_type",
StatementKind::CreateRole => "create_role",
Expand Down Expand Up @@ -1488,6 +1491,47 @@ pub struct TableOption<T: AstInfo> {
}
impl_display_for_with_option!(TableOption);

/// `CREATE TABLE .. FROM SOURCE`
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct CreateTableFromSourceStatement<T: AstInfo> {
/// Table name
pub name: UnresolvedItemName,
/// Optional set of columns to include
pub columns: Vec<Ident>,
pub if_not_exists: bool,
pub source: T::ItemName,
pub external_reference: UnresolvedItemName,
}

impl<T: AstInfo> AstDisplay for CreateTableFromSourceStatement<T> {
fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
let Self {
name,
columns,
source,
external_reference,
if_not_exists,
} = self;
f.write_str("CREATE TABLE ");
if *if_not_exists {
f.write_str("IF NOT EXISTS ");
}
f.write_node(name);
if !columns.is_empty() {
f.write_str(" (");

f.write_node(&display::comma_separated(columns));
f.write_str(")");
}
f.write_str(" FROM SOURCE ");
f.write_node(source);
f.write_str(" (REFERENCE = ");
f.write_node(external_reference);
f.write_str(")");
}
}
impl_display_t!(CreateTableFromSourceStatement);

/// `CREATE INDEX`
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct CreateIndexStatement<T: AstInfo> {
Expand Down
61 changes: 58 additions & 3 deletions src/sql-parser/src/parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1553,8 +1553,12 @@ impl<'a> Parser<'a> {
}

fn peek_keywords(&mut self, keywords: &[Keyword]) -> bool {
self.peek_keywords_from(0, keywords)
}

fn peek_keywords_from(&mut self, start: usize, keywords: &[Keyword]) -> bool {
for (i, keyword) in keywords.iter().enumerate() {
match self.peek_nth_token(i) {
match self.peek_nth_token(start + i) {
Some(Token::Keyword(k)) => {
if k != *keyword {
return false;
Expand All @@ -1573,6 +1577,19 @@ impl<'a> Parser<'a> {
}
}

/// Returns whether the sequence of keywords is found at any point before
/// the end of the unprocessed tokens.
fn peek_keywords_lookahead(&mut self, keywords: &[Keyword]) -> bool {
let mut index = 0;
while index < self.tokens.len() {
if self.peek_keywords_from(index, keywords) {
return true;
}
index += 1;
}
false
}

/// Return the nth token that has not yet been processed.
fn peek_nth_token(&self, n: usize) -> Option<Token> {
self.tokens
Expand Down Expand Up @@ -1854,8 +1871,13 @@ impl<'a> Parser<'a> {
|| self.peek_keywords(&[TEMP, TABLE])
|| self.peek_keywords(&[TEMPORARY, TABLE])
{
self.parse_create_table()
.map_parser_err(StatementKind::CreateTable)
if self.peek_keywords_lookahead(&[FROM, SOURCE]) {
self.parse_create_table_from_source()
.map_parser_err(StatementKind::CreateTableFromSource)
} else {
self.parse_create_table()
.map_parser_err(StatementKind::CreateTable)
}
} else if self.peek_keyword(SECRET) {
self.parse_create_secret()
.map_parser_err(StatementKind::CreateSecret)
Expand Down Expand Up @@ -4261,6 +4283,39 @@ impl<'a> Parser<'a> {
}))
}

fn parse_create_table_from_source(&mut self) -> Result<Statement<Raw>, ParserError> {
self.expect_keyword(TABLE)?;
let if_not_exists = self.parse_if_not_exists()?;
let table_name = self.parse_item_name()?;
// parse optional column name list
let columns = if self.consume_token(&Token::LParen) {
let cols = self.parse_comma_separated(Parser::parse_identifier)?;
self.expect_token(&Token::RParen)?;
cols
} else {
vec![]
};

self.expect_keywords(&[FROM, SOURCE])?;

let source = self.parse_raw_name()?;
self.expect_token(&Token::LParen)?;
self.expect_keyword(REFERENCE)?;
let _ = self.consume_token(&Token::Eq);
let external_reference = self.parse_item_name()?;
self.expect_token(&Token::RParen)?;

Ok(Statement::CreateTableFromSource(
CreateTableFromSourceStatement {
name: table_name,
columns,
if_not_exists,
source,
external_reference,
},
))
}

fn parse_columns(
&mut self,
optional: IsOptional,
Expand Down
28 changes: 28 additions & 0 deletions src/sql-parser/tests/testdata/ddl
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,34 @@ CREATE TABLE t (c type(1) list list)
=>
CreateTable(CreateTableStatement { name: UnresolvedItemName([Ident("t")]), columns: [ColumnDef { name: Ident("c"), data_type: List(List(Other { name: Name(UnresolvedItemName([Ident("type")])), typ_mod: [1] })), collation: None, options: [] }], constraints: [], if_not_exists: false, temporary: false, with_options: [] })

parse-statement
CREATE TABLE t (c int4, d int4) FROM SOURCE foo (REFERENCE bar)
----
error: Expected right parenthesis, found identifier "int4"
CREATE TABLE t (c int4, d int4) FROM SOURCE foo (REFERENCE bar)
^

parse-statement
CREATE TABLE t (c, d) FROM SOURCE foo (REFERENCE bar)
----
CREATE TABLE t (c, d) FROM SOURCE foo (REFERENCE = bar)
=>
CreateTableFromSource(CreateTableFromSourceStatement { name: UnresolvedItemName([Ident("t")]), columns: [Ident("c"), Ident("d")], if_not_exists: false, source: Name(UnresolvedItemName([Ident("foo")])), external_reference: UnresolvedItemName([Ident("bar")]) })

parse-statement
CREATE TABLE t FROM SOURCE foo
----
error: Expected left parenthesis, found EOF
CREATE TABLE t FROM SOURCE foo
^

parse-statement
CREATE TABLE t FROM SOURCE foo (REFERENCE = baz)
----
CREATE TABLE t FROM SOURCE foo (REFERENCE = baz)
=>
CreateTableFromSource(CreateTableFromSourceStatement { name: UnresolvedItemName([Ident("t")]), columns: [], if_not_exists: false, source: Name(UnresolvedItemName([Ident("foo")])), external_reference: UnresolvedItemName([Ident("baz")]) })

parse-statement
CREATE DATABASE IF EXISTS foo
----
Expand Down
1 change: 1 addition & 0 deletions src/sql/src/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,7 @@ impl Plan {
| StatementKind::CreateSubsource
| StatementKind::CreateWebhookSource => &[PlanKind::CreateSource],
StatementKind::CreateTable => &[PlanKind::CreateTable],
StatementKind::CreateTableFromSource => &[PlanKind::CreateTable],
StatementKind::CreateType => &[PlanKind::CreateType],
StatementKind::CreateView => &[PlanKind::CreateView],
StatementKind::Deallocate => &[PlanKind::Deallocate],
Expand Down
4 changes: 4 additions & 0 deletions src/sql/src/plan/statement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,9 @@ pub fn describe(
Statement::CreateSource(stmt) => ddl::describe_create_source(&scx, stmt)?,
Statement::CreateSubsource(stmt) => ddl::describe_create_subsource(&scx, stmt)?,
Statement::CreateTable(stmt) => ddl::describe_create_table(&scx, stmt)?,
Statement::CreateTableFromSource(stmt) => {
ddl::describe_create_table_from_source(&scx, stmt)?
}
Statement::CreateType(stmt) => ddl::describe_create_type(&scx, stmt)?,
Statement::CreateView(stmt) => ddl::describe_create_view(&scx, stmt)?,
Statement::CreateMaterializedView(stmt) => {
Expand Down Expand Up @@ -334,6 +337,7 @@ pub fn plan(
Statement::CreateSource(stmt) => ddl::plan_create_source(scx, stmt),
Statement::CreateSubsource(stmt) => ddl::plan_create_subsource(scx, stmt),
Statement::CreateTable(stmt) => ddl::plan_create_table(scx, stmt),
Statement::CreateTableFromSource(stmt) => ddl::plan_create_table_from_source(scx, stmt),
Statement::CreateType(stmt) => ddl::plan_create_type(scx, stmt),
Statement::CreateView(stmt) => ddl::plan_create_view(scx, stmt, params),
Statement::CreateMaterializedView(stmt) => {
Expand Down
43 changes: 29 additions & 14 deletions src/sql/src/plan/statement/ddl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,20 +58,21 @@ use mz_sql_parser::ast::{
CreateRoleStatement, CreateSchemaStatement, CreateSecretStatement, CreateSinkConnection,
CreateSinkOption, CreateSinkOptionName, CreateSinkStatement, CreateSourceConnection,
CreateSourceOption, CreateSourceOptionName, CreateSourceStatement, CreateSubsourceOption,
CreateSubsourceOptionName, CreateSubsourceStatement, CreateTableStatement, CreateTypeAs,
CreateTypeListOption, CreateTypeListOptionName, CreateTypeMapOption, CreateTypeMapOptionName,
CreateTypeStatement, CreateViewStatement, CreateWebhookSourceStatement, CsrConfigOption,
CsrConfigOptionName, CsrConnection, CsrConnectionAvro, CsrConnectionProtobuf, CsrSeedProtobuf,
CsvColumns, DeferredItemName, DocOnIdentifier, DocOnSchema, DropObjectsStatement,
DropOwnedStatement, Expr, Format, FormatSpecifier, Ident, IfExistsBehavior, IndexOption,
IndexOptionName, KafkaSinkConfigOption, KeyConstraint, LoadGeneratorOption,
LoadGeneratorOptionName, MaterializedViewOption, MaterializedViewOptionName, MySqlConfigOption,
MySqlConfigOptionName, PgConfigOption, PgConfigOptionName, ProtobufSchema, QualifiedReplica,
RefreshAtOptionValue, RefreshEveryOptionValue, RefreshOptionValue, ReplicaDefinition,
ReplicaOption, ReplicaOptionName, RoleAttribute, SetRoleVar, SourceErrorPolicy,
SourceIncludeMetadata, Statement, TableConstraint, TableOption, TableOptionName,
UnresolvedDatabaseName, UnresolvedItemName, UnresolvedObjectName, UnresolvedSchemaName, Value,
ViewDefinition, WithOptionValue,
CreateSubsourceOptionName, CreateSubsourceStatement, CreateTableFromSourceStatement,
CreateTableStatement, CreateTypeAs, CreateTypeListOption, CreateTypeListOptionName,
CreateTypeMapOption, CreateTypeMapOptionName, CreateTypeStatement, CreateViewStatement,
CreateWebhookSourceStatement, CsrConfigOption, CsrConfigOptionName, CsrConnection,
CsrConnectionAvro, CsrConnectionProtobuf, CsrSeedProtobuf, CsvColumns, DeferredItemName,
DocOnIdentifier, DocOnSchema, DropObjectsStatement, DropOwnedStatement, Expr, Format,
FormatSpecifier, Ident, IfExistsBehavior, IndexOption, IndexOptionName, KafkaSinkConfigOption,
KeyConstraint, LoadGeneratorOption, LoadGeneratorOptionName, MaterializedViewOption,
MaterializedViewOptionName, MySqlConfigOption, MySqlConfigOptionName, PgConfigOption,
PgConfigOptionName, ProtobufSchema, QualifiedReplica, RefreshAtOptionValue,
RefreshEveryOptionValue, RefreshOptionValue, ReplicaDefinition, ReplicaOption,
ReplicaOptionName, RoleAttribute, SetRoleVar, SourceErrorPolicy, SourceIncludeMetadata,
Statement, TableConstraint, TableOption, TableOptionName, UnresolvedDatabaseName,
UnresolvedItemName, UnresolvedObjectName, UnresolvedSchemaName, Value, ViewDefinition,
WithOptionValue,
};
use mz_sql_parser::ident;
use mz_sql_parser::parser::StatementParseResult;
Expand Down Expand Up @@ -404,6 +405,20 @@ pub fn plan_create_table(
}))
}

pub fn describe_create_table_from_source(
_: &StatementContext,
_: CreateTableFromSourceStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
Ok(StatementDesc::new(None))
}

pub fn plan_create_table_from_source(
_scx: &StatementContext,
_stmt: CreateTableFromSourceStatement<Aug>,
) -> Result<Plan, PlanError> {
bail_unsupported!("CREATE TABLE .. FROM SOURCE")
}

pub fn describe_create_webhook_source(
_: &StatementContext,
_: CreateWebhookSourceStatement<Aug>,
Expand Down

0 comments on commit 9c5a747

Please sign in to comment.