feat(frontend): initially introduce table def sql purification #19949

Merged (16 commits) on Jan 6, 2025
20 changes: 20 additions & 0 deletions e2e_test/ddl/show_purify.slt
@@ -0,0 +1,20 @@
# Test definition purification for `CREATE TABLE AS`, mainly focusing on the data types.
statement ok
create table ctas as select
0::int as v0,
1::decimal as v1,
'2022-03-13 01:00:00'::timestamp as v2,
'2022-03-13 01:00:00Z'::timestamptz as v3,
array['foo', 'bar', 'null'] as v4,
(1, (2, 3))::STRUCT<i BIGINT, j STRUCT<a BIGINT, b VARCHAR>> as v5,
hex_to_int256('0x11') as v6,
map{'key1': 1, 'key2': 2, 'key3': 3} as v7
;

query TT
show create table ctas;
----
public.ctas CREATE TABLE ctas (v0 INT, v1 NUMERIC, v2 TIMESTAMP, v3 TIMESTAMP WITH TIME ZONE, v4 CHARACTER VARYING[], v5 STRUCT<i BIGINT, j STRUCT<a BIGINT, b CHARACTER VARYING>>, v6 rw_int256, v7 MAP(CHARACTER VARYING,INT))
Comment on lines +15 to +17
Member:
After this change, is it still possible (and is it meaningful?) to get the "original create SQL" for CTAS and schema-registry tables? It seems useful for debugging/development purposes.

Member Author:
It can be done by inspecting the definition field of the metadata (e.g. through the dashboard). However, I suppose it's indeed not that meaningful, as altering a CTAS will cause the purified definition to be persisted.

Member Author:

> as altering a CTAS will cause the purified definition to be persisted.

Note: this is not the current behavior yet, as the purified definition is only used for SHOW and is not the basis for schema-change replanning.


statement ok
drop table ctas;
12 changes: 12 additions & 0 deletions e2e_test/source_inline/kafka/avro/alter_table.slt
@@ -28,6 +28,12 @@ FORMAT PLAIN ENCODE AVRO (
schema.registry = '${RISEDEV_SCHEMA_REGISTRY_URL}'
);

# Demonstrate purified definition
query T
SELECT SUBSTRING(definition, 1, POSITION(' WITH' IN definition) - 1) FROM rw_tables WHERE name = 't';
----
CREATE TABLE t (bar INT, foo CHARACTER VARYING, gen_col INT AS bar + 1)

sleep 4s

query ?
@@ -43,6 +49,12 @@ sr_register avro_alter_table_test-value AVRO <<< '{"type":"record","name":"Root"
statement ok
ALTER TABLE t REFRESH SCHEMA;

# Demonstrate purified definition
query T
SELECT SUBSTRING(definition, 1, POSITION(' WITH' IN definition) - 1) FROM rw_tables WHERE name = 't';
----
CREATE TABLE t (bar INT, foo CHARACTER VARYING, nested STRUCT<baz INT>, gen_col INT AS bar + 1)

query ?
select * from t
----
6 changes: 6 additions & 0 deletions src/common/src/catalog/column.rs
@@ -413,6 +413,12 @@ impl ColumnCatalog {
!self.is_generated() && !self.is_rw_timestamp_column()
}

/// Returns whether the column is defined by the user within the column definition clause
/// of the `CREATE TABLE` statement.
pub fn is_user_defined(&self) -> bool {
!self.is_hidden() && !self.is_rw_sys_column() && !self.is_connector_additional_column()
}
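As a rough illustration (not part of this diff), the new predicate is intended to be used as a filter when reconstructing the user-facing column list, mirroring the filter in the new `catalog::purify` module below:

// Sketch only: keep the columns the user actually spelled out in the
// `CREATE TABLE` column list, dropping hidden, system, and
// connector-added columns.
let defined_columns: Vec<&ColumnCatalog> = columns
    .iter()
    .filter(|c| c.is_user_defined())
    .collect();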

/// If the column is a generated column
pub fn generated_expr(&self) -> Option<&ExprNode> {
if let Some(GeneratedOrDefaultColumn::GeneratedColumn(desc)) =
1 change: 1 addition & 0 deletions src/frontend/src/catalog/mod.rs
@@ -26,6 +26,7 @@ use thiserror::Error;

use crate::error::{ErrorCode, Result, RwError};
pub(crate) mod catalog_service;
mod purify;

pub(crate) mod connection_catalog;
pub(crate) mod database_catalog;
143 changes: 143 additions & 0 deletions src/frontend/src/catalog/purify.rs
@@ -0,0 +1,143 @@
// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use risingwave_common::bail;
use risingwave_common::catalog::{ColumnCatalog, ColumnId};
use risingwave_pb::plan_common::column_desc::GeneratedOrDefaultColumn;
use risingwave_sqlparser::ast::*;

use crate::error::Result;
use crate::utils::data_type::DataTypeToAst as _;

/// Try to restore missing column definitions and constraints in the persisted table definition,
/// if the schema of the table is derived from external systems (like the schema registry) or the
/// table is created by `CREATE TABLE AS`.
///
/// Returns an error if restoring fails, if called on a non-`TableType::Table`, or if the
/// persisted definition is invalid.
pub fn try_purify_table_create_sql_ast(
mut base: Statement,
columns: &[ColumnCatalog],
row_id_index: Option<usize>,
pk_column_ids: &[ColumnId],
) -> Result<Statement> {
let Statement::CreateTable {
columns: column_defs,
constraints,
wildcard_idx,
..
} = &mut base
else {
bail!("expect `CREATE TABLE` statement, found: `{:?}`", base);
};

// Filter out columns that are not defined by users in SQL.
let defined_columns = columns.iter().filter(|c| c.is_user_defined());

// If all columns are already defined, then either the schema is fully specified by the user,
// or the persisted definition is already purified. In both cases there is no need to proceed.
if !column_defs.is_empty() && wildcard_idx.is_none() {
let defined_columns_len = defined_columns.count();
if column_defs.len() != defined_columns_len {
bail /* unlikely */ !(
"column count mismatch: defined {} columns, but {} columns in the definition",
defined_columns_len,
column_defs.len()
);
}

return Ok(base);
}

// Schema inferred. Now derive the missing columns and constraints.
// First, remove the wildcard from the definition.
*wildcard_idx = None;
Member:

We might need tests for wildcard, generated column, include column, etc.

Member Author:

Currently, it's used solely for observability, so we might introduce comprehensive tests at a later stage.

> wildcard, and generated column

Added in 265f6f7.

> include column

Will be included in #19965.


// Derive `ColumnDef` from `ColumnCatalog`.
let mut purified_column_defs = Vec::new();
for column in defined_columns {
// If the column is already defined in the persisted definition, keep it.
if let Some(existing) = column_defs
.iter()
.find(|c| c.name.real_value() == column.name())
{
purified_column_defs.push(existing.clone());
continue;
}

if let Some(c) = &column.column_desc.generated_or_default_column {
match c {
GeneratedOrDefaultColumn::GeneratedColumn(_) => {
unreachable!("generated column must not be inferred");
}
GeneratedOrDefaultColumn::DefaultColumn(_) => {
// TODO: converting `ExprNode` back to an AST can be a bit tricky.
// Fortunately, this case is rare, as inferring default values is not
// widely supported.
bail /* unlikely */ !("purifying default value is not supported yet");
}
}
}

let column_def = ColumnDef {
name: column.name().into(),
data_type: Some(column.data_type().to_ast()),
collation: None,
options: Vec::new(), // pk will be specified with table constraints
};
purified_column_defs.push(column_def);
}
*column_defs = purified_column_defs;

if row_id_index.is_none() {
// User-defined primary key.
let mut pk_columns = Vec::new();

for &id in pk_column_ids {
let column = columns.iter().find(|c| c.column_id() == id).unwrap();
if !column.is_user_defined() {
bail /* unlikely */ !(
"primary key column \"{}\" is not user-defined",
column.name()
);
}
pk_columns.push(column.name().into());
}

let pk_constraint = TableConstraint::Unique {
name: None,
columns: pk_columns,
is_primary: true,
};

// We don't support table constraints other than `PRIMARY KEY`, thus simply overwrite.
assert!(
constraints.len() <= 1
&& constraints.iter().all(|c| matches!(
c,
TableConstraint::Unique {
is_primary: true,
..
}
)),
"unexpected table constraints: {constraints:?}",
);

*constraints = vec![pk_constraint];
}

Ok(base)
}
@@ -50,7 +50,7 @@ fn read_rw_table_info(reader: &SysCatalogReaderImpl) -> Result<Vec<RwTable>> {
name: table.name().to_owned(),
schema_id: schema.id() as i32,
owner: table.owner as i32,
definition: table.create_sql(),
definition: table.create_sql_purified(),
append_only: table.append_only,
acl: get_acl_items(
&Object::TableId(table.id.table_id),
47 changes: 47 additions & 0 deletions src/frontend/src/catalog/table_catalog.rs
@@ -33,11 +33,14 @@ use risingwave_pb::plan_common::column_desc::GeneratedOrDefaultColumn;
use risingwave_pb::plan_common::DefaultColumnDesc;
use risingwave_sqlparser::ast;
use risingwave_sqlparser::parser::Parser;
use thiserror_ext::AsReport as _;

use super::purify::try_purify_table_create_sql_ast;
use super::{ColumnId, DatabaseId, FragmentId, OwnedByUserCatalog, SchemaId, SinkId};
use crate::error::{ErrorCode, Result, RwError};
use crate::expr::ExprImpl;
use crate::optimizer::property::Cardinality;
use crate::session::current::notice_to_user;
use crate::user::UserId;

/// `TableCatalog` Includes full information about a table.
@@ -273,6 +276,50 @@ impl TableVersion {
}
}

impl TableCatalog {
/// Returns the SQL definition when the table was created, purified on a best-effort basis
/// if it's a table.
pub fn create_sql_purified(&self) -> String {
self.create_sql_ast_purified()
.map(|stmt| stmt.to_string())
.unwrap_or_else(|_| self.create_sql())
}
Comment on lines +282 to +286
Contributor:
Should we still return the result here? Not sure if we can guarantee this for all existing tables created previously.

Member Author:
By falling back to self.create_sql() (i.e. self.definition), we ensure that the behavior gets strictly better than before this PR. 🤔 Not sure what you mean by "guarantee this"?


/// Returns the parsed SQL definition when the table was created, purified on a best-effort
/// basis if it's a table.
///
/// Returns an error if the definition is invalid.
pub fn create_sql_ast_purified(&self) -> Result<ast::Statement> {
// Purification is only applicable to tables.
if let TableType::Table = self.table_type() {
let base = if self.definition.is_empty() {
// Created by `CREATE TABLE AS`, create a skeleton `CREATE TABLE` statement.
let name = ast::ObjectName(vec![self.name.as_str().into()]);
ast::Statement::default_create_table(name)
} else {
self.create_sql_ast()?
};

match try_purify_table_create_sql_ast(
base,
self.columns(),
self.row_id_index,
&self.pk_column_ids(),
) {
Ok(stmt) => return Ok(stmt),
Err(e) => notice_to_user(format!(
"error occurred while purifying definition for table \"{}\", \
results may be inaccurate: {}",
self.name,
e.as_report()
)),
}
}

self.create_sql_ast()
}
}

impl TableCatalog {
/// Get a reference to the table catalog's table id.
pub fn id(&self) -> TableId {
47 changes: 4 additions & 43 deletions src/frontend/src/handler/alter_table_column.rs
@@ -15,12 +15,10 @@
use std::collections::{HashMap, HashSet};
use std::sync::Arc;

use anyhow::anyhow;
use itertools::Itertools;
use pgwire::pg_response::{PgResponse, StatementType};
use risingwave_common::catalog::{ColumnCatalog, Engine};
use risingwave_common::hash::VnodeCount;
use risingwave_common::types::DataType;
use risingwave_common::util::column_index_mapping::ColIndexMapping;
use risingwave_common::{bail, bail_not_implemented};
use risingwave_connector::sink::catalog::SinkCatalog;
@@ -29,8 +27,7 @@ use risingwave_pb::ddl_service::TableJobType;
use risingwave_pb::stream_plan::stream_node::PbNodeBody;
use risingwave_pb::stream_plan::{ProjectNode, StreamFragmentGraph};
use risingwave_sqlparser::ast::{
AlterTableOperation, ColumnDef, ColumnOption, DataType as AstDataType, Ident, ObjectName,
Statement, StructField, TableConstraint,
AlterTableOperation, ColumnDef, ColumnOption, Ident, ObjectName, Statement, TableConstraint,
};

use super::create_source::schema_has_schema_registry;
@@ -44,6 +41,7 @@ use crate::expr::{Expr, ExprImpl, InputRef, Literal};
use crate::handler::create_sink::{fetch_incoming_sinks, insert_merger_to_union_with_project};
use crate::handler::create_table::bind_table_constraints;
use crate::session::SessionImpl;
use crate::utils::data_type::DataTypeToAst;
use crate::{Binder, TableCatalog};

/// Used in auto schema change process
@@ -101,10 +99,10 @@ pub async fn get_new_table_definition_for_cdc_table(
// if the column exists in the original catalog, use it to construct the column definition.
// since we don't support altering the column type right now
if let Some(original_col) = orig_column_catalog.get(new_col.name()) {
let ty = to_ast_data_type(original_col.data_type())?;
let ty = original_col.data_type().to_ast();
new_column_defs.push(ColumnDef::new(original_col.name().into(), ty, None, vec![]));
} else {
let ty = to_ast_data_type(new_col.data_type())?;
let ty = new_col.data_type().to_ast();
new_column_defs.push(ColumnDef::new(new_col.name().into(), ty, None, vec![]));
}
}
@@ -113,43 +111,6 @@
Ok((definition, original_catalog))
}

fn to_ast_data_type(ty: &DataType) -> Result<AstDataType> {
match ty {
DataType::Boolean => Ok(AstDataType::Boolean),
DataType::Int16 => Ok(AstDataType::SmallInt),
DataType::Int32 => Ok(AstDataType::Int),
DataType::Int64 => Ok(AstDataType::BigInt),
DataType::Float32 => Ok(AstDataType::Real),
DataType::Float64 => Ok(AstDataType::Double),
// TODO: handle precision and scale for decimal
DataType::Decimal => Ok(AstDataType::Decimal(None, None)),
DataType::Date => Ok(AstDataType::Date),
DataType::Varchar => Ok(AstDataType::Varchar),
DataType::Time => Ok(AstDataType::Time(false)),
DataType::Timestamp => Ok(AstDataType::Timestamp(false)),
DataType::Timestamptz => Ok(AstDataType::Timestamp(true)),
DataType::Interval => Ok(AstDataType::Interval),
DataType::Jsonb => Ok(AstDataType::Jsonb),
DataType::Bytea => Ok(AstDataType::Bytea),
DataType::List(item_ty) => Ok(AstDataType::Array(Box::new(to_ast_data_type(item_ty)?))),
DataType::Struct(fields) => {
let fields = fields
.iter()
.map(|(name, ty)| {
Ok::<StructField, RwError>(StructField {
name: name.into(),
data_type: to_ast_data_type(ty)?,
})
})
.try_collect()?;
Ok(AstDataType::Struct(fields))
}
DataType::Serial | DataType::Int256 | DataType::Map(_) => {
Err(anyhow!("unsupported data type: {:?}", ty).context("to_ast_data_type"))?
}
}
}
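The deleted helper above is superseded by the `DataTypeToAst` trait imported at the top of this file. Its actual definition lives elsewhere in the frontend crate and is not shown in this diff; a hypothetical sketch of its shape, derived from the removed code, might look like:

// Hypothetical sketch only: the real trait is not part of this diff.
// It packages the conversion formerly done by `to_ast_data_type` as a method,
// so call sites can simply write `ty.to_ast()`.
use risingwave_common::types::DataType;
use risingwave_sqlparser::ast::DataType as AstDataType;

pub trait DataTypeToAst {
    /// Convert a catalog `DataType` into its SQL AST counterpart.
    fn to_ast(&self) -> AstDataType;
}

impl DataTypeToAst for DataType {
    fn to_ast(&self) -> AstDataType {
        match self {
            DataType::Boolean => AstDataType::Boolean,
            DataType::Int32 => AstDataType::Int,
            DataType::Varchar => AstDataType::Varchar,
            DataType::List(item) => AstDataType::Array(Box::new(item.to_ast())),
            // ... remaining variants (including `Int256` and `Map`, which the
            // old helper rejected) are handled by the real implementation.
            _ => unimplemented!("sketch: see the actual implementation"),
        }
    }
}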

pub async fn get_replace_table_plan(
session: &Arc<SessionImpl>,
table_name: ObjectName,
4 changes: 3 additions & 1 deletion src/frontend/src/handler/create_table_as.rs
@@ -112,7 +112,9 @@ pub async fn handle_create_as(
vec![], // No watermark should be defined for `CREATE TABLE AS`
col_id_gen.into_version(),
CreateTableProps {
definition: "".to_owned(), // TODO: empty definition means no schema change support
// Note: by providing and persisting an empty definition, querying the definition of the table
// will hit the purification logic, which will construct it based on the catalog.
definition: "".to_owned(),
append_only,
on_conflict: on_conflict.into(),
with_version_column,
2 changes: 1 addition & 1 deletion src/frontend/src/handler/show.rs
@@ -649,7 +649,7 @@ pub fn handle_show_create_object(
.get_created_table_by_name(&object_name)
.filter(|t| t.is_user_table())
.ok_or_else(|| CatalogError::NotFound("table", name.to_string()))?;
table.create_sql()
table.create_sql_purified()
}
ShowCreateType::Sink => {
let sink = schema