From 66a76ac9ef6263f31e4975d8acd606ece98f1dec Mon Sep 17 00:00:00 2001 From: Roshan Jobanputra Date: Tue, 6 Aug 2024 09:57:25 -0400 Subject: [PATCH] More details on implementation plan --- ...5_source_versioning__table_from_sources.md | 132 +++++++++++------- 1 file changed, 81 insertions(+), 51 deletions(-) diff --git a/doc/developer/design/20240625_source_versioning__table_from_sources.md b/doc/developer/design/20240625_source_versioning__table_from_sources.md index 79e7c584ac347..09750ac042bfe 100644 --- a/doc/developer/design/20240625_source_versioning__table_from_sources.md +++ b/doc/developer/design/20240625_source_versioning__table_from_sources.md @@ -105,17 +105,20 @@ existing subsources to be tables without needing to figure out how to merge The default `CREATE SOURCE ..` statement will just create a top-level source object that represents a single ingestion of data from an upstream system. To actually ingest data from that upstream system into a persist collection, the user will -use a `CREATE TABLE .. ` statement with an option that references the source +use a `CREATE TABLE .. FROM SOURCE` statement with an option that references the source and the external reference. We will allow more than one table to reference the same external upstream reference, using a potentially different set of columns. This allows a user to handle an upstream schema change by creating a new table for the same upstream table and then performing a blue-green swap operation to switch their downstream dependencies to the new table, -and then drop the old one. +and then drop the old one. While we could also enable multiple subsources to +reference the same external upstream reference, we will instead continue to restrict this +to provide an incentive for users to opt-in to the migration to tables and allow us +to deprecate subsources sooner. The existing options to `CREATE SOURCE` for Postgres & MySQL sources that specify -which tables to ingest (`FOR ALL TABLES`, `FOR SCHEMAS` and `FOR TABLES`) will be +automatic subsources to create (`FOR ALL TABLES`, `FOR SCHEMAS` and `FOR TABLES`) will be removed. Instead, all upstream tables to ingest from this source will need to be explicitly referenced in a `CREATE TABLE` statement. While this may seem controversial, more often than not these options cause upstream data that does not need to be @@ -123,45 +126,60 @@ brought into Materialize to be ingested, and by using explicit statements for ea table to be ingested it makes Materialize configuration much more amenable to object<->state mappings in tools like dbt and Terraform. -We can eventually reintroduce syntactic sugar to perform a similar function to -`FOR ALL TABLES` and `FOR SCHEMAS` if necessary, or introduce a new SQL command -that returns all known upstream tables and columns for a source. +We will instead introduce a SQL statement to return all known upstream tables +similar for a given source, optionally filtered by schema(s). This will allow +the web console to expose a guided 'create source' workflow for users where +they can select from a list of upstream tables and the console can generate +the appropriate `CREATE TABLE .. FROM SOURCE` statements for those selected. ### Implementation Plan 1. Separate the planning of _sources_ and _subsources_ such that sources are fully - planned before any linked subsources. Part of this work has been prototyped in: - https://github.com/MaterializeInc/materialize/pull/27320 - Update: completed in https://github.com/MaterializeInc/materialize/pull/28310 + planned before any linked subsources. + **Update: completed in [PR](https://github.com/MaterializeInc/materialize/pull/28310)** 2. Update the CREATE TABLE statement to allow creation of a read-only table with a reference to a source and an upstream-reference: - (`CREATE TABLE FROM SOURCE (upstream_reference)`) - Update: statement introduced in https://github.com/MaterializeInc/materialize/pull/28125 + (`CREATE TABLE FROM SOURCE (REFERENCE )`) + **Update: statement introduced in [PR](https://github.com/MaterializeInc/materialize/pull/28125)** -3. Update the underlying `SourceDesc`/`IngestionDesc` and +3. Copy subsource-specific details stored on `CREATE SOURCE` statement options to their + relevant subsource statements. + **Update: completed in [PR](https://github.com/MaterializeInc/materialize/pull/28493)** + +4. Update the underlying `SourceDesc`/`IngestionDesc` and `SourceExport`/`IngestionExport` structs to include each export's specific details and options on its own struct, rather than using an implicit mapping into the top-level source's options. - This may involve moving options such as `TEXT COLUMNS` to be stored on subsource - statements rather than top-level source statements instead. + **Update: done in [PR](https://github.com/MaterializeInc/materialize/pull/28503)** + +5. Update the source rendering operators to handle the new structures and allow + outputting the same upstream table to more than one souce export. + **Update: open PRs: [MySQL](https://github.com/MaterializeInc/materialize/pull/28671) [Postgres](https://github.com/MaterializeInc/materialize/pull/28676)** -4. Implement planning for `CREATE TABLE .. FROM SOURCE` to include a purification +6. Implement planning for `CREATE TABLE .. FROM SOURCE` and include a purification step akin to the purification for `ALTER SOURCE .. ADD SUBSOURCE`. This will verify the upstream permissions, schema, etc for the newly added source-fed table. + **Update: open PRs [purification](https://github.com/MaterializeInc/materialize/pull/28943) and [planning](https://github.com/MaterializeInc/materialize/pull/28954)** -5. Update the storage controller to use both _subsources_ and _read-only tables_ +7. Update the storage controller to use both _subsources_ and _read-only tables_ as _source_exports_ for existing multi-output sources (postgres & mysql). -6. Update the source rendering operators to handle the new structures and allow - outputting the same upstream table to more than one souce export. +8. Update introspection tables to expose details of source-fed tables + (`CREATE TABLE .. FROM SOURCE` tables) + +9. Add a new SQL command that returns all possible known upstream tables and columns for a source. -7. Migrate existing sources to the new source model (make all sources 'multi-output' sources) - and preserve the original names tied to each collection such that downstream object - references don't need to change. +10. Implement an opt-in migration using a feature-flag to convert subsources to tables + for existing multi-output sources (Postgres, MySQL, Load Generators) -8. Remove subsource purification logic from `purify_create_source`, subsource statement parsing, - and related planning code. +11. Restructure kafka source planning and rendering to use source_export structure and allow + multiple source-exports for a given kafka topic + +12. Implement an opt-in migration for kafka sources to be converted to table structure + +13. Remove subsource purification logic from `purify_create_source`, subsource statement parsing, + and related planning code. ### Core Implementation Details @@ -182,38 +200,29 @@ CREATE TABLE () WITH () ``` We would update the `CREATE TABLE` statement to be able to optionally reference an upstream -source and reference using `FROM SOURCE (REFERENCE )`: +source and reference using `FROM SOURCE (REFERENCE )` and +any additional options necessary for ingesting the upstrema data: ```sql -CREATE TABLE () FROM SOURCE (REFERENCE = ) +CREATE TABLE FROM SOURCE (REFERENCE = ) WITH (TEXT COLUMNS = (..), ..) ``` -`` can be optionally specified by the user to request a subset of the upstream table's -columns, but will not be permitted to include user-specified column types, since these will -be determined by the upstream source details. - A new `CreateTableFromSource` statement will be introduced that includes the source reference `T::ItemName` and the external reference `UnresolvedItemName`. We would then introduce a new `TableDataSource` enum and add a field to the `Table` objects used for the in-memory catalog and SQL planning, that optionally includes details -on a per-source basis in an `ExportDetails` field: +of an upstream data source in the `DataSource` variant, which will contain the existing +`DataSourceDesc` structs used in sources: ```rust pub enum TableDataSource { /// The table owns data created via INSERT/UPDATE/DELETE statements. TableWrites, - /// The table receives its data from the identified ingestion, specifically - /// the upstream object identified by `external_reference`. The `details` - /// field contains any options necessary to map the upstream relation to this - /// relation. + /// The table receives its data from the identified data source. /// This table type does not support INSERT/UPDATE/DELETE statements. - IngestionExport { - ingestion_id: GlobalId, - external_reference: UnresolvedItemName, - details: ExportDetails, - } + DataSource(DataSourceDesc) } pub struct Table { @@ -222,9 +231,31 @@ pub struct Table { ... pub data_source: TableDataSource } +``` + +The `DataSourceDesc` struct's `IngestionExport` variant will be extended to +include a `details: SourceExportDetails` field on each export, such that all +the individual details necessary to render that export are stored here and +not on the top-level `Ingestion`: + +```rust +pub enum DataSourceDesc { + /// Receives data from an external system. + Ingestion(Ingestion), + /// This source receives its data from the identified ingestion, + /// specifically the output identified by `external_reference`. + IngestionExport { + ingestion_id: GlobalId, + external_reference: UnresolvedItemName, + details: SourceExportDetails, + }, + + ... +} + /// this is an example and these enum structs would likely be their own types -pub enum ExportDetails { +pub enum SourceExportDetails { Kafka, Postgres { column_casts: ..., @@ -242,11 +273,12 @@ pub enum ExportDetails { ``` The planning for `CREATE TABLE` will be adjusted to include a purification step -akin to the purification in `purify_alter_source`. This will also map any -specified columns into a projection of the upstream table's column order. -This will also verify the upstream permissions, schema, etc of the upstream table, -and generate a new `details` for the top-level source, that is merged with the -top-level source during sequencing (how alter source works now). +akin to the purification in `purify_alter_source`. +This will verify the upstream permissions, schema, etc of the upstream table, and +create the `details` necessary for this new export to be rendered as part of the +source. It will not need to be merged with the top-level source during sequencing +since its own details are entirely self-contained, unlike existing `CREATE SUBSOURCE` +statements. #### Storage Collection Coordination @@ -306,13 +338,11 @@ Then inside each source implementation, we currently assume that each corresponds to the same index of the upstream table in the source's upstream `details`. Since the top-level source `details` will no longer contain the `tables` field, the -output index will be determined by the ordering of the `IngestionDescription::source_exports` `BTreeMap`. Each `SourceExport` will output its own stream from the ingestion using +output index will be determined by the ordering of the `IngestionDescription::source_exports` `BTreeMap`. +Each `SourceExport` will output its own stream from the ingestion using the `details` it contains. It will be up to each source implementation to map the -relevant upstream table to the correct `SourceExport`s using their `external_reference`. - -The `build_ingestion_dataflow` method then demuxes the output collection of -each source by each `output_index` and pushes each data stream to the appropriate -`persist_sink`. +relevant upstream table to the correct `SourceExport`s using the `SourceExportDetails` +and output to the correct `output_index`. #### Migration of source statements and collections