From 62c8543eb21d20a683a2bbd799ecc4f6db9fe2ae Mon Sep 17 00:00:00 2001 From: Roshan Jobanputra Date: Mon, 22 Jul 2024 11:14:38 -0400 Subject: [PATCH] Update implementation details based on recent work and learnings --- ...5_source_versioning__table_from_sources.md | 94 +++++++++++-------- 1 file changed, 56 insertions(+), 38 deletions(-) diff --git a/doc/developer/design/20240625_source_versioning__table_from_sources.md b/doc/developer/design/20240625_source_versioning__table_from_sources.md index b3065314ffe70..79e7c584ac347 100644 --- a/doc/developer/design/20240625_source_versioning__table_from_sources.md +++ b/doc/developer/design/20240625_source_versioning__table_from_sources.md @@ -132,33 +132,35 @@ that returns all known upstream tables and columns for a source. 1. Separate the planning of _sources_ and _subsources_ such that sources are fully planned before any linked subsources. Part of this work has been prototyped in: https://github.com/MaterializeInc/materialize/pull/27320 + Update: completed in https://github.com/MaterializeInc/materialize/pull/28310 -2. Introduce the concept of _read-only tables_ that link to a source ingestion - and an upstream 'reference' specifying which upstream table to ingest into this table. - -3. Update the CREATE TABLE statement to allow creation of a read-only table +2. Update the CREATE TABLE statement to allow creation of a read-only table with a reference to a source and an upstream-reference: (`CREATE TABLE FROM SOURCE (upstream_reference)`) + Update: statement introduced in https://github.com/MaterializeInc/materialize/pull/28125 -4. Update planning for `CREATE TABLE` to include a purification step akin to the - purification for `ALTER SOURCE .. ADD SUBSOURCE`. This will verify the upstream - permissions, schema, etc for the newly added source-fed table. +3. 
Update the underlying `SourceDesc`/`IngestionDesc` and + `SourceExport`/`IngestionExport` structs to include each export's specific + details and options on its own struct, rather than using an implicit mapping into + the top-level source's options. + This may involve moving options such as `TEXT COLUMNS` to be stored on subsource + statements rather than on top-level source statements. -5. Update the underlying _source_ (ingestion) and source*export/ingestion_export - structures to allow more than one \_source_export* to refer to an upstream - reference, and each _source_export_ to have its own projection of the upstream data +4. Implement planning for `CREATE TABLE .. FROM SOURCE` to include a purification + step akin to the purification for `ALTER SOURCE .. ADD SUBSOURCE`. This will verify + the upstream permissions, schema, etc. for the newly added source-fed table. -6. Update the storage controller to use both _subsources_ and _read-only tables_ +5. Update the storage controller to use both _subsources_ and _read-only tables_ as _source_exports_ for existing multi-output sources (postgres & mysql). -7. Update the source rendering operators to handle the new structures and allow +6. Update the source rendering operators to handle the new structures and allow outputting the same upstream table to more than one souce export. -8. Migrate existing sources to the new source model (make all sources 'multi-output' sources) +7. Migrate existing sources to the new source model (make all sources 'multi-output' sources) and preserve the original names tied to each collection such that downstream object references don't need to change. -9. Remove subsource purification logic from `purify_create_source`, subsource statement parsing, +8. Remove subsource purification logic from `purify_create_source`, subsource statement parsing, and related planning code. 
### Core Implementation Details @@ -194,7 +196,8 @@ A new `CreateTableFromSource` statement will be introduced that includes the sou `T::ItemName` and the external reference `UnresolvedItemName`. We would then introduce a new `TableDataSource` enum and add a field to the `Table` objects -used for the in-memory catalog and SQL planning: +used for the in-memory catalog and SQL planning, which optionally includes details +specific to each source type in an `ExportDetails` field: ```rust pub enum TableDataSource { @@ -202,13 +205,14 @@ pub enum TableDataSource { TableWrites, /// The table receives its data from the identified ingestion, specifically - /// the output identified by `external_reference`, using the projection - /// `external_projection` to map the upstream relation to this relation. + /// the upstream object identified by `external_reference`. The `details` + /// field contains any options necessary to map the upstream relation to this + /// relation. /// This table type does not support INSERT/UPDATE/DELETE statements. IngestionExport { ingestion_id: GlobalId, external_reference: UnresolvedItemName, - external_projection: Vec, + details: ExportDetails, } } @@ -218,6 +222,23 @@ pub struct Table { ... pub data_source: TableDataSource } + +/// This is an example; these enum structs would likely be their own types. +pub enum ExportDetails { + Kafka, + Postgres { + column_casts: ..., + table: PostgresTableDesc, + }, + MySql { + table: MySqlTableDesc, + text_columns: Vec, + ignore_columns: Vec, + }, + LoadGenerator { + ... + } +} ``` The planning for `CREATE TABLE` will be adjusted to include a purification step @@ -235,17 +256,13 @@ a storage controller `CollectionDescription` with a `DataSource::IngestionExport rather than the existing `DataSourceOther::TableWrites` value used for all tables. 
The storage controller's `DataSource::IngestionExport` struct will be updated to -include an optional projection, populated based on the table projection mentioned above: +include the export details: ```rust IngestionExport { ingestion_id: GlobalId, - // This is an `UnresolvedItemName` because the structure works for PG, - // MySQL, and load generator sources. However, in the future, it should - // be sufficiently genericized to support all multi-output sources we - // support. external_reference: UnresolvedItemName, - external_projection: Option>, + details: ExportDetails, }, ``` @@ -255,8 +272,8 @@ inserts this into the appropriate `IngestionDescription` of the top-level source. This is used in source rendering to figure out what upstream reference should be mapped to a specific output collection. -The storage `SourceExport` struct will be updated to include an optional -projection field, populated from the `IngestionExport`'s `external_projection`: +The storage `SourceExport` struct will be updated to include the `details` +field, populated from the `IngestionExport`'s `details`: ```rust pub struct SourceExport { @@ -265,10 +282,14 @@ pub struct SourceExport { /// The collection metadata needed to write the exported data pub storage_metadata: S, /// How to project the ingestion output to this export's collection. - pub output_projection: Option>, + pub details: ExportDetails, } ``` +Then the subsource-specific values currently stored in `GenericSourceConnection` +such as the Postgres and MySQL `tables` vecs will be removed, since each +'source export' would contain its own information. + #### Source Rendering Currently, when an `IngestionDescription` is used inside `build_ingestion_dataflow` @@ -277,24 +298,21 @@ it determines the `output_index` of each collection to be output by the ingestio by calling the `IngestionDescription::source_exports_with_output_indices` method. 
This method maps the `external_reference` (the upstream table name) of each `SourceExport` -to the index of that upstream table in the source's `details` struct (which is created -during purification -- its a vector of upstream table descriptions). +to the index of that upstream table in the source's `details` struct (which is currently +created during purification -- a vector of upstream table descriptions). Then inside each source implementation, we currently assume that each `SourceExport` uniqely corresponds to a single `output_index`, and that each `output_index` corresponds to the same index of the upstream table in the source's upstream `details`. -This assumption is part of what prevents an upstream table from being ingested into -more than one collection. - -We would update each source implementation to assume that more than one `SourceExport` -can share an `output_index` and we would update the `source_exports` map created -by `source_exports_with_output_indices` to include the `details` index explicitly. +Since the top-level source `details` will no longer contain the `tables` field, the +output index will be determined by the ordering of the `IngestionDescription::source_exports` `BTreeMap`. Each `SourceExport` will output its own stream from the ingestion using +the `details` it contains. It will be up to each source implementation to map the +relevant upstream table to the correct `SourceExport`s using their `external_reference`. -The `build_ingestion_dataflow` method currently demuxes the output collection of +The `build_ingestion_dataflow` method then demuxes the output collection of each source by each `output_index` and pushes each data stream to the appropriate -`persist_sink`. We would update this method so that it applies the `output_projection` -for each `SourceExport` on each data stream before being pushed to its `persist_sink`. +`persist_sink`. #### Migration of source statements and collections