Skip to content

Commit

Permalink
feat: Implement SHOW CREATE FLOW (#4040)
Browse files Browse the repository at this point in the history
* feat: Implement SHOW CREATE FLOW

* fmt

* stmt for display

* Update src/operator/src/statement.rs

Co-authored-by: Yingwen <[email protected]>

* test: add sqlness test

* fix test

* parse query in parser

* test: move test to standalone

* reuse ParserContext::new()

* Update tests/cases/standalone/show_create_flow.result

Co-authored-by: Weny Xu <[email protected]>

* add line breaks

---------

Co-authored-by: Yingwen <[email protected]>
Co-authored-by: Weny Xu <[email protected]>
  • Loading branch information
3 people authored Jun 7, 2024
1 parent 4719569 commit 9c42825
Show file tree
Hide file tree
Showing 14 changed files with 318 additions and 18 deletions.
2 changes: 1 addition & 1 deletion src/common/meta/src/key/flow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

pub(crate) mod flow_info;
pub mod flow_info;
pub(crate) mod flow_name;
pub(crate) mod flownode_flow;
pub(crate) mod table_flow;
Expand Down
20 changes: 20 additions & 0 deletions src/common/meta/src/key/flow/flow_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,26 @@ impl FlowInfoValue {
pub fn source_table_ids(&self) -> &[TableId] {
&self.source_table_ids
}

pub fn flow_name(&self) -> &String {
&self.flow_name
}

pub fn sink_table_name(&self) -> &TableName {
&self.sink_table_name
}

pub fn raw_sql(&self) -> &String {
&self.raw_sql
}

pub fn expire_after(&self) -> Option<i64> {
self.expire_after
}

pub fn comment(&self) -> &String {
&self.comment
}
}

pub type FlowInfoManagerRef = Arc<FlowInfoManager>;
Expand Down
3 changes: 3 additions & 0 deletions src/frontend/src/instance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -514,6 +514,9 @@ pub fn check_permission(
Statement::ShowCreateTable(stmt) => {
validate_param(&stmt.table_name, query_ctx)?;
}
Statement::ShowCreateFlow(stmt) => {
validate_param(&stmt.flow_name, query_ctx)?;
}
Statement::CreateExternalTable(stmt) => {
validate_param(&stmt.name, query_ctx)?;
}
Expand Down
38 changes: 38 additions & 0 deletions src/operator/src/statement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,44 @@ impl StatementExecutor {
self.show_create_table(table_name, table_ref, query_ctx)
.await
}
Statement::ShowCreateFlow(show) => {
let obj_name = &show.flow_name;
let (catalog_name, flow_name) = match &obj_name.0[..] {
[table] => (query_ctx.current_catalog().to_string(), table.value.clone()),
[catalog, table] => (catalog.value.clone(), table.value.clone()),
_ => {
return InvalidSqlSnafu {
err_msg: format!(
"expect flow name to be <catalog>.<flow_name> or <flow_name>, actual: {obj_name}",
),
}
.fail()
}
};

let flow_name_val = self
.flow_metadata_manager
.flow_name_manager()
.get(&catalog_name, &flow_name)
.await
.context(error::TableMetadataManagerSnafu)?
.context(error::FlowNotFoundSnafu {
flow_name: &flow_name,
})?;

let flow_val = self
.flow_metadata_manager
.flow_info_manager()
.get(flow_name_val.flow_id())
.await
.context(error::TableMetadataManagerSnafu)?
.context(error::FlowNotFoundSnafu {
flow_name: &flow_name,
})?;

self.show_create_flow(obj_name.clone(), flow_val, query_ctx)
.await
}
Statement::SetVariables(set_var) => {
let var_name = set_var.variable.to_string().to_uppercase();
match var_name.as_str() {
Expand Down
13 changes: 13 additions & 0 deletions src/operator/src/statement/show.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use common_meta::key::flow::flow_info::FlowInfoValue;
use common_query::Output;
use common_telemetry::tracing;
use partition::manager::PartitionInfo;
Expand All @@ -23,6 +24,7 @@ use sql::statements::create::Partitions;
use sql::statements::show::{
ShowColumns, ShowDatabases, ShowIndex, ShowKind, ShowTables, ShowVariables,
};
use sqlparser::ast::ObjectName;
use table::metadata::TableType;
use table::table_name::TableName;
use table::TableRef;
Expand Down Expand Up @@ -105,6 +107,17 @@ impl StatementExecutor {
.context(error::ExecuteStatementSnafu)
}

#[tracing::instrument(skip_all)]
pub async fn show_create_flow(
&self,
flow_name: ObjectName,
flow_val: FlowInfoValue,
query_ctx: QueryContextRef,
) -> Result<Output> {
query::sql::show_create_flow(flow_name, flow_val, query_ctx)
.context(error::ExecuteStatementSnafu)
}

#[tracing::instrument(skip_all)]
pub fn show_variable(&self, stmt: ShowVariables, query_ctx: QueryContextRef) -> Result<Output> {
query::sql::show_variable(stmt, query_ctx).context(error::ExecuteStatementSnafu)
Expand Down
53 changes: 52 additions & 1 deletion src/query/src/sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ use common_datasource::file_format::{infer_schemas, FileFormat, Format};
use common_datasource::lister::{Lister, Source};
use common_datasource::object_store::build_backend;
use common_datasource::util::find_dir_and_filename;
use common_meta::key::flow::flow_info::FlowInfoValue;
use common_query::prelude::GREPTIME_TIMESTAMP;
use common_query::Output;
use common_recordbatch::adapter::RecordBatchStreamAdapter;
Expand All @@ -49,10 +50,13 @@ use regex::Regex;
use session::context::QueryContextRef;
pub use show_create_table::create_table_stmt;
use snafu::{ensure, OptionExt, ResultExt};
use sql::statements::create::Partitions;
use sql::ast::Ident;
use sql::parser::ParserContext;
use sql::statements::create::{CreateFlow, Partitions};
use sql::statements::show::{
ShowColumns, ShowDatabases, ShowIndex, ShowKind, ShowTables, ShowVariables,
};
use sqlparser::ast::ObjectName;
use table::requests::{FILE_TABLE_LOCATION_KEY, FILE_TABLE_PATTERN_KEY};
use table::TableRef;

Expand Down Expand Up @@ -134,6 +138,13 @@ static SHOW_CREATE_TABLE_OUTPUT_SCHEMA: Lazy<Arc<Schema>> = Lazy::new(|| {
]))
});

static SHOW_CREATE_FLOW_OUTPUT_SCHEMA: Lazy<Arc<Schema>> = Lazy::new(|| {
Arc::new(Schema::new(vec![
ColumnSchema::new("Flow", ConcreteDataType::string_datatype(), false),
ColumnSchema::new("Create Flow", ConcreteDataType::string_datatype(), false),
]))
});

fn null() -> Expr {
lit(ScalarValue::Null)
}
Expand Down Expand Up @@ -606,6 +617,46 @@ pub fn show_create_table(
Ok(Output::new_with_record_batches(records))
}

pub fn show_create_flow(
flow_name: ObjectName,
flow_val: FlowInfoValue,
query_ctx: QueryContextRef,
) -> Result<Output> {
let mut parser_ctx =
ParserContext::new(query_ctx.sql_dialect(), flow_val.raw_sql()).context(error::SqlSnafu)?;

let query = parser_ctx.parser_query().context(error::SqlSnafu)?;

let comment = if flow_val.comment().is_empty() {
None
} else {
Some(flow_val.comment().clone())
};

let stmt = CreateFlow {
flow_name,
sink_table_name: ObjectName(vec![Ident {
value: flow_val.sink_table_name().table_name.clone(),
quote_style: None,
}]),
or_replace: true,
if_not_exists: true,
expire_after: flow_val.expire_after(),
comment,
query,
};

let sql = format!("{}", stmt);
let columns = vec![
Arc::new(StringVector::from(vec![flow_val.flow_name().clone()])) as _,
Arc::new(StringVector::from(vec![sql])) as _,
];
let records = RecordBatches::try_from_columns(SHOW_CREATE_FLOW_OUTPUT_SCHEMA.clone(), columns)
.context(error::CreateRecordBatchSnafu)?;

Ok(Output::new_with_record_batches(records))
}

pub fn describe_table(table: TableRef) -> Result<Output> {
let table_info = table.table_info();
let columns_schemas = table_info.meta.schema.column_schemas();
Expand Down
4 changes: 4 additions & 0 deletions src/sql/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,9 @@ pub enum Error {
#[snafu(display("Invalid table name: {}", name))]
InvalidTableName { name: String },

#[snafu(display("Invalid flow name: {}", name))]
InvalidFlowName { name: String },

#[snafu(display("Invalid default constraint, column: {}", column))]
InvalidDefault {
column: String,
Expand Down Expand Up @@ -274,6 +277,7 @@ impl ErrorExt for Error {
| InvalidDatabaseOption { .. }
| ColumnTypeMismatch { .. }
| InvalidTableName { .. }
| InvalidFlowName { .. }
| InvalidSqlValue { .. }
| TimestampOverflow { .. }
| InvalidTableOption { .. }
Expand Down
23 changes: 17 additions & 6 deletions src/sql/src/parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
// limitations under the License.

use snafu::ResultExt;
use sqlparser::ast::Ident;
use sqlparser::ast::{Ident, Query};
use sqlparser::dialect::Dialect;
use sqlparser::keywords::Keyword;
use sqlparser::parser::{Parser, ParserError, ParserOptions};
Expand All @@ -38,6 +38,21 @@ pub struct ParserContext<'a> {
}

impl<'a> ParserContext<'a> {
/// Construct a new ParserContext.
pub fn new(dialect: &'a dyn Dialect, sql: &'a str) -> Result<ParserContext<'a>> {
let parser = Parser::new(dialect)
.with_options(ParserOptions::new().with_trailing_commas(true))
.try_with_sql(sql)
.context(SyntaxSnafu)?;

Ok(ParserContext { parser, sql })
}

/// Parses parser context to Query.
pub fn parser_query(&mut self) -> Result<Box<Query>> {
Ok(Box::new(self.parser.parse_query().context(SyntaxSnafu)?))
}

/// Parses SQL with given dialect
pub fn create_with_dialect(
sql: &'a str,
Expand All @@ -46,11 +61,7 @@ impl<'a> ParserContext<'a> {
) -> Result<Vec<Statement>> {
let mut stmts: Vec<Statement> = Vec::new();

let parser = Parser::new(dialect)
.with_options(ParserOptions::new().with_trailing_commas(true))
.try_with_sql(sql)
.context(SyntaxSnafu)?;
let mut parser_ctx = ParserContext { sql, parser };
let mut parser_ctx = ParserContext::new(dialect, sql)?;

let mut expecting_statement_delimiter = false;
loop {
Expand Down
28 changes: 25 additions & 3 deletions src/sql/src/parsers/show_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,13 @@ use snafu::{ensure, ResultExt};
use sqlparser::keywords::Keyword;
use sqlparser::tokenizer::Token;

use crate::error::{self, InvalidDatabaseNameSnafu, InvalidTableNameSnafu, Result};
use crate::error::{
self, InvalidDatabaseNameSnafu, InvalidFlowNameSnafu, InvalidTableNameSnafu, Result,
};
use crate::parser::ParserContext;
use crate::statements::show::{
ShowColumns, ShowCreateTable, ShowDatabases, ShowIndex, ShowKind, ShowStatus, ShowTables,
ShowVariables,
ShowColumns, ShowCreateFlow, ShowCreateTable, ShowDatabases, ShowIndex, ShowKind, ShowStatus,
ShowTables, ShowVariables,
};
use crate::statements::statement::Statement;

Expand Down Expand Up @@ -62,6 +64,8 @@ impl<'a> ParserContext<'a> {
} else if self.consume_token("CREATE") {
if self.consume_token("TABLE") {
self.parse_show_create_table()
} else if self.consume_token("FLOW") {
self.parse_show_create_flow()
} else {
self.unsupported(self.peek_token_as_string())
}
Expand Down Expand Up @@ -109,6 +113,24 @@ impl<'a> ParserContext<'a> {
Ok(Statement::ShowCreateTable(ShowCreateTable { table_name }))
}

fn parse_show_create_flow(&mut self) -> Result<Statement> {
let raw_flow_name = self
.parse_object_name()
.with_context(|_| error::UnexpectedSnafu {
sql: self.sql,
expected: "a flow name",
actual: self.peek_token_as_string(),
})?;
let flow_name = Self::canonicalize_object_name(raw_flow_name);
ensure!(
!flow_name.0.is_empty(),
InvalidFlowNameSnafu {
name: flow_name.to_string(),
}
);
Ok(Statement::ShowCreateFlow(ShowCreateFlow { flow_name }))
}

fn parse_show_table_name(&mut self) -> Result<String> {
self.parser.next_token();
let table_name = self
Expand Down
43 changes: 38 additions & 5 deletions src/sql/src/statements/create.rs
Original file line number Diff line number Diff line change
Expand Up @@ -269,17 +269,17 @@ impl Display for CreateFlow {
if self.or_replace {
write!(f, "OR REPLACE ")?;
}
write!(f, "TASK ")?;
write!(f, "FLOW ")?;
if self.if_not_exists {
write!(f, "IF NOT EXISTS ")?;
}
write!(f, "{} ", &self.flow_name)?;
write!(f, "OUTPUT AS {} ", &self.sink_table_name)?;
writeln!(f, "{}", &self.flow_name)?;
writeln!(f, "SINK TO {}", &self.sink_table_name)?;
if let Some(expire_after) = &self.expire_after {
write!(f, "EXPIRE AFTER {} ", expire_after)?;
writeln!(f, "EXPIRE AFTER {} ", expire_after)?;
}
if let Some(comment) = &self.comment {
write!(f, "COMMENT '{}' ", comment)?;
writeln!(f, "COMMENT '{}'", comment)?;
}
write!(f, "AS {}", &self.query)
}
Expand Down Expand Up @@ -604,4 +604,37 @@ WITH(
}
}
}

#[test]
fn test_display_create_flow() {
let sql = r"CREATE FLOW filter_numbers
SINK TO out_num_cnt
AS SELECT number FROM numbers_input where number > 10;";
let result =
ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
.unwrap();
assert_eq!(1, result.len());

match &result[0] {
Statement::CreateFlow(c) => {
let new_sql = format!("\n{}", c);
assert_eq!(
r#"
CREATE FLOW filter_numbers
SINK TO out_num_cnt
AS SELECT number FROM numbers_input WHERE number > 10"#,
&new_sql
);

let new_result = ParserContext::create_with_dialect(
&new_sql,
&GreptimeDbDialect {},
ParseOptions::default(),
)
.unwrap();
assert_eq!(result, new_result);
}
_ => unreachable!(),
}
}
}
Loading

0 comments on commit 9c42825

Please sign in to comment.