From 1bf4874cda6923171237845df7c79a48f365e7a5 Mon Sep 17 00:00:00 2001 From: Nikhil Benesch Date: Fri, 6 Sep 2024 14:07:03 -0400 Subject: [PATCH] sql: change IGNORE COLUMNS MySQL option to EXCLUDE COLUMNS We want to align this option with the forthcoming analogous option for sinks (#27758). In the context of sinks, "EXCLUDE" reads better than "IGNORE", because the columns aren't wholly ignored. Rather, they're allowed to be referenced in the PARTITION BY expression and the HEADERS option--they're just *excluded* from the final value. EXCLUDE also nicely aligns with the `SELECT ... EXCLUDE` syntax that was recently added to DuckDB and Snowflake. For backwards compatibility, the `IGNORE COLUMNS` option for the MySQL source continues to be accepted as an alias for `EXCLUDE COLUMNS`. --- doc/user/content/sql/create-source/mysql.md | 28 ++- .../sql-grammar/create-source-mysql.svg | 165 ++++++++++++------ doc/user/sql-grammar/sql-grammar.bnf | 1 + src/adapter/src/coord/sequencer/inner.rs | 28 +-- src/mysql-util/src/schemas.rs | 10 +- src/sql-lexer/src/keywords.txt | 1 + src/sql-parser/src/ast/defs/ddl.rs | 8 +- src/sql-parser/src/ast/defs/statement.rs | 22 +-- src/sql-parser/src/parser.rs | 95 +++++----- src/sql-parser/tests/testdata/ddl | 25 ++- src/sql/src/plan/statement/ddl.rs | 20 +-- src/sql/src/plan/statement/show.rs | 8 +- src/sql/src/pure.rs | 48 ++--- src/sql/src/pure/error.rs | 2 +- src/sql/src/pure/mysql.rs | 39 +++-- src/storage-types/src/sources/mysql.proto | 2 +- src/storage-types/src/sources/mysql.rs | 8 +- src/storage/src/source/mysql.rs | 4 +- src/storage/src/source/mysql/schemas.rs | 2 +- ...gnore-columns.td => 35-exclude-columns.td} | 8 +- test/mysql-cdc/alter-source.td | 32 ++-- 21 files changed, 326 insertions(+), 230 deletions(-) rename test/mysql-cdc/{35-ignore-columns.td => 35-exclude-columns.td} (87%) diff --git a/doc/user/content/sql/create-source/mysql.md b/doc/user/content/sql/create-source/mysql.md index e8056fd75042b..d3f02dd6134e8 100644 --- a/doc/user/content/sql/create-source/mysql.md +++ b/doc/user/content/sql/create-source/mysql.md @@ -50,7 +50,7 @@ _src_name_ | The name for the source. Field | Value | Description -------------------------------------|---------------------------------|------------------------------------- -`IGNORE COLUMNS` | A list of fully-qualified names | Ignore specific columns that cannot be decoded or should not be included in the subsources created in Materialize. +{{< if-unreleased "v0.117" >}}`IGNORE COLUMNS`{{< /if-unreleased >}} {{< if-released "v0.117" >}}`EXCLUDE COLUMNS`{{< /if-released >}} | A list of fully-qualified names | Exclude specific columns that cannot be decoded or should not be included in the subsources created in Materialize. `TEXT COLUMNS` | A list of fully-qualified names | Decode data as `text` for specific columns that contain MySQL types that are [unsupported in Materialize](#supported-types). ## Features @@ -256,9 +256,17 @@ following types:
  • year
  • +{{< if-unreleased "v0.117" >}} The specified columns will be treated as `text`, and will thus not offer the expected MySQL type features. For any unsupported data types not listed above, use the [`IGNORE COLUMNS`](#ignoring-columns) option. +{{< /if-unreleased >}} + +{{< if-released "v0.117" >}} +The specified columns will be treated as `text`, and will thus not offer the +expected MySQL type features. For any unsupported data types not listed above, +use the [`EXCLUDE COLUMNS`](#excluding-columns) option. +{{< /if-released >}} ##### Truncation @@ -372,6 +380,7 @@ CREATE SOURCE mz_source FOR ALL TABLES; ``` +{{< if-unreleased "v0.117" >}} #### Ignoring columns MySQL doesn't provide a way to filter out columns from the replication stream. @@ -385,6 +394,23 @@ CREATE SOURCE mz_source ) FOR ALL TABLES; ``` +{{< /if-unreleased >}} + +{{< if-released "v0.117" >}} +#### Excluding columns + +MySQL doesn't provide a way to filter out columns from the replication stream. +To exclude specific upstream columns from being ingested, use the `EXCLUDE +COLUMNS` option. + +```mzsql +CREATE SOURCE mz_source + FROM MYSQL CONNECTION mysql_connection ( + EXCLUDE COLUMNS (mydb.table_1.column_to_ignore) + ) + FOR ALL TABLES; +``` +{{< /if-released >}} ### Handling errors and schema changes diff --git a/doc/user/layouts/partials/sql-grammar/create-source-mysql.svg b/doc/user/layouts/partials/sql-grammar/create-source-mysql.svg index 6057339706934..7fae32b922874 100644 --- a/doc/user/layouts/partials/sql-grammar/create-source-mysql.svg +++ b/doc/user/layouts/partials/sql-grammar/create-source-mysql.svg @@ -1,4 +1,4 @@ - + @@ -152,119 +152,174 @@ class="terminal" rx="10"/> ) - - + + , + + + EXCLUDE COLUMNS + + + ( + + + column_name + + + , + + + ) + + + ) + + - FOR ALL TABLES - + FOR ALL TABLES + - FOR TABLES - + FOR TABLES + - ( - - - table_name - + ( + + + table_name + - AS - - - subsrc_name - + AS + + + subsrc_name + - , - + , + - FOR SCHEMAS - + FOR SCHEMAS + - ( - - - schema_name - + ( + + + schema_name + - , - + , + - ) - + ) + - EXPOSE - + EXPOSE + - PROGRESS - + PROGRESS + - AS - - - progress_subsource_name - - - with_options + AS + + + progress_subsource_name + + + with_options - - + d="m17 17 h2 m0 0 h10 m140 0 h10 m20 0 h10 m0 0 h130 m-160 0 h20 m140 0 h20 m-180 0 q10 0 10 10 m160 0 q0 -10 10 -10 m-170 10 v12 m160 0 v-12 m-160 12 q0 10 10 10 m140 0 q10 0 10 -10 m-150 10 h10 m120 0 h10 m20 -32 h10 m82 0 h10 m2 0 l2 0 m2 0 l2 0 m2 0 l2 0 m-363 98 l2 0 m2 0 l2 0 m2 0 l2 0 m22 0 h10 m0 0 h242 m-272 0 h20 m252 0 h20 m-292 0 q10 0 10 10 m272 0 q0 -10 10 -10 m-282 10 v12 m272 0 v-12 m-272 12 q0 10 10 10 m252 0 q10 0 10 -10 m-262 10 h10 m104 0 h10 m0 0 h10 m108 0 h10 m20 -32 h10 m60 0 h10 m0 0 h10 m70 0 h10 m2 0 l2 0 m2 0 l2 0 m2 0 l2 0 m-401 98 l2 0 m2 0 l2 0 m2 0 l2 0 m2 0 h10 m116 0 h10 m0 0 h10 m136 0 h10 m2 0 l2 0 m2 0 l2 0 m2 0 l2 0 m-441 110 l2 0 m2 0 l2 0 m2 0 l2 0 m22 0 h10 m26 0 h10 m0 0 h10 m134 0 h10 m20 0 h10 m26 0 h10 m20 0 h10 m110 0 h10 m-150 0 l20 0 m-1 0 q-9 0 -9 -10 l0 -24 q0 -10 10 -10 m130 44 l20 0 m-20 0 q10 0 10 -10 l0 -24 q0 -10 -10 -10 m-130 0 h10 m24 0 h10 m0 0 h86 m20 44 h10 m26 0 h10 m-282 0 h20 m262 0 h20 m-302 0 q10 0 10 10 m282 0 q0 -10 10 -10 m-292 10 v14 m282 0 v-14 m-282 14 q0 10 10 10 m262 0 q10 0 10 -10 m-272 10 h10 m0 0 h252 m-502 -34 h20 m502 0 h20 m-542 0 q10 0 10 10 m522 0 q0 -10 10 -10 m-532 10 v30 m522 0 v-30 m-522 30 q0 10 10 10 m502 0 q10 0 10 -10 m-512 10 h10 m0 0 h492 m22 -50 l2 0 m2 0 l2 0 m2 0 l2 0 m-596 142 l2 0 m2 0 l2 0 m2 0 l2 0 m22 0 h10 m24 0 h10 m0 0 h10 m150 0 h10 m20 0 h10 m26 0 h10 m20 0 h10 m110 0 h10 m-150 0 l20 0 m-1 0 q-9 0 -9 -10 l0 -24 q0 -10 10 -10 m130 44 l20 0 m-20 0 q10 0 10 -10 l0 -24 q0 -10 -10 -10 m-130 0 h10 m24 0 h10 m0 0 h86 m20 44 h10 m26 0 h10 m-282 0 h20 m262 0 h20 m-302 0 q10 0 10 10 m282 0 q0 -10 10 -10 m-292 10 v14 m282 0 v-14 m-282 14 q0 10 10 10 m262 0 q10 0 10 -10 m-272 10 h10 m0 0 h252 m20 -34 h10 m26 0 h10 m-582 0 h20 m562 0 h20 m-602 0 q10 0 10 10 m582 0 q0 -10 10 -10 m-592 10 v30 m582 0 v-30 m-582 30 q0 10 10 10 m562 0 q10 0 10 -10 m-572 10 h10 m0 0 h552 m22 -50 l2 0 m2 0 l2 0 m2 0 l2 0 m-632 142 l2 0 m2 0 l2 0 m2 0 l2 0 m22 0 h10 m24 0 h10 m0 0 h10 m162 0 h10 m20 0 h10 m26 0 h10 m20 0 h10 m110 0 h10 m-150 0 l20 0 m-1 0 q-9 0 -9 -10 l0 -24 q0 -10 10 -10 m130 44 l20 0 m-20 0 q10 0 10 -10 l0 -24 q0 -10 -10 -10 m-130 0 h10 m24 0 h10 m0 0 h86 m20 44 h10 m26 0 h10 m-282 0 h20 m262 0 h20 m-302 0 q10 0 10 10 m282 0 q0 -10 10 -10 m-292 10 v14 m282 0 v-14 m-282 14 q0 10 10 10 m262 0 q10 0 10 -10 m-272 10 h10 m0 0 h252 m20 -34 h10 m26 0 h10 m-594 0 h20 m574 0 h20 m-614 0 q10 0 10 10 m594 0 q0 -10 10 -10 m-604 10 v30 m594 0 v-30 m-594 30 q0 10 10 10 m574 0 q10 0 10 -10 m-584 10 h10 m0 0 h564 m22 -50 l2 0 m2 0 l2 0 m2 0 l2 0 m-671 98 l2 0 m2 0 l2 0 m2 0 l2 0 m22 0 h10 m138 0 h10 m0 0 h482 m-660 0 h20 m640 0 h20 m-680 0 q10 0 10 10 m660 0 q0 -10 10 -10 m-670 10 v68 m660 0 v-68 m-660 68 q0 10 10 10 m640 0 q10 0 10 -10 m-630 10 h10 m106 0 h10 m0 0 h10 m26 0 h10 m20 0 h10 m96 0 h10 m20 0 h10 m0 0 h176 m-206 0 h20 m186 0 h20 m-226 0 q10 0 10 10 m206 0 q0 -10 10 -10 m-216 10 v12 m206 0 v-12 m-206 12 q0 10 10 10 m186 0 q10 0 10 -10 m-196 10 h10 m40 0 h10 m0 0 h10 m106 0 h10 m-342 -32 l20 0 m-1 0 q-9 0 -9 -10 l0 -24 q0 -10 10 -10 m342 44 l20 0 m-20 0 q10 0 10 -10 l0 -24 q0 -10 -10 -10 m-342 0 h10 m24 0 h10 m0 0 h298 m-554 44 h20 m554 0 h20 m-594 0 q10 0 10 10 m574 0 q0 -10 10 -10 m-584 10 v100 m574 0 v-100 m-574 100 q0 10 10 10 m554 0 q10 0 10 -10 m-564 10 h10 m124 0 h10 m0 0 h10 m26 0 h10 m20 0 h10 m114 0 h10 m-154 0 l20 0 m-1 0 q-9 0 -9 -10 l0 -24 q0 -10 10 -10 m134 44 l20 0 m-20 0 q10 0 10 -10 l0 -24 q0 -10 -10 -10 m-134 0 h10 m24 0 h10 m0 0 h90 m20 44 h190 m20 -120 h10 m26 0 h10 m22 -88 l2 0 m2 0 l2 0 m2 0 l2 0 m-628 258 l2 0 m2 0 l2 0 m2 0 l2 0 m22 0 h10 m0 0 h478 m-508 0 h20 m488 0 h20 m-528 0 q10 0 10 10 m508 0 q0 -10 10 -10 m-518 10 v12 m508 0 v-12 m-508 12 q0 10 10 10 m488 0 q10 0 10 -10 m-498 10 h10 m76 0 h10 m0 0 h10 m96 0 h10 m0 0 h10 m40 0 h10 m0 0 h10 m196 0 h10 m22 -32 l2 0 m2 0 l2 0 m2 0 l2 0 m-116 86 l2 0 m2 0 l2 0 m2 0 l2 0 m22 0 h10 m0 0 h112 m-142 0 h20 m122 0 h20 m-162 0 q10 0 10 10 m142 0 q0 -10 10 -10 m-152 10 v12 m142 0 v-12 m-142 12 q0 10 10 10 m122 0 q10 0 10 -10 m-132 10 h10 m102 0 h10 m23 -32 h-3"/> + + diff --git a/doc/user/sql-grammar/sql-grammar.bnf b/doc/user/sql-grammar/sql-grammar.bnf index c462b86bf1817..be6626973e60b 100644 --- a/doc/user/sql-grammar/sql-grammar.bnf +++ b/doc/user/sql-grammar/sql-grammar.bnf @@ -232,6 +232,7 @@ create_source_mysql ::= 'FROM' 'MYSQL' 'CONNECTION' connection_name ( ( '(' 'TEXT COLUMNS' ('(' (column_name) ( ( ',' column_name ) )* ')')? )? ) ( ( ',' 'IGNORE COLUMNS' ('(' (column_name) ( ( ',' column_name ) )* ')')? ')' )? ) + ( ( ',' 'EXCLUDE COLUMNS' ('(' (column_name) ( ( ',' column_name ) )* ')')? ')' )? ) ('FOR ALL TABLES' | 'FOR TABLES' '(' table_name ('AS' subsrc_name)? (',' table_name ('AS' subsrc_name)? )* ')' | 'FOR SCHEMAS' '(' schema_name (',' schema_name )* ')' diff --git a/src/adapter/src/coord/sequencer/inner.rs b/src/adapter/src/coord/sequencer/inner.rs index abc6ffe452071..cc7ee131dae5c 100644 --- a/src/adapter/src/coord/sequencer/inner.rs +++ b/src/adapter/src/coord/sequencer/inner.rs @@ -3631,7 +3631,7 @@ impl Coordinator { let mz_sql::plan::AlterSourceAddSubsourceOptionExtracted { text_columns: mut new_text_columns, - ignore_columns: mut new_ignore_columns, + exclude_columns: mut new_exclude_columns, .. } = options.try_into()?; @@ -3709,7 +3709,7 @@ impl Coordinator { } => { let mz_sql::plan::MySqlConfigOptionExtracted { mut text_columns, - mut ignore_columns, + mut exclude_columns, .. } = curr_options.clone().try_into()?; @@ -3719,28 +3719,28 @@ impl Coordinator { !matches!( o.name, MySqlConfigOptionName::TextColumns - | MySqlConfigOptionName::IgnoreColumns + | MySqlConfigOptionName::ExcludeColumns ) }); - // Drop all text / ignore columns that are not currently referred to. + // Drop all text / exclude columns that are not currently referred to. let column_referenced = |column_qualified_reference: &UnresolvedItemName| { mz_ore::soft_assert_eq_or_log!( column_qualified_reference.0.len(), 3, - "all TEXT COLUMNS & IGNORE COLUMNS values must be column-qualified references" + "all TEXT COLUMNS & EXCLUDE COLUMNS values must be column-qualified references" ); let mut table = column_qualified_reference.clone(); table.0.truncate(2); curr_references.contains(&table) }; text_columns.retain(column_referenced); - ignore_columns.retain(column_referenced); + exclude_columns.retain(column_referenced); - // Merge the current text / ignore columns into the new text / ignore columns. + // Merge the current text / exclude columns into the new text / exclude columns. new_text_columns.extend(text_columns); - new_ignore_columns.extend(ignore_columns); + new_exclude_columns.extend(exclude_columns); // If we have text columns, add them to the options. if !new_text_columns.is_empty() { @@ -3755,17 +3755,17 @@ impl Coordinator { value: Some(WithOptionValue::Sequence(new_text_columns)), }); } - // If we have ignore columns, add them to the options. - if !new_ignore_columns.is_empty() { - new_ignore_columns.sort(); - let new_ignore_columns = new_ignore_columns + // If we have exclude columns, add them to the options. + if !new_exclude_columns.is_empty() { + new_exclude_columns.sort(); + let new_exclude_columns = new_exclude_columns .into_iter() .map(WithOptionValue::UnresolvedItemName) .collect(); curr_options.push(MySqlConfigOption { - name: MySqlConfigOptionName::IgnoreColumns, - value: Some(WithOptionValue::Sequence(new_ignore_columns)), + name: MySqlConfigOptionName::ExcludeColumns, + value: Some(WithOptionValue::Sequence(new_exclude_columns)), }); } } diff --git a/src/mysql-util/src/schemas.rs b/src/mysql-util/src/schemas.rs index 05551e49bf2ae..f93b688bcaf6a 100644 --- a/src/mysql-util/src/schemas.rs +++ b/src/mysql-util/src/schemas.rs @@ -90,14 +90,14 @@ impl MySqlTableSchema { } /// Convert the raw table schema to our MySqlTableDesc representation - /// using any provided text_columns and ignore_columns + /// using any provided text_columns and exclude_columns pub fn to_desc( self, text_columns: Option<&BTreeSet<&str>>, - ignore_columns: Option<&BTreeSet<&str>>, + exclude_columns: Option<&BTreeSet<&str>>, ) -> Result { - // Verify there are no duplicates in text_columns and ignore_columns - match (&text_columns, &ignore_columns) { + // Verify there are no duplicates in text_columns and exclude_columns + match (&text_columns, &exclude_columns) { (Some(text_cols), Some(ignore_cols)) => { let intersection: Vec<_> = text_cols.intersection(ignore_cols).collect(); if !intersection.is_empty() { @@ -133,7 +133,7 @@ impl MySqlTableSchema { } // If this column is ignored, use None for the column type to signal that it should be. - if let Some(ignore_cols) = &ignore_columns { + if let Some(ignore_cols) = &exclude_columns { if ignore_cols.contains(&info.column_name.as_str()) { columns.push(MySqlColumnDesc { name: info.column_name, diff --git a/src/sql-lexer/src/keywords.txt b/src/sql-lexer/src/keywords.txt index 9d8fa8a22b91e..365a4e89d425c 100644 --- a/src/sql-lexer/src/keywords.txt +++ b/src/sql-lexer/src/keywords.txt @@ -150,6 +150,7 @@ Escape Estimate Every Except +Exclude Execute Exists Expected diff --git a/src/sql-parser/src/ast/defs/ddl.rs b/src/sql-parser/src/ast/defs/ddl.rs index e52ed9fcf168b..00bda4bcc01e3 100644 --- a/src/sql-parser/src/ast/defs/ddl.rs +++ b/src/sql-parser/src/ast/defs/ddl.rs @@ -1092,13 +1092,13 @@ pub enum MySqlConfigOptionName { /// fully deprecating that feature and forcing users to use explicit /// `CREATE TABLE .. FROM SOURCE` statements TextColumns, - /// Columns you want to ignore + /// Columns you want to exclude /// NOTE(roshan): This value is kept around to allow round-tripping a /// `CREATE SOURCE` statement while we still allow creating implicit /// subsources from `CREATE SOURCE`, but will be removed once /// fully deprecating that feature and forcing users to use explicit /// `CREATE TABLE .. FROM SOURCE` statements - IgnoreColumns, + ExcludeColumns, } impl AstDisplay for MySqlConfigOptionName { @@ -1106,7 +1106,7 @@ impl AstDisplay for MySqlConfigOptionName { f.write_str(match self { MySqlConfigOptionName::Details => "DETAILS", MySqlConfigOptionName::TextColumns => "TEXT COLUMNS", - MySqlConfigOptionName::IgnoreColumns => "IGNORE COLUMNS", + MySqlConfigOptionName::ExcludeColumns => "EXCLUDE COLUMNS", }) } } @@ -1122,7 +1122,7 @@ impl WithOptionName for MySqlConfigOptionName { match self { MySqlConfigOptionName::Details | MySqlConfigOptionName::TextColumns - | MySqlConfigOptionName::IgnoreColumns => false, + | MySqlConfigOptionName::ExcludeColumns => false, } } } diff --git a/src/sql-parser/src/ast/defs/statement.rs b/src/sql-parser/src/ast/defs/statement.rs index 104b2ebf56b5f..a41aef884ef85 100644 --- a/src/sql-parser/src/ast/defs/statement.rs +++ b/src/sql-parser/src/ast/defs/statement.rs @@ -1098,8 +1098,8 @@ pub enum CreateSubsourceOptionName { ExternalReference, /// Columns whose types you want to unconditionally format as text TextColumns, - /// Columns you want to ignore when ingesting data - IgnoreColumns, + /// Columns you want to exclude when ingesting data + ExcludeColumns, /// `DETAILS` for this subsource, hex-encoded protobuf type /// `mz_storage_types::sources::SourceExportStatementDetails` Details, @@ -1111,7 +1111,7 @@ impl AstDisplay for CreateSubsourceOptionName { CreateSubsourceOptionName::Progress => "PROGRESS", CreateSubsourceOptionName::ExternalReference => "EXTERNAL REFERENCE", CreateSubsourceOptionName::TextColumns => "TEXT COLUMNS", - CreateSubsourceOptionName::IgnoreColumns => "IGNORE COLUMNS", + CreateSubsourceOptionName::ExcludeColumns => "EXCLUDE COLUMNS", CreateSubsourceOptionName::Details => "DETAILS", }) } @@ -1129,7 +1129,7 @@ impl WithOptionName for CreateSubsourceOptionName { | CreateSubsourceOptionName::ExternalReference | CreateSubsourceOptionName::Details | CreateSubsourceOptionName::TextColumns - | CreateSubsourceOptionName::IgnoreColumns => false, + | CreateSubsourceOptionName::ExcludeColumns => false, } } } @@ -1510,8 +1510,8 @@ impl_display_for_with_option!(TableOption); pub enum TableFromSourceOptionName { /// Columns whose types you want to unconditionally format as text TextColumns, - /// Columns you want to ignore when ingesting data - IgnoreColumns, + /// Columns you want to exclude when ingesting data + ExcludeColumns, /// Hex-encoded protobuf of a `ProtoSourceExportStatementDetails` /// message, which includes details necessary for planning this /// table as a Source Export @@ -1522,7 +1522,7 @@ impl AstDisplay for TableFromSourceOptionName { fn fmt(&self, f: &mut AstFormatter) { f.write_str(match self { TableFromSourceOptionName::TextColumns => "TEXT COLUMNS", - TableFromSourceOptionName::IgnoreColumns => "IGNORE COLUMNS", + TableFromSourceOptionName::ExcludeColumns => "EXCLUDE COLUMNS", TableFromSourceOptionName::Details => "DETAILS", }) } @@ -1539,7 +1539,7 @@ impl WithOptionName for TableFromSourceOptionName { match self { TableFromSourceOptionName::Details | TableFromSourceOptionName::TextColumns - | TableFromSourceOptionName::IgnoreColumns => false, + | TableFromSourceOptionName::ExcludeColumns => false, } } } @@ -2534,7 +2534,7 @@ pub enum AlterSourceAddSubsourceOptionName { /// Columns whose types you want to unconditionally format as text TextColumns, /// Columns you want to ignore when ingesting data - IgnoreColumns, + ExcludeColumns, /// Updated `DETAILS` for an ingestion, e.g. /// [`crate::ast::PgConfigOptionName::Details`] /// or @@ -2546,7 +2546,7 @@ impl AstDisplay for AlterSourceAddSubsourceOptionName { fn fmt(&self, f: &mut AstFormatter) { f.write_str(match self { AlterSourceAddSubsourceOptionName::TextColumns => "TEXT COLUMNS", - AlterSourceAddSubsourceOptionName::IgnoreColumns => "IGNORE COLUMNS", + AlterSourceAddSubsourceOptionName::ExcludeColumns => "EXCLUDE COLUMNS", AlterSourceAddSubsourceOptionName::Details => "DETAILS", }) } @@ -2563,7 +2563,7 @@ impl WithOptionName for AlterSourceAddSubsourceOptionName { match self { AlterSourceAddSubsourceOptionName::Details | AlterSourceAddSubsourceOptionName::TextColumns - | AlterSourceAddSubsourceOptionName::IgnoreColumns => false, + | AlterSourceAddSubsourceOptionName::ExcludeColumns => false, } } } diff --git a/src/sql-parser/src/parser.rs b/src/sql-parser/src/parser.rs index d69663f765334..c85e20b09ec05 100644 --- a/src/sql-parser/src/parser.rs +++ b/src/sql-parser/src/parser.rs @@ -2731,47 +2731,49 @@ impl<'a> Parser<'a> { } fn parse_create_subsource_option(&mut self) -> Result, ParserError> { - let option = - match self.expect_one_of_keywords(&[EXTERNAL, PROGRESS, TEXT, IGNORE, DETAILS])? { - EXTERNAL => { - self.expect_keyword(REFERENCE)?; - CreateSubsourceOption { - name: CreateSubsourceOptionName::ExternalReference, - value: self.parse_optional_option_value()?, - } - } - PROGRESS => CreateSubsourceOption { - name: CreateSubsourceOptionName::Progress, + let option = match self + .expect_one_of_keywords(&[EXTERNAL, PROGRESS, TEXT, EXCLUDE, IGNORE, DETAILS])? + { + EXTERNAL => { + self.expect_keyword(REFERENCE)?; + CreateSubsourceOption { + name: CreateSubsourceOptionName::ExternalReference, value: self.parse_optional_option_value()?, - }, - ref keyword @ (TEXT | IGNORE) => { - self.expect_keyword(COLUMNS)?; + } + } + PROGRESS => CreateSubsourceOption { + name: CreateSubsourceOptionName::Progress, + value: self.parse_optional_option_value()?, + }, + ref keyword @ (TEXT | EXCLUDE | IGNORE) => { + self.expect_keyword(COLUMNS)?; - let _ = self.consume_token(&Token::Eq); + let _ = self.consume_token(&Token::Eq); - let value = - self.parse_option_sequence(Parser::parse_identifier)? - .map(|inner| { - WithOptionValue::Sequence( - inner.into_iter().map(WithOptionValue::Ident).collect_vec(), - ) - }); + let value = self + .parse_option_sequence(Parser::parse_identifier)? + .map(|inner| { + WithOptionValue::Sequence( + inner.into_iter().map(WithOptionValue::Ident).collect_vec(), + ) + }); - CreateSubsourceOption { - name: match *keyword { - TEXT => CreateSubsourceOptionName::TextColumns, - IGNORE => CreateSubsourceOptionName::IgnoreColumns, - _ => unreachable!(), - }, - value, - } + CreateSubsourceOption { + name: match *keyword { + TEXT => CreateSubsourceOptionName::TextColumns, + // IGNORE is historical syntax for this option. + EXCLUDE | IGNORE => CreateSubsourceOptionName::ExcludeColumns, + _ => unreachable!(), + }, + value, } - DETAILS => CreateSubsourceOption { - name: CreateSubsourceOptionName::Details, - value: self.parse_optional_option_value()?, - }, - _ => unreachable!(), - }; + } + DETAILS => CreateSubsourceOption { + name: CreateSubsourceOptionName::Details, + value: self.parse_optional_option_value()?, + }, + _ => unreachable!(), + }; Ok(option) } @@ -3321,7 +3323,7 @@ impl<'a> Parser<'a> { } fn parse_mysql_connection_option(&mut self) -> Result, ParserError> { - match self.expect_one_of_keywords(&[DETAILS, TEXT, IGNORE])? { + match self.expect_one_of_keywords(&[DETAILS, TEXT, EXCLUDE, IGNORE])? { DETAILS => Ok(MySqlConfigOption { name: MySqlConfigOptionName::Details, value: self.parse_optional_option_value()?, @@ -3347,7 +3349,8 @@ impl<'a> Parser<'a> { value, }) } - IGNORE => { + // IGNORE is historical syntax for the option. + EXCLUDE | IGNORE => { self.expect_keyword(COLUMNS)?; let _ = self.consume_token(&Token::Eq); @@ -3364,7 +3367,7 @@ impl<'a> Parser<'a> { }); Ok(MySqlConfigOption { - name: MySqlConfigOptionName::IgnoreColumns, + name: MySqlConfigOptionName::ExcludeColumns, value, }) } @@ -4429,8 +4432,8 @@ impl<'a> Parser<'a> { fn parse_table_from_source_option( &mut self, ) -> Result, ParserError> { - let option = match self.expect_one_of_keywords(&[TEXT, IGNORE, DETAILS])? { - ref keyword @ (TEXT | IGNORE) => { + let option = match self.expect_one_of_keywords(&[TEXT, EXCLUDE, IGNORE, DETAILS])? { + ref keyword @ (TEXT | IGNORE | EXCLUDE) => { self.expect_keyword(COLUMNS)?; let _ = self.consume_token(&Token::Eq); @@ -4446,7 +4449,8 @@ impl<'a> Parser<'a> { TableFromSourceOption { name: match *keyword { TEXT => TableFromSourceOptionName::TextColumns, - IGNORE => TableFromSourceOptionName::IgnoreColumns, + // IGNORE is historical syntax for this option. + EXCLUDE | IGNORE => TableFromSourceOptionName::ExcludeColumns, _ => unreachable!(), }, value, @@ -5091,8 +5095,8 @@ impl<'a> Parser<'a> { fn parse_alter_source_add_subsource_option( &mut self, ) -> Result, ParserError> { - match self.expect_one_of_keywords(&[TEXT, IGNORE])? { - ref keyword @ (TEXT | IGNORE) => { + match self.expect_one_of_keywords(&[TEXT, EXCLUDE, IGNORE])? { + ref keyword @ (TEXT | EXCLUDE | IGNORE) => { self.expect_keyword(COLUMNS)?; let _ = self.consume_token(&Token::Eq); @@ -5111,7 +5115,8 @@ impl<'a> Parser<'a> { Ok(AlterSourceAddSubsourceOption { name: match *keyword { TEXT => AlterSourceAddSubsourceOptionName::TextColumns, - IGNORE => AlterSourceAddSubsourceOptionName::IgnoreColumns, + // IGNORE is historical syntax for this option. + EXCLUDE | IGNORE => AlterSourceAddSubsourceOptionName::ExcludeColumns, _ => unreachable!(), }, value, diff --git a/src/sql-parser/tests/testdata/ddl b/src/sql-parser/tests/testdata/ddl index cd1d70f9e5dfe..c07e35e6c7d15 100644 --- a/src/sql-parser/tests/testdata/ddl +++ b/src/sql-parser/tests/testdata/ddl @@ -550,16 +550,23 @@ CreateSource(CreateSourceStatement { name: UnresolvedItemName([Ident("mz_source" parse-statement CREATE SOURCE mz_source FROM MYSQL CONNECTION mysqlconn (IGNORE COLUMNS (public.foo.bar)) FOR ALL TABLES; ---- -CREATE SOURCE mz_source FROM MYSQL CONNECTION mysqlconn (IGNORE COLUMNS = (public.foo.bar)) FOR ALL TABLES +CREATE SOURCE mz_source FROM MYSQL CONNECTION mysqlconn (EXCLUDE COLUMNS = (public.foo.bar)) FOR ALL TABLES => -CreateSource(CreateSourceStatement { name: UnresolvedItemName([Ident("mz_source")]), in_cluster: None, col_names: [], connection: MySql { connection: Name(UnresolvedItemName([Ident("mysqlconn")])), options: [MySqlConfigOption { name: IgnoreColumns, value: Some(Sequence([UnresolvedItemName(UnresolvedItemName([Ident("public"), Ident("foo"), Ident("bar")]))])) }] }, include_metadata: [], format: None, envelope: None, if_not_exists: false, key_constraint: None, with_options: [], external_references: Some(All), progress_subsource: None }) +CreateSource(CreateSourceStatement { name: UnresolvedItemName([Ident("mz_source")]), in_cluster: None, col_names: [], connection: MySql { connection: Name(UnresolvedItemName([Ident("mysqlconn")])), options: [MySqlConfigOption { name: ExcludeColumns, value: Some(Sequence([UnresolvedItemName(UnresolvedItemName([Ident("public"), Ident("foo"), Ident("bar")]))])) }] }, include_metadata: [], format: None, envelope: None, if_not_exists: false, key_constraint: None, with_options: [], external_references: Some(All), progress_subsource: None }) parse-statement -CREATE SOURCE mz_source FROM MYSQL CONNECTION mysqlconn (IGNORE COLUMNS (public.foo.bar), TEXT COLUMNS (public.foo.baz)) FOR ALL TABLES; +CREATE SOURCE mz_source FROM MYSQL CONNECTION mysqlconn (EXCLUDE COLUMNS (public.foo.bar)) FOR ALL TABLES; ---- -CREATE SOURCE mz_source FROM MYSQL CONNECTION mysqlconn (IGNORE COLUMNS = (public.foo.bar), TEXT COLUMNS = (public.foo.baz)) FOR ALL TABLES +CREATE SOURCE mz_source FROM MYSQL CONNECTION mysqlconn (EXCLUDE COLUMNS = (public.foo.bar)) FOR ALL TABLES => -CreateSource(CreateSourceStatement { name: UnresolvedItemName([Ident("mz_source")]), in_cluster: None, col_names: [], connection: MySql { connection: Name(UnresolvedItemName([Ident("mysqlconn")])), options: [MySqlConfigOption { name: IgnoreColumns, value: Some(Sequence([UnresolvedItemName(UnresolvedItemName([Ident("public"), Ident("foo"), Ident("bar")]))])) }, MySqlConfigOption { name: TextColumns, value: Some(Sequence([UnresolvedItemName(UnresolvedItemName([Ident("public"), Ident("foo"), Ident("baz")]))])) }] }, include_metadata: [], format: None, envelope: None, if_not_exists: false, key_constraint: None, with_options: [], external_references: Some(All), progress_subsource: None }) +CreateSource(CreateSourceStatement { name: UnresolvedItemName([Ident("mz_source")]), in_cluster: None, col_names: [], connection: MySql { connection: Name(UnresolvedItemName([Ident("mysqlconn")])), options: [MySqlConfigOption { name: ExcludeColumns, value: Some(Sequence([UnresolvedItemName(UnresolvedItemName([Ident("public"), Ident("foo"), Ident("bar")]))])) }] }, include_metadata: [], format: None, envelope: None, if_not_exists: false, key_constraint: None, with_options: [], external_references: Some(All), progress_subsource: None }) + +parse-statement +CREATE SOURCE mz_source FROM MYSQL CONNECTION mysqlconn (EXCLUDE COLUMNS (public.foo.bar), TEXT COLUMNS (public.foo.baz)) FOR ALL TABLES; +---- +CREATE SOURCE mz_source FROM MYSQL CONNECTION mysqlconn (EXCLUDE COLUMNS = (public.foo.bar), TEXT COLUMNS = (public.foo.baz)) FOR ALL TABLES +=> +CreateSource(CreateSourceStatement { name: UnresolvedItemName([Ident("mz_source")]), in_cluster: None, col_names: [], connection: MySql { connection: Name(UnresolvedItemName([Ident("mysqlconn")])), options: [MySqlConfigOption { name: ExcludeColumns, value: Some(Sequence([UnresolvedItemName(UnresolvedItemName([Ident("public"), Ident("foo"), Ident("bar")]))])) }, MySqlConfigOption { name: TextColumns, value: Some(Sequence([UnresolvedItemName(UnresolvedItemName([Ident("public"), Ident("foo"), Ident("baz")]))])) }] }, include_metadata: [], format: None, envelope: None, if_not_exists: false, key_constraint: None, with_options: [], external_references: Some(All), progress_subsource: None }) parse-statement CREATE SOURCE psychic FROM POSTGRES CONNECTION pgconn (PUBLICATION 'red'); @@ -576,11 +583,11 @@ CREATE SOURCE psychic FROM YUGABYTE CONNECTION pgconn (PUBLICATION = 'red') CreateSource(CreateSourceStatement { name: UnresolvedItemName([Ident("psychic")]), in_cluster: None, col_names: [], connection: Yugabyte { connection: Name(UnresolvedItemName([Ident("pgconn")])), options: [PgConfigOption { name: Publication, value: Some(Value(String("red"))) }] }, include_metadata: [], format: None, envelope: None, if_not_exists: false, key_constraint: None, with_options: [], external_references: None, progress_subsource: None }) parse-statement -CREATE SUBSOURCE mz_subsource (c int) OF SOURCE public.foo.mz_source WITH (EXTERNAL REFERENCE = public.foo.bar, IGNORE COLUMNS (bar), TEXT COLUMNS (baz, foobar), DETAILS = 'details'); +CREATE SUBSOURCE mz_subsource (c int) OF SOURCE public.foo.mz_source WITH (EXTERNAL REFERENCE = public.foo.bar, EXCLUDE COLUMNS (bar), TEXT COLUMNS (baz, foobar), DETAILS = 'details'); ---- -CREATE SUBSOURCE mz_subsource (c int4) OF SOURCE public.foo.mz_source WITH (EXTERNAL REFERENCE = public.foo.bar, IGNORE COLUMNS = (bar), TEXT COLUMNS = (baz, foobar), DETAILS = 'details') +CREATE SUBSOURCE mz_subsource (c int4) OF SOURCE public.foo.mz_source WITH (EXTERNAL REFERENCE = public.foo.bar, EXCLUDE COLUMNS = (bar), TEXT COLUMNS = (baz, foobar), DETAILS = 'details') => -CreateSubsource(CreateSubsourceStatement { name: UnresolvedItemName([Ident("mz_subsource")]), columns: [ColumnDef { name: Ident("c"), data_type: Other { name: Name(UnresolvedItemName([Ident("int4")])), typ_mod: [] }, collation: None, options: [] }], of_source: Some(Name(UnresolvedItemName([Ident("public"), Ident("foo"), Ident("mz_source")]))), constraints: [], if_not_exists: false, with_options: [CreateSubsourceOption { name: ExternalReference, value: Some(UnresolvedItemName(UnresolvedItemName([Ident("public"), Ident("foo"), Ident("bar")]))) }, CreateSubsourceOption { name: IgnoreColumns, value: Some(Sequence([Ident(Ident("bar"))])) }, CreateSubsourceOption { name: TextColumns, value: Some(Sequence([Ident(Ident("baz")), Ident(Ident("foobar"))])) }, CreateSubsourceOption { name: Details, value: Some(Value(String("details"))) }] }) +CreateSubsource(CreateSubsourceStatement { name: UnresolvedItemName([Ident("mz_subsource")]), columns: [ColumnDef { name: Ident("c"), data_type: Other { name: Name(UnresolvedItemName([Ident("int4")])), typ_mod: [] }, collation: None, options: [] }], of_source: Some(Name(UnresolvedItemName([Ident("public"), Ident("foo"), Ident("mz_source")]))), constraints: [], if_not_exists: false, with_options: [CreateSubsourceOption { name: ExternalReference, value: Some(UnresolvedItemName(UnresolvedItemName([Ident("public"), Ident("foo"), Ident("bar")]))) }, CreateSubsourceOption { name: ExcludeColumns, value: Some(Sequence([Ident(Ident("bar"))])) }, CreateSubsourceOption { name: TextColumns, value: Some(Sequence([Ident(Ident("baz")), Ident(Ident("foobar"))])) }, CreateSubsourceOption { name: Details, value: Some(Value(String("details"))) }] }) parse-statement CREATE SINK foo FROM bar INTO KAFKA CONNECTION baz (TOPIC 'topic', PROGRESS GROUP ID PREFIX 'prefix', COMPRESSION TYPE = gzip, TOPIC METADATA REFRESH INTERVAL = '1s', PARTITION BY = 1 + 2) FORMAT BYTES @@ -1570,7 +1577,7 @@ AlterSource(AlterSourceStatement { source_name: UnresolvedItemName([Ident("n")]) parse-statement ALTER SOURCE IF EXISTS n ADD SUBSOURCE a, b.c, c, d.e WITH () ---- -error: Expected one of TEXT or IGNORE, found right parenthesis +error: Expected one of TEXT or EXCLUDE or IGNORE, found right parenthesis ALTER SOURCE IF EXISTS n ADD SUBSOURCE a, b.c, c, d.e WITH () ^ diff --git a/src/sql/src/plan/statement/ddl.rs b/src/sql/src/plan/statement/ddl.rs index a527e0ef429c6..a9a47516d32cb 100644 --- a/src/sql/src/plan/statement/ddl.rs +++ b/src/sql/src/plan/statement/ddl.rs @@ -454,7 +454,7 @@ generate_extracted_config!( MySqlConfigOption, (Details, String), (TextColumns, Vec::, Default(vec![])), - (IgnoreColumns, Vec::, Default(vec![])) + (ExcludeColumns, Vec::, Default(vec![])) ); pub fn plan_create_webhook_source( @@ -845,11 +845,11 @@ pub fn plan_create_source( }; let MySqlConfigOptionExtracted { details, - // text/ignore columns are already part of the source-exports and are only included + // text/exclude columns are already part of the source-exports and are only included // in these options for round-tripping of a `CREATE SOURCE` statement. This should // be removed once we drop support for implicitly created subsources. text_columns: _, - ignore_columns: _, + exclude_columns: _, seen: _, } = options.clone().try_into()?; @@ -1333,7 +1333,7 @@ generate_extracted_config!( (Progress, bool, Default(false)), (ExternalReference, UnresolvedItemName), (TextColumns, Vec::, Default(vec![])), - (IgnoreColumns, Vec::, Default(vec![])), + (ExcludeColumns, Vec::, Default(vec![])), (Details, String) ); @@ -1354,7 +1354,7 @@ pub fn plan_create_subsource( progress, external_reference, text_columns, - ignore_columns, + exclude_columns, details, seen: _, } = with_options.clone().try_into()?; @@ -1404,7 +1404,7 @@ pub fn plan_create_subsource( table, initial_gtid_set, text_columns: text_columns.into_iter().map(|c| c.into_string()).collect(), - ignore_columns: ignore_columns + exclude_columns: exclude_columns .into_iter() .map(|c| c.into_string()) .collect(), @@ -1448,7 +1448,7 @@ pub fn plan_create_subsource( generate_extracted_config!( TableFromSourceOption, (TextColumns, Vec::, Default(vec![])), - (IgnoreColumns, Vec::, Default(vec![])), + (ExcludeColumns, Vec::, Default(vec![])), (Details, String) ); @@ -1470,7 +1470,7 @@ pub fn plan_create_table_from_source( let TableFromSourceOptionExtracted { text_columns, - ignore_columns, + exclude_columns, details, seen: _, } = with_options.clone().try_into()?; @@ -1508,7 +1508,7 @@ pub fn plan_create_table_from_source( table, initial_gtid_set, text_columns: text_columns.into_iter().map(|c| c.into_string()).collect(), - ignore_columns: ignore_columns + exclude_columns: exclude_columns .into_iter() .map(|c| c.into_string()) .collect(), @@ -6053,7 +6053,7 @@ pub fn describe_alter_source( generate_extracted_config!( AlterSourceAddSubsourceOption, (TextColumns, Vec::, Default(vec![])), - (IgnoreColumns, Vec::, Default(vec![])), + (ExcludeColumns, Vec::, Default(vec![])), (Details, String) ); diff --git a/src/sql/src/plan/statement/show.rs b/src/sql/src/plan/statement/show.rs index eacf86ab29a55..179b6299c478c 100644 --- a/src/sql/src/plan/statement/show.rs +++ b/src/sql/src/plan/statement/show.rs @@ -1021,7 +1021,7 @@ fn humanize_sql_for_show_create( // COLUMNS` values that refer to the table it // ingests, which we'll handle below. MySqlConfigOptionName::TextColumns - | MySqlConfigOptionName::IgnoreColumns => {} + | MySqlConfigOptionName::ExcludeColumns => {} // Drop details, which does not rountrip. MySqlConfigOptionName::Details => return false, }; @@ -1036,13 +1036,13 @@ fn humanize_sql_for_show_create( curr_references.contains_key(&name) } _ => unreachable!( - "TEXT COLUMNS + IGNORE COLUMNS must be sequence of unresolved item names" + "TEXT COLUMNS + EXCLUDE COLUMNS must be sequence of unresolved item names" ), }); !seq_unresolved_item_names.is_empty() } _ => unreachable!( - "TEXT COLUMNS + IGNORE COLUMNS must be sequence of unresolved item names" + "TEXT COLUMNS + EXCLUDE COLUMNS must be sequence of unresolved item names" ), } }); @@ -1077,7 +1077,7 @@ fn humanize_sql_for_show_create( stmt.with_options.retain_mut(|o| { match o.name { CreateSubsourceOptionName::TextColumns => true, - CreateSubsourceOptionName::IgnoreColumns => true, + CreateSubsourceOptionName::ExcludeColumns => true, // Drop details, which does not rountrip. CreateSubsourceOptionName::Details => false, CreateSubsourceOptionName::ExternalReference => true, diff --git a/src/sql/src/pure.rs b/src/sql/src/pure.rs index 0cefb319f9c71..9d1fda7845d46 100644 --- a/src/sql/src/pure.rs +++ b/src/sql/src/pure.rs @@ -262,7 +262,7 @@ pub enum PurifiedExportDetails { MySql { table: MySqlTableDesc, text_columns: Option>, - ignore_columns: Option>, + exclude_columns: Option>, initial_gtid_set: String, }, Postgres { @@ -886,7 +886,7 @@ async fn purify_create_source( let crate::plan::statement::ddl::MySqlConfigOptionExtracted { details, text_columns, - ignore_columns, + exclude_columns, seen: _, } = options.clone().try_into()?; @@ -949,12 +949,12 @@ async fn purify_create_source( let mysql::PurifiedSourceExports { source_exports: subsources, normalized_text_columns, - normalized_ignore_columns, + normalized_exclude_columns, } = mysql::purify_source_exports( &mut conn, external_references, text_columns, - ignore_columns, + exclude_columns, source_name, initial_gtid_set.clone(), &reference_policy, @@ -983,10 +983,10 @@ async fn purify_create_source( } if let Some(ignore_cols_option) = options .iter_mut() - .find(|option| option.name == MySqlConfigOptionName::IgnoreColumns) + .find(|option| option.name == MySqlConfigOptionName::ExcludeColumns) { ignore_cols_option.value = - Some(WithOptionValue::Sequence(normalized_ignore_columns)); + Some(WithOptionValue::Sequence(normalized_exclude_columns)); } } CreateSourceConnection::LoadGenerator { generator, options } => { @@ -1194,7 +1194,7 @@ async fn purify_alter_source( let crate::plan::statement::ddl::AlterSourceAddSubsourceOptionExtracted { text_columns, - ignore_columns, + exclude_columns, details, seen: _, } = options.clone().try_into()?; @@ -1230,9 +1230,9 @@ async fn purify_alter_source( Err(PgSourcePurificationError::InsufficientReplicationSlotsAvailable { count: 1 })?; } - if !ignore_columns.is_empty() { + if !exclude_columns.is_empty() { sql_bail!( - "{} is a {} source, which does not support IGNORE COLUMNS.", + "{} is a {} source, which does not support EXCLUDE COLUMNS.", scx.catalog.minimal_qualification(source_name), connection_name ) @@ -1287,12 +1287,12 @@ async fn purify_alter_source( let mysql::PurifiedSourceExports { source_exports: subsources, normalized_text_columns, - normalized_ignore_columns, + normalized_exclude_columns, } = mysql::purify_source_exports( &mut conn, &Some(ExternalReferences::SubsetTables(external_references)), text_columns, - ignore_columns, + exclude_columns, &unresolved_source_name, initial_gtid_set, &SourceReferencePolicy::Required, @@ -1309,10 +1309,10 @@ async fn purify_alter_source( } if let Some(ignore_cols_option) = options .iter_mut() - .find(|option| option.name == AlterSourceAddSubsourceOptionName::IgnoreColumns) + .find(|option| option.name == AlterSourceAddSubsourceOptionName::ExcludeColumns) { ignore_cols_option.value = - Some(WithOptionValue::Sequence(normalized_ignore_columns)); + Some(WithOptionValue::Sequence(normalized_exclude_columns)); } } _ => unreachable!(), @@ -1370,7 +1370,7 @@ async fn purify_create_table_from_source( let crate::plan::statement::ddl::TableFromSourceOptionExtracted { text_columns, - ignore_columns, + exclude_columns, details, seen: _, } = with_options.clone().try_into()?; @@ -1392,7 +1392,7 @@ async fn purify_create_table_from_source( ) }) .collect_vec(); - let qualified_ignore_columns = ignore_columns + let qualified_exclude_columns = exclude_columns .iter() .map(|col| { UnresolvedItemName( @@ -1450,9 +1450,9 @@ async fn purify_create_table_from_source( } // TODO(roshan): Add support for PG sources to allow this - if !ignore_columns.is_empty() { + if !exclude_columns.is_empty() { sql_bail!( - "{} is a {} source, which does not support IGNORE COLUMNS.", + "{} is a {} source, which does not support EXCLUDE COLUMNS.", scx.catalog.minimal_qualification(qualified_source_name), connection_name ) @@ -1503,16 +1503,16 @@ async fn purify_create_table_from_source( let mysql::PurifiedSourceExports { source_exports, - // `normalized_text/ignore_columns` is not relevant for us and is only returned for + // `normalized_text/exclude_columns` is not relevant for us and is only returned for // `CREATE SOURCE` statements that automatically generate subsources, and will be // removed with that functionality in the future. normalized_text_columns: _, - normalized_ignore_columns: _, + normalized_exclude_columns: _, } = mysql::purify_source_exports( &mut conn, &Some(ExternalReferences::SubsetTables(vec![requested_reference])), qualified_text_columns, - qualified_ignore_columns, + qualified_exclude_columns, &unresolved_source_name, initial_gtid_set, &SourceReferencePolicy::Required, @@ -1605,7 +1605,7 @@ async fn purify_create_table_from_source( columns: gen_columns, constraints: gen_constraints, text_columns: gen_text_columns, - ignore_columns: gen_ignore_columns, + exclude_columns: gen_exclude_columns, details: gen_details, external_reference: _, } = mysql::generate_source_export_statement_values(&scx, purified_export)?; @@ -1624,10 +1624,10 @@ async fn purify_create_table_from_source( } if let Some(ignore_cols_option) = with_options .iter_mut() - .find(|option| option.name == TableFromSourceOptionName::IgnoreColumns) + .find(|option| option.name == TableFromSourceOptionName::ExcludeColumns) { - if let Some(gen_ignore_columns) = gen_ignore_columns { - ignore_cols_option.value = Some(WithOptionValue::Sequence(gen_ignore_columns)); + if let Some(gen_exclude_columns) = gen_exclude_columns { + ignore_cols_option.value = Some(WithOptionValue::Sequence(gen_exclude_columns)); } else { soft_panic_or_log!( "text_columns should be Some if ignore_cols_option is present" diff --git a/src/sql/src/pure/error.rs b/src/sql/src/pure/error.rs index f8b0838f205ee..70d4341abbf39 100644 --- a/src/sql/src/pure/error.rs +++ b/src/sql/src/pure/error.rs @@ -316,7 +316,7 @@ impl MySqlSourcePurificationError { ), Self::UnrecognizedTypes { cols: _ } => Some( "Check the docs -- some types can be supported using the TEXT COLUMNS option to \ - ingest their values as text, or ignored using IGNORE COLUMNS." + ingest their values as text, or ignored using EXCLUDE COLUMNS." .into(), ), Self::EmptyDatabase => Some( diff --git a/src/sql/src/pure/mysql.rs b/src/sql/src/pure/mysql.rs index e8e176e823fbd..5712bfadd4c3b 100644 --- a/src/sql/src/pure/mysql.rs +++ b/src/sql/src/pure/mysql.rs @@ -72,7 +72,7 @@ pub fn generate_create_subsource_statements( columns, constraints, text_columns, - ignore_columns, + exclude_columns, details, external_reference, } = generate_source_export_statement_values(scx, purified_export)?; @@ -97,10 +97,10 @@ pub fn generate_create_subsource_statements( }); } - if let Some(ignore_columns) = ignore_columns { + if let Some(exclude_columns) = exclude_columns { with_options.push(CreateSubsourceOption { - name: CreateSubsourceOptionName::IgnoreColumns, - value: Some(WithOptionValue::Sequence(ignore_columns)), + name: CreateSubsourceOptionName::ExcludeColumns, + value: Some(WithOptionValue::Sequence(exclude_columns)), }); } @@ -123,7 +123,7 @@ pub(super) struct MySqlExportStatementValues { pub(super) columns: Vec>, pub(super) constraints: Vec>, pub(super) text_columns: Option>>, - pub(super) ignore_columns: Option>>, + pub(super) exclude_columns: Option>>, pub(super) details: SourceExportStatementDetails, pub(super) external_reference: UnresolvedItemName, } @@ -135,7 +135,7 @@ pub(super) fn generate_source_export_statement_values( let PurifiedExportDetails::MySql { table, text_columns, - ignore_columns, + exclude_columns, initial_gtid_set, } = purified_export.details else { @@ -205,7 +205,7 @@ pub(super) fn generate_source_export_statement_values( .collect() }); - let ignore_columns = ignore_columns.map(|mut columns| { + let exclude_columns = exclude_columns.map(|mut columns| { columns.sort(); columns .into_iter() @@ -216,7 +216,7 @@ pub(super) fn generate_source_export_statement_values( columns, constraints, text_columns, - ignore_columns, + exclude_columns, details, external_reference: purified_export.external_reference, }) @@ -324,14 +324,14 @@ pub(super) async fn validate_requested_references_privileges( pub(super) struct PurifiedSourceExports { /// map of source export names to the details of the export pub(super) source_exports: BTreeMap, - // NOTE(roshan): The text columns and ignore columns are already part of their + // NOTE(roshan): The text columns and exclude columns are already part of their // appropriate `source_exports` above, but these are returned to allow // round-tripping a `CREATE SOURCE` statement while we still allow creating // implicit subsources from `CREATE SOURCE`. Remove once // fully deprecating that feature and forcing users to use explicit // `CREATE TABLE .. FROM SOURCE` statements. pub(super) normalized_text_columns: Vec>, - pub(super) normalized_ignore_columns: Vec>, + pub(super) normalized_exclude_columns: Vec>, } // Purify the requested external references, returning a set of purified @@ -341,7 +341,7 @@ pub(super) async fn purify_source_exports( conn: &mut mz_mysql_util::MySqlConn, external_references: &Option, text_columns: Vec, - ignore_columns: Vec, + exclude_columns: Vec, unresolved_source_name: &UnresolvedItemName, initial_gtid_set: String, reference_policy: &SourceReferencePolicy, @@ -376,13 +376,14 @@ pub(super) async fn purify_source_exports( return Ok(PurifiedSourceExports { source_exports: BTreeMap::new(), normalized_text_columns: vec![], - normalized_ignore_columns: vec![], + normalized_exclude_columns: vec![], }); } }; let text_cols_map = map_column_refs(&text_columns, MySqlConfigOptionName::TextColumns)?; - let ignore_cols_map = map_column_refs(&ignore_columns, MySqlConfigOptionName::IgnoreColumns)?; + let exclude_columns_map = + map_column_refs(&exclude_columns, MySqlConfigOptionName::ExcludeColumns)?; // Retrieve schemas for all requested tables // NOTE: mysql will only expose the schemas of tables we have at least one privilege on @@ -391,15 +392,15 @@ pub(super) async fn purify_source_exports( let raw_tables = mz_mysql_util::schema_info(conn.deref_mut(), &table_schema_request).await?; // Convert the raw tables into a format that we can use to generate source exports - // using any applicable text_columns and ignore_columns. + // using any applicable text_columns and exclude_columns. let mut tables = Vec::with_capacity(raw_tables.len()); for table in raw_tables.into_iter() { let table_ref = table.table_ref(); // we are cloning the BTreeSet<&str> so we can avoid a borrow on `table` here let text_cols = text_cols_map.get(&table_ref).map(|s| s.clone()); - let ignore_cols = ignore_cols_map.get(&table_ref).map(|s| s.clone()); + let exclude_columns = exclude_columns_map.get(&table_ref).map(|s| s.clone()); let parsed_table = table - .to_desc(text_cols.as_ref(), ignore_cols.as_ref()) + .to_desc(text_cols.as_ref(), exclude_columns.as_ref()) .map_err(|err| match err { mz_mysql_util::MySqlError::UnsupportedDataTypes { columns } => { PlanError::from(MySqlSourcePurificationError::UnrecognizedTypes { @@ -515,7 +516,7 @@ pub(super) async fn purify_source_exports( .map(|c| Ident::new(*c).expect("validated above")) .collect() }), - ignore_columns: ignore_cols_map.get(&table_ref).map(|cols| { + exclude_columns: exclude_columns_map.get(&table_ref).map(|cols| { cols.iter() .map(|c| Ident::new(*c).expect("validated above")) .collect() @@ -531,8 +532,8 @@ pub(super) async fn purify_source_exports( source_exports, // Normalize column options and remove unused column references. normalized_text_columns: normalize_column_refs(text_columns, &reference_resolver, &tables)?, - normalized_ignore_columns: normalize_column_refs( - ignore_columns, + normalized_exclude_columns: normalize_column_refs( + exclude_columns, &reference_resolver, &tables, )?, diff --git a/src/storage-types/src/sources/mysql.proto b/src/storage-types/src/sources/mysql.proto index 8a96b96cc6774..219efd7f8b808 100644 --- a/src/storage-types/src/sources/mysql.proto +++ b/src/storage-types/src/sources/mysql.proto @@ -30,7 +30,7 @@ message ProtoMySqlSourceExportDetails { mz_mysql_util.ProtoMySqlTableDesc table = 1; string initial_gtid_set = 2; repeated string text_columns = 3; - repeated string ignore_columns = 4; + repeated string exclude_columns = 4; } // NOTE: this message is encoded and stored as part of source export diff --git a/src/storage-types/src/sources/mysql.rs b/src/storage-types/src/sources/mysql.rs index f5ea4f9fdc9c8..0c857433e96c9 100644 --- a/src/storage-types/src/sources/mysql.rs +++ b/src/storage-types/src/sources/mysql.rs @@ -240,7 +240,7 @@ pub struct MySqlSourceExportDetails { #[proptest(strategy = "any_gtidset()")] pub initial_gtid_set: String, pub text_columns: Vec, - pub ignore_columns: Vec, + pub exclude_columns: Vec, } impl RustType for MySqlSourceExportDetails { @@ -249,7 +249,7 @@ impl RustType for MySqlSourceExportDetails { table: Some(self.table.into_proto()), initial_gtid_set: self.initial_gtid_set.clone(), text_columns: self.text_columns.clone(), - ignore_columns: self.ignore_columns.clone(), + exclude_columns: self.exclude_columns.clone(), } } @@ -260,7 +260,7 @@ impl RustType for MySqlSourceExportDetails { .into_rust_if_some("ProtoMySqlSourceExportDetails::table")?, initial_gtid_set: proto.initial_gtid_set, text_columns: proto.text_columns, - ignore_columns: proto.ignore_columns, + exclude_columns: proto.exclude_columns, }) } } @@ -277,7 +277,7 @@ impl AlterCompatible for MySqlSourceExportDetails { table: _, initial_gtid_set: _, text_columns: _, - ignore_columns: _, + exclude_columns: _, } = self; Ok(()) } diff --git a/src/storage/src/source/mysql.rs b/src/storage/src/source/mysql.rs index aa34942a99626..600147fdbbc93 100644 --- a/src/storage/src/source/mysql.rs +++ b/src/storage/src/source/mysql.rs @@ -144,7 +144,7 @@ impl SourceRender for MySqlSourceConnection { output_index: *ingestion_output, desc, text_columns: details.text_columns.clone(), - ignore_columns: details.ignore_columns.clone(), + exclude_columns: details.exclude_columns.clone(), initial_gtid_set: gtid_set_frontier(&initial_gtid_set).expect("invalid gtid set"), resume_upper, }); @@ -225,7 +225,7 @@ struct SourceOutputInfo { table_name: MySqlTableName, desc: MySqlTableDesc, text_columns: Vec, - ignore_columns: Vec, + exclude_columns: Vec, initial_gtid_set: Antichain, resume_upper: Antichain, } diff --git a/src/storage/src/source/mysql/schemas.rs b/src/storage/src/source/mysql/schemas.rs index baebf02ef4d9d..81693980cb301 100644 --- a/src/storage/src/source/mysql/schemas.rs +++ b/src/storage/src/source/mysql/schemas.rs @@ -60,7 +60,7 @@ where output.text_columns.iter().map(|s| s.as_str()), )), Some(&BTreeSet::from_iter( - output.ignore_columns.iter().map(|s| s.as_str()), + output.exclude_columns.iter().map(|s| s.as_str()), )), ); match new_desc { diff --git a/test/mysql-cdc/35-ignore-columns.td b/test/mysql-cdc/35-exclude-columns.td similarity index 87% rename from test/mysql-cdc/35-ignore-columns.td rename to test/mysql-cdc/35-exclude-columns.td index c0237b6a19054..6f13bfb961127 100644 --- a/test/mysql-cdc/35-ignore-columns.td +++ b/test/mysql-cdc/35-exclude-columns.td @@ -37,14 +37,14 @@ contains: unsupported type ! CREATE SOURCE da FROM MYSQL CONNECTION mysqc ( - IGNORE COLUMNS (t1.f2, t1.f3) + EXCLUDE COLUMNS (t1.f2, t1.f3) ) FOR TABLES (public.t1); -contains:invalid IGNORE COLUMNS option value: column name 't1.f2' must have at least a table qualification +contains:invalid EXCLUDE COLUMNS option value: column name 't1.f2' must have at least a table qualification > CREATE SOURCE da FROM MYSQL CONNECTION mysqc ( - IGNORE COLUMNS (public.t1.f2, public.t1.f3) + EXCLUDE COLUMNS (public.t1.f2, public.t1.f3) ) FOR TABLES (public.t1); @@ -62,7 +62,7 @@ INSERT INTO t1 SELECT * FROM t1; "test" > SHOW CREATE SOURCE t1; -materialize.public.t1 "CREATE SUBSOURCE \"materialize\".\"public\".\"t1\" (\"f1\" \"pg_catalog\".\"int4\", \"f4\" \"pg_catalog\".\"varchar\"(64)) OF SOURCE \"materialize\".\"public\".\"da\" WITH (EXTERNAL REFERENCE = \"public\".\"t1\", IGNORE COLUMNS = (\"f2\", \"f3\"))" +materialize.public.t1 "CREATE SUBSOURCE \"materialize\".\"public\".\"t1\" (\"f1\" \"pg_catalog\".\"int4\", \"f4\" \"pg_catalog\".\"varchar\"(64)) OF SOURCE \"materialize\".\"public\".\"da\" WITH (EXTERNAL REFERENCE = \"public\".\"t1\", EXCLUDE COLUMNS = (\"f2\", \"f3\"))" ! SELECT f2 FROM t1; contains:column "f2" does not exist diff --git a/test/mysql-cdc/alter-source.td b/test/mysql-cdc/alter-source.td index ea4d3c16afb12..4af30d4436688 100644 --- a/test/mysql-cdc/alter-source.td +++ b/test/mysql-cdc/alter-source.td @@ -395,27 +395,27 @@ detail:the following columns are referenced but not added: public.table_f.f2 # -# Can add tables with ignore columns +# Can add tables with exclude columns # -! ALTER SOURCE mz_source ADD SUBSOURCE public.table_f WITH (IGNORE COLUMNS [public.table_f.f2, public.table_f.f2]); -contains: invalid IGNORE COLUMNS option value: unexpected multiple references to public.table_f.f2 +! ALTER SOURCE mz_source ADD SUBSOURCE public.table_f WITH (EXCLUDE COLUMNS [public.table_f.f2, public.table_f.f2]); +contains: invalid EXCLUDE COLUMNS option value: unexpected multiple references to public.table_f.f2 -> ALTER SOURCE mz_source ADD SUBSOURCE public.table_f WITH (IGNORE COLUMNS [public.table_f.f2]); +> ALTER SOURCE mz_source ADD SUBSOURCE public.table_f WITH (EXCLUDE COLUMNS [public.table_f.f2]); > SELECT * FROM table_f 1 2 3 -> SELECT regexp_match(create_sql, 'IGNORE COLUMNS = \((.*?)\)')[1] FROM (SHOW CREATE SOURCE mz_source); +> SELECT regexp_match(create_sql, 'EXCLUDE COLUMNS = \((.*?)\)')[1] FROM (SHOW CREATE SOURCE mz_source); "\"public\".\"table_f\".\"f2\"" -> ALTER SOURCE mz_source ADD SUBSOURCE public.table_i WITH (IGNORE COLUMNS [public.table_i.f2]); +> ALTER SOURCE mz_source ADD SUBSOURCE public.table_i WITH (EXCLUDE COLUMNS [public.table_i.f2]); -> SELECT regexp_match(create_sql, 'IGNORE COLUMNS = \((.*?)\)')[1] FROM (SHOW CREATE SOURCE mz_source); +> SELECT regexp_match(create_sql, 'EXCLUDE COLUMNS = \((.*?)\)')[1] FROM (SHOW CREATE SOURCE mz_source); "\"public\".\"table_f\".\"f2\", \"public\".\"table_i\".\"f2\"" -> SELECT regexp_match(create_sql, 'IGNORE COLUMNS = \((.*?)\)')[1] FROM (SHOW CREATE SOURCE table_i); +> SELECT regexp_match(create_sql, 'EXCLUDE COLUMNS = \((.*?)\)')[1] FROM (SHOW CREATE SOURCE table_i); "\"f2\"" > SELECT * FROM table_i @@ -424,28 +424,28 @@ contains: invalid IGNORE COLUMNS option value: unexpected multiple references to > DROP SOURCE table_f, table_i; -> SELECT regexp_match(create_sql, 'IGNORE COLUMNS = \((.*?)\)')[1] FROM (SHOW CREATE SOURCE mz_source); +> SELECT regexp_match(create_sql, 'EXCLUDE COLUMNS = \((.*?)\)')[1] FROM (SHOW CREATE SOURCE mz_source); -! ALTER SOURCE mz_source ADD SUBSOURCE public.table_e WITH (IGNORE COLUMNS (public.table_z.a)); +! ALTER SOURCE mz_source ADD SUBSOURCE public.table_e WITH (EXCLUDE COLUMNS (public.table_z.a)); contains:Reference to columns not currently being added -! ALTER SOURCE mz_source ADD SUBSOURCE public.table_e WITH (IGNORE COLUMNS [public.table_f.f2]); +! ALTER SOURCE mz_source ADD SUBSOURCE public.table_e WITH (EXCLUDE COLUMNS [public.table_f.f2]); contains:Reference to columns not currently being added detail:the following columns are referenced but not added: public.table_f.f2 -# Test adding ignore cols w/o original IGNORE COLUMNS +# Test adding exclude cols w/o original EXCLUDE COLUMNS -> CREATE SOURCE "mz_source_wo_init_ignore_cols" +> CREATE SOURCE "mz_source_wo_init_exclude_cols" FROM MYSQL CONNECTION mysql_conn FOR TABLES (public.table_a AS t_a2); -> SELECT regexp_match(create_sql, 'IGNORE COLUMNS = \((.*?)\)')[1] FROM (SHOW CREATE SOURCE mz_source_wo_init_ignore_cols); +> SELECT regexp_match(create_sql, 'EXCLUDE COLUMNS = \((.*?)\)')[1] FROM (SHOW CREATE SOURCE mz_source_wo_init_exclude_cols); -> ALTER SOURCE mz_source_wo_init_ignore_cols ADD SUBSOURCE public.table_f AS t_f2 WITH (IGNORE COLUMNS [public.table_f.f2]); +> ALTER SOURCE mz_source_wo_init_exclude_cols ADD SUBSOURCE public.table_f AS t_f2 WITH (EXCLUDE COLUMNS [public.table_f.f2]); -> SELECT regexp_match(create_sql, 'IGNORE COLUMNS = \((.*?)\)')[1] FROM (SHOW CREATE SOURCE mz_source_wo_init_ignore_cols); +> SELECT regexp_match(create_sql, 'EXCLUDE COLUMNS = \((.*?)\)')[1] FROM (SHOW CREATE SOURCE mz_source_wo_init_exclude_cols); "\"public\".\"table_f\".\"f2\"" # Add a table after having created the source