diff --git a/src/common/meta/src/key/flow.rs b/src/common/meta/src/key/flow.rs index b2ce5d1cb24b..f66d17da33f7 100644 --- a/src/common/meta/src/key/flow.rs +++ b/src/common/meta/src/key/flow.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -pub(crate) mod flow_info; +pub mod flow_info; pub(crate) mod flow_name; pub(crate) mod flownode_flow; pub(crate) mod table_flow; diff --git a/src/common/meta/src/key/flow/flow_info.rs b/src/common/meta/src/key/flow/flow_info.rs index c1ce1a1c994f..86b4d2964181 100644 --- a/src/common/meta/src/key/flow/flow_info.rs +++ b/src/common/meta/src/key/flow/flow_info.rs @@ -141,6 +141,26 @@ impl FlowInfoValue { pub fn source_table_ids(&self) -> &[TableId] { &self.source_table_ids } + + pub fn flow_name(&self) -> &String { + &self.flow_name + } + + pub fn sink_table_name(&self) -> &TableName { + &self.sink_table_name + } + + pub fn raw_sql(&self) -> &String { + &self.raw_sql + } + + pub fn expire_after(&self) -> Option { + self.expire_after + } + + pub fn comment(&self) -> &String { + &self.comment + } } pub type FlowInfoManagerRef = Arc; diff --git a/src/frontend/src/instance.rs b/src/frontend/src/instance.rs index 4a1dcfbf295d..29c832afe595 100644 --- a/src/frontend/src/instance.rs +++ b/src/frontend/src/instance.rs @@ -514,6 +514,9 @@ pub fn check_permission( Statement::ShowCreateTable(stmt) => { validate_param(&stmt.table_name, query_ctx)?; } + Statement::ShowCreateFlow(stmt) => { + validate_param(&stmt.flow_name, query_ctx)?; + } Statement::CreateExternalTable(stmt) => { validate_param(&stmt.name, query_ctx)?; } diff --git a/src/operator/src/statement.rs b/src/operator/src/statement.rs index bbfe34d91e00..8522b5db9bb0 100644 --- a/src/operator/src/statement.rs +++ b/src/operator/src/statement.rs @@ -239,6 +239,44 @@ impl StatementExecutor { self.show_create_table(table_name, table_ref, query_ctx) .await } + Statement::ShowCreateFlow(show) => { + let obj_name = &show.flow_name; + let (catalog_name, flow_name) = match &obj_name.0[..] { + [table] => (query_ctx.current_catalog().to_string(), table.value.clone()), + [catalog, table] => (catalog.value.clone(), table.value.clone()), + _ => { + return InvalidSqlSnafu { + err_msg: format!( + "expect flow name to be . or , actual: {obj_name}", + ), + } + .fail() + } + }; + + let flow_name_val = self + .flow_metadata_manager + .flow_name_manager() + .get(&catalog_name, &flow_name) + .await + .context(error::TableMetadataManagerSnafu)? + .context(error::FlowNotFoundSnafu { + flow_name: &flow_name, + })?; + + let flow_val = self + .flow_metadata_manager + .flow_info_manager() + .get(flow_name_val.flow_id()) + .await + .context(error::TableMetadataManagerSnafu)? + .context(error::FlowNotFoundSnafu { + flow_name: &flow_name, + })?; + + self.show_create_flow(obj_name.clone(), flow_val, query_ctx) + .await + } Statement::SetVariables(set_var) => { let var_name = set_var.variable.to_string().to_uppercase(); match var_name.as_str() { diff --git a/src/operator/src/statement/show.rs b/src/operator/src/statement/show.rs index 818734754b89..b1d0bd85a5c5 100644 --- a/src/operator/src/statement/show.rs +++ b/src/operator/src/statement/show.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use common_meta::key::flow::flow_info::FlowInfoValue; use common_query::Output; use common_telemetry::tracing; use partition::manager::PartitionInfo; @@ -23,6 +24,7 @@ use sql::statements::create::Partitions; use sql::statements::show::{ ShowColumns, ShowDatabases, ShowIndex, ShowKind, ShowTables, ShowVariables, }; +use sqlparser::ast::ObjectName; use table::metadata::TableType; use table::table_name::TableName; use table::TableRef; @@ -105,6 +107,17 @@ impl StatementExecutor { .context(error::ExecuteStatementSnafu) } + #[tracing::instrument(skip_all)] + pub async fn show_create_flow( + &self, + flow_name: ObjectName, + flow_val: FlowInfoValue, + query_ctx: QueryContextRef, + ) -> Result { + query::sql::show_create_flow(flow_name, flow_val, query_ctx) + .context(error::ExecuteStatementSnafu) + } + #[tracing::instrument(skip_all)] pub fn show_variable(&self, stmt: ShowVariables, query_ctx: QueryContextRef) -> Result { query::sql::show_variable(stmt, query_ctx).context(error::ExecuteStatementSnafu) diff --git a/src/query/src/sql.rs b/src/query/src/sql.rs index bae83acf4c66..038e26572df9 100644 --- a/src/query/src/sql.rs +++ b/src/query/src/sql.rs @@ -31,6 +31,7 @@ use common_datasource::file_format::{infer_schemas, FileFormat, Format}; use common_datasource::lister::{Lister, Source}; use common_datasource::object_store::build_backend; use common_datasource::util::find_dir_and_filename; +use common_meta::key::flow::flow_info::FlowInfoValue; use common_query::prelude::GREPTIME_TIMESTAMP; use common_query::Output; use common_recordbatch::adapter::RecordBatchStreamAdapter; @@ -49,10 +50,13 @@ use regex::Regex; use session::context::QueryContextRef; pub use show_create_table::create_table_stmt; use snafu::{ensure, OptionExt, ResultExt}; -use sql::statements::create::Partitions; +use sql::ast::Ident; +use sql::parser::ParserContext; +use sql::statements::create::{CreateFlow, Partitions}; use sql::statements::show::{ ShowColumns, ShowDatabases, ShowIndex, ShowKind, ShowTables, ShowVariables, }; +use sqlparser::ast::ObjectName; use table::requests::{FILE_TABLE_LOCATION_KEY, FILE_TABLE_PATTERN_KEY}; use table::TableRef; @@ -134,6 +138,13 @@ static SHOW_CREATE_TABLE_OUTPUT_SCHEMA: Lazy> = Lazy::new(|| { ])) }); +static SHOW_CREATE_FLOW_OUTPUT_SCHEMA: Lazy> = Lazy::new(|| { + Arc::new(Schema::new(vec![ + ColumnSchema::new("Flow", ConcreteDataType::string_datatype(), false), + ColumnSchema::new("Create Flow", ConcreteDataType::string_datatype(), false), + ])) +}); + fn null() -> Expr { lit(ScalarValue::Null) } @@ -606,6 +617,46 @@ pub fn show_create_table( Ok(Output::new_with_record_batches(records)) } +pub fn show_create_flow( + flow_name: ObjectName, + flow_val: FlowInfoValue, + query_ctx: QueryContextRef, +) -> Result { + let mut parser_ctx = + ParserContext::new(query_ctx.sql_dialect(), flow_val.raw_sql()).context(error::SqlSnafu)?; + + let query = parser_ctx.parser_query().context(error::SqlSnafu)?; + + let comment = if flow_val.comment().is_empty() { + None + } else { + Some(flow_val.comment().clone()) + }; + + let stmt = CreateFlow { + flow_name, + sink_table_name: ObjectName(vec![Ident { + value: flow_val.sink_table_name().table_name.clone(), + quote_style: None, + }]), + or_replace: true, + if_not_exists: true, + expire_after: flow_val.expire_after(), + comment, + query, + }; + + let sql = format!("{}", stmt); + let columns = vec![ + Arc::new(StringVector::from(vec![flow_val.flow_name().clone()])) as _, + Arc::new(StringVector::from(vec![sql])) as _, + ]; + let records = RecordBatches::try_from_columns(SHOW_CREATE_FLOW_OUTPUT_SCHEMA.clone(), columns) + .context(error::CreateRecordBatchSnafu)?; + + Ok(Output::new_with_record_batches(records)) +} + pub fn describe_table(table: TableRef) -> Result { let table_info = table.table_info(); let columns_schemas = table_info.meta.schema.column_schemas(); diff --git a/src/sql/src/error.rs b/src/sql/src/error.rs index c865e12a8617..ed88a8826a59 100644 --- a/src/sql/src/error.rs +++ b/src/sql/src/error.rs @@ -141,6 +141,9 @@ pub enum Error { #[snafu(display("Invalid table name: {}", name))] InvalidTableName { name: String }, + #[snafu(display("Invalid flow name: {}", name))] + InvalidFlowName { name: String }, + #[snafu(display("Invalid default constraint, column: {}", column))] InvalidDefault { column: String, @@ -274,6 +277,7 @@ impl ErrorExt for Error { | InvalidDatabaseOption { .. } | ColumnTypeMismatch { .. } | InvalidTableName { .. } + | InvalidFlowName { .. } | InvalidSqlValue { .. } | TimestampOverflow { .. } | InvalidTableOption { .. } diff --git a/src/sql/src/parser.rs b/src/sql/src/parser.rs index 2b90c792dc11..65a12b9ea335 100644 --- a/src/sql/src/parser.rs +++ b/src/sql/src/parser.rs @@ -13,7 +13,7 @@ // limitations under the License. use snafu::ResultExt; -use sqlparser::ast::Ident; +use sqlparser::ast::{Ident, Query}; use sqlparser::dialect::Dialect; use sqlparser::keywords::Keyword; use sqlparser::parser::{Parser, ParserError, ParserOptions}; @@ -38,6 +38,21 @@ pub struct ParserContext<'a> { } impl<'a> ParserContext<'a> { + /// Construct a new ParserContext. + pub fn new(dialect: &'a dyn Dialect, sql: &'a str) -> Result> { + let parser = Parser::new(dialect) + .with_options(ParserOptions::new().with_trailing_commas(true)) + .try_with_sql(sql) + .context(SyntaxSnafu)?; + + Ok(ParserContext { parser, sql }) + } + + /// Parses parser context to Query. + pub fn parser_query(&mut self) -> Result> { + Ok(Box::new(self.parser.parse_query().context(SyntaxSnafu)?)) + } + /// Parses SQL with given dialect pub fn create_with_dialect( sql: &'a str, @@ -46,11 +61,7 @@ impl<'a> ParserContext<'a> { ) -> Result> { let mut stmts: Vec = Vec::new(); - let parser = Parser::new(dialect) - .with_options(ParserOptions::new().with_trailing_commas(true)) - .try_with_sql(sql) - .context(SyntaxSnafu)?; - let mut parser_ctx = ParserContext { sql, parser }; + let mut parser_ctx = ParserContext::new(dialect, sql)?; let mut expecting_statement_delimiter = false; loop { diff --git a/src/sql/src/parsers/show_parser.rs b/src/sql/src/parsers/show_parser.rs index 2ca96b9bc4d2..5cf8b5fe3c3d 100644 --- a/src/sql/src/parsers/show_parser.rs +++ b/src/sql/src/parsers/show_parser.rs @@ -16,11 +16,13 @@ use snafu::{ensure, ResultExt}; use sqlparser::keywords::Keyword; use sqlparser::tokenizer::Token; -use crate::error::{self, InvalidDatabaseNameSnafu, InvalidTableNameSnafu, Result}; +use crate::error::{ + self, InvalidDatabaseNameSnafu, InvalidFlowNameSnafu, InvalidTableNameSnafu, Result, +}; use crate::parser::ParserContext; use crate::statements::show::{ - ShowColumns, ShowCreateTable, ShowDatabases, ShowIndex, ShowKind, ShowStatus, ShowTables, - ShowVariables, + ShowColumns, ShowCreateFlow, ShowCreateTable, ShowDatabases, ShowIndex, ShowKind, ShowStatus, + ShowTables, ShowVariables, }; use crate::statements::statement::Statement; @@ -62,6 +64,8 @@ impl<'a> ParserContext<'a> { } else if self.consume_token("CREATE") { if self.consume_token("TABLE") { self.parse_show_create_table() + } else if self.consume_token("FLOW") { + self.parse_show_create_flow() } else { self.unsupported(self.peek_token_as_string()) } @@ -109,6 +113,24 @@ impl<'a> ParserContext<'a> { Ok(Statement::ShowCreateTable(ShowCreateTable { table_name })) } + fn parse_show_create_flow(&mut self) -> Result { + let raw_flow_name = self + .parse_object_name() + .with_context(|_| error::UnexpectedSnafu { + sql: self.sql, + expected: "a flow name", + actual: self.peek_token_as_string(), + })?; + let flow_name = Self::canonicalize_object_name(raw_flow_name); + ensure!( + !flow_name.0.is_empty(), + InvalidFlowNameSnafu { + name: flow_name.to_string(), + } + ); + Ok(Statement::ShowCreateFlow(ShowCreateFlow { flow_name })) + } + fn parse_show_table_name(&mut self) -> Result { self.parser.next_token(); let table_name = self diff --git a/src/sql/src/statements/create.rs b/src/sql/src/statements/create.rs index 9bcf65c67578..6d43aebca713 100644 --- a/src/sql/src/statements/create.rs +++ b/src/sql/src/statements/create.rs @@ -269,17 +269,17 @@ impl Display for CreateFlow { if self.or_replace { write!(f, "OR REPLACE ")?; } - write!(f, "TASK ")?; + write!(f, "FLOW ")?; if self.if_not_exists { write!(f, "IF NOT EXISTS ")?; } - write!(f, "{} ", &self.flow_name)?; - write!(f, "OUTPUT AS {} ", &self.sink_table_name)?; + writeln!(f, "{}", &self.flow_name)?; + writeln!(f, "SINK TO {}", &self.sink_table_name)?; if let Some(expire_after) = &self.expire_after { - write!(f, "EXPIRE AFTER {} ", expire_after)?; + writeln!(f, "EXPIRE AFTER {} ", expire_after)?; } if let Some(comment) = &self.comment { - write!(f, "COMMENT '{}' ", comment)?; + writeln!(f, "COMMENT '{}'", comment)?; } write!(f, "AS {}", &self.query) } @@ -604,4 +604,37 @@ WITH( } } } + + #[test] + fn test_display_create_flow() { + let sql = r"CREATE FLOW filter_numbers + SINK TO out_num_cnt + AS SELECT number FROM numbers_input where number > 10;"; + let result = + ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default()) + .unwrap(); + assert_eq!(1, result.len()); + + match &result[0] { + Statement::CreateFlow(c) => { + let new_sql = format!("\n{}", c); + assert_eq!( + r#" +CREATE FLOW filter_numbers +SINK TO out_num_cnt +AS SELECT number FROM numbers_input WHERE number > 10"#, + &new_sql + ); + + let new_result = ParserContext::create_with_dialect( + &new_sql, + &GreptimeDbDialect {}, + ParseOptions::default(), + ) + .unwrap(); + assert_eq!(result, new_result); + } + _ => unreachable!(), + } + } } diff --git a/src/sql/src/statements/show.rs b/src/sql/src/statements/show.rs index 2e4a76c145e0..90dad65eaded 100644 --- a/src/sql/src/statements/show.rs +++ b/src/sql/src/statements/show.rs @@ -132,6 +132,19 @@ impl Display for ShowCreateTable { } } +/// SQL structure for `SHOW CREATE FLOW`. +#[derive(Debug, Clone, PartialEq, Eq, Visit, VisitMut)] +pub struct ShowCreateFlow { + pub flow_name: ObjectName, +} + +impl Display for ShowCreateFlow { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + let flow_name = &self.flow_name; + write!(f, "SHOW CREATE FLOW {flow_name}") + } +} + /// SQL structure for `SHOW VARIABLES xxx`. #[derive(Debug, Clone, PartialEq, Eq, Visit, VisitMut)] pub struct ShowVariables { @@ -241,6 +254,35 @@ mod tests { .is_err()); } + #[test] + pub fn test_show_create_flow() { + let sql = "SHOW CREATE FLOW test"; + let stmts: Vec = + ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default()) + .unwrap(); + assert_eq!(1, stmts.len()); + assert_matches!(&stmts[0], Statement::ShowCreateFlow { .. }); + match &stmts[0] { + Statement::ShowCreateFlow(show) => { + let flow_name = show.flow_name.to_string(); + assert_eq!(flow_name, "test"); + } + _ => { + unreachable!(); + } + } + } + #[test] + pub fn test_show_create_missing_flow() { + let sql = "SHOW CREATE FLOW"; + assert!(ParserContext::create_with_dialect( + sql, + &GreptimeDbDialect {}, + ParseOptions::default() + ) + .is_err()); + } + #[test] fn test_display_show_variables() { let sql = r"show variables v1;"; diff --git a/src/sql/src/statements/statement.rs b/src/sql/src/statements/statement.rs index a014ecb125e3..579f2a372ad2 100644 --- a/src/sql/src/statements/statement.rs +++ b/src/sql/src/statements/statement.rs @@ -31,8 +31,8 @@ use crate::statements::insert::Insert; use crate::statements::query::Query; use crate::statements::set_variables::SetVariables; use crate::statements::show::{ - ShowColumns, ShowCreateTable, ShowDatabases, ShowIndex, ShowKind, ShowStatus, ShowTables, - ShowVariables, + ShowColumns, ShowCreateFlow, ShowCreateTable, ShowDatabases, ShowIndex, ShowKind, ShowStatus, + ShowTables, ShowVariables, }; use crate::statements::tql::Tql; use crate::statements::truncate::TruncateTable; @@ -81,6 +81,8 @@ pub enum Statement { ShowIndex(ShowIndex), // SHOW CREATE TABLE ShowCreateTable(ShowCreateTable), + // SHOW CREATE FLOW + ShowCreateFlow(ShowCreateFlow), // SHOW STATUS ShowStatus(ShowStatus), // DESCRIBE TABLE @@ -118,6 +120,7 @@ impl Display for Statement { Statement::ShowColumns(s) => s.fmt(f), Statement::ShowIndex(s) => s.fmt(f), Statement::ShowCreateTable(s) => s.fmt(f), + Statement::ShowCreateFlow(s) => s.fmt(f), Statement::ShowStatus(s) => s.fmt(f), Statement::DescribeTable(s) => s.fmt(f), Statement::Explain(s) => s.fmt(f), diff --git a/tests/cases/standalone/show_create_flow.result b/tests/cases/standalone/show_create_flow.result new file mode 100644 index 000000000000..b09d026b6efe --- /dev/null +++ b/tests/cases/standalone/show_create_flow.result @@ -0,0 +1,41 @@ +CREATE TABLE numbers_input ( + number INT, + ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + PRIMARY KEY(number), + TIME INDEX(ts) +); + +Affected Rows: 0 + +create table out_num_cnt ( + number INT, + ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP TIME INDEX); + +Affected Rows: 0 + +CREATE FLOW filter_numbers SINK TO out_num_cnt AS SELECT number FROM numbers_input where number > 10; + +Affected Rows: 0 + +SHOW CREATE FLOW filter_numbers; + ++----------------+-------------------------------------------------------+ +| Flow | Create Flow | ++----------------+-------------------------------------------------------+ +| filter_numbers | CREATE OR REPLACE FLOW IF NOT EXISTS filter_numbers | +| | SINK TO out_num_cnt | +| | AS SELECT number FROM numbers_input WHERE number > 10 | ++----------------+-------------------------------------------------------+ + +drop flow filter_numbers; + +Affected Rows: 0 + +drop table out_num_cnt; + +Affected Rows: 0 + +drop table numbers_input; + +Affected Rows: 0 + diff --git a/tests/cases/standalone/show_create_flow.sql b/tests/cases/standalone/show_create_flow.sql new file mode 100644 index 000000000000..d30557f4c404 --- /dev/null +++ b/tests/cases/standalone/show_create_flow.sql @@ -0,0 +1,19 @@ +CREATE TABLE numbers_input ( + number INT, + ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + PRIMARY KEY(number), + TIME INDEX(ts) +); +create table out_num_cnt ( + number INT, + ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP TIME INDEX); + +CREATE FLOW filter_numbers SINK TO out_num_cnt AS SELECT number FROM numbers_input where number > 10; + +SHOW CREATE FLOW filter_numbers; + +drop flow filter_numbers; + +drop table out_num_cnt; + +drop table numbers_input;