Skip to content

Commit

Permalink
More details on implementation plan
Browse files Browse the repository at this point in the history
  • Loading branch information
rjobanp committed Aug 13, 2024
1 parent 8593965 commit 66a76ac
Showing 1 changed file with 81 additions and 51 deletions.
132 changes: 81 additions & 51 deletions doc/developer/design/20240625_source_versioning__table_from_sources.md
Original file line number Diff line number Diff line change
Expand Up @@ -105,63 +105,81 @@ existing subsources to be tables without needing to figure out how to merge
The default `CREATE SOURCE ..` statement will just create a top-level source object
that represents a single ingestion of data from an upstream system. To actually
ingest data from that upstream system into a persist collection, the user will
use a `CREATE TABLE .. ` statement with an option that references the source
use a `CREATE TABLE .. FROM SOURCE` statement with an option that references the source
and the external reference.

We will allow more than one table to reference the same external upstream reference,
using a potentially different set of columns. This allows a user to handle an upstream
schema change by creating a new table for the same upstream table and then performing
a blue-green swap operation to switch their downstream dependencies to the new table,
and then drop the old one.
and then drop the old one. While we could also enable multiple subsources to
reference the same external upstream reference, we will instead continue to restrict this
to provide an incentive for users to opt-in to the migration to tables and allow us
to deprecate subsources sooner.

The existing options to `CREATE SOURCE` for Postgres & MySQL sources that specify
which tables to ingest (`FOR ALL TABLES`, `FOR SCHEMAS` and `FOR TABLES`) will be
automatic subsources to create (`FOR ALL TABLES`, `FOR SCHEMAS` and `FOR TABLES`) will be
removed. Instead, all upstream tables to ingest from this source will need to be
explicitly referenced in a `CREATE TABLE` statement. While this may seem controversial,
more often than not these options cause upstream data that does not need to be
brought into Materialize to be ingested, and by using explicit statements for each
table to be ingested it makes Materialize configuration much more amenable to
object<->state mappings in tools like dbt and Terraform.

We can eventually reintroduce syntactic sugar to perform a similar function to
`FOR ALL TABLES` and `FOR SCHEMAS` if necessary, or introduce a new SQL command
that returns all known upstream tables and columns for a source.
We will instead introduce a SQL statement to return all known upstream tables
similar for a given source, optionally filtered by schema(s). This will allow
the web console to expose a guided 'create source' workflow for users where
they can select from a list of upstream tables and the console can generate
the appropriate `CREATE TABLE .. FROM SOURCE` statements for those selected.

### Implementation Plan

1. Separate the planning of _sources_ and _subsources_ such that sources are fully
planned before any linked subsources. Part of this work has been prototyped in:
https://github.com/MaterializeInc/materialize/pull/27320
Update: completed in https://github.com/MaterializeInc/materialize/pull/28310
planned before any linked subsources.
**Update: completed in [PR](https://github.com/MaterializeInc/materialize/pull/28310)**

2. Update the CREATE TABLE statement to allow creation of a read-only table
with a reference to a source and an upstream-reference:
(`CREATE TABLE <new_table> FROM SOURCE <source> (upstream_reference)`)
Update: statement introduced in https://github.com/MaterializeInc/materialize/pull/28125
(`CREATE TABLE <new_table> FROM SOURCE <source> (REFERENCE <upstream_reference>)`)
**Update: statement introduced in [PR](https://github.com/MaterializeInc/materialize/pull/28125)**

3. Update the underlying `SourceDesc`/`IngestionDesc` and
3. Copy subsource-specific details stored on `CREATE SOURCE` statement options to their
relevant subsource statements.
**Update: completed in [PR](https://github.com/MaterializeInc/materialize/pull/28493)**

4. Update the underlying `SourceDesc`/`IngestionDesc` and
`SourceExport`/`IngestionExport` structs to include each export's specific
details and options on its own struct, rather than using an implicit mapping into
the top-level source's options.
This may involve moving options such as `TEXT COLUMNS` to be stored on subsource
statements rather than top-level source statements instead.
**Update: done in [PR](https://github.com/MaterializeInc/materialize/pull/28503)**

5. Update the source rendering operators to handle the new structures and allow
outputting the same upstream table to more than one souce export.
**Update: open PRs: [MySQL](https://github.com/MaterializeInc/materialize/pull/28671) [Postgres](https://github.com/MaterializeInc/materialize/pull/28676)**

4. Implement planning for `CREATE TABLE .. FROM SOURCE` to include a purification
6. Implement planning for `CREATE TABLE .. FROM SOURCE` and include a purification
step akin to the purification for `ALTER SOURCE .. ADD SUBSOURCE`. This will verify
the upstream permissions, schema, etc for the newly added source-fed table.
**Update: open PRs [purification](https://github.com/MaterializeInc/materialize/pull/28943) and [planning](https://github.com/MaterializeInc/materialize/pull/28954)**

5. Update the storage controller to use both _subsources_ and _read-only tables_
7. Update the storage controller to use both _subsources_ and _read-only tables_
as _source_exports_ for existing multi-output sources (postgres & mysql).

6. Update the source rendering operators to handle the new structures and allow
outputting the same upstream table to more than one souce export.
8. Update introspection tables to expose details of source-fed tables
(`CREATE TABLE .. FROM SOURCE` tables)

9. Add a new SQL command that returns all possible known upstream tables and columns for a source.

7. Migrate existing sources to the new source model (make all sources 'multi-output' sources)
and preserve the original names tied to each collection such that downstream object
references don't need to change.
10. Implement an opt-in migration using a feature-flag to convert subsources to tables
for existing multi-output sources (Postgres, MySQL, Load Generators)

8. Remove subsource purification logic from `purify_create_source`, subsource statement parsing,
and related planning code.
11. Restructure kafka source planning and rendering to use source_export structure and allow
multiple source-exports for a given kafka topic

12. Implement an opt-in migration for kafka sources to be converted to table structure

13. Remove subsource purification logic from `purify_create_source`, subsource statement parsing,
and related planning code.

### Core Implementation Details

Expand All @@ -182,38 +200,29 @@ CREATE TABLE <name> (<cols>) WITH (<options>)
```

We would update the `CREATE TABLE` statement to be able to optionally reference an upstream
source and reference using `FROM SOURCE <source> (REFERENCE <upstream reference>)`:
source and reference using `FROM SOURCE <source> (REFERENCE <upstream reference>)` and
any additional options necessary for ingesting the upstrema data:

```sql
CREATE TABLE <name> (<cols>) FROM SOURCE <source_name> (REFERENCE = <upstream name>)
CREATE TABLE <name> FROM SOURCE <source_name> (REFERENCE = <upstream name>) WITH (TEXT COLUMNS = (..), ..)
```

`<cols>` can be optionally specified by the user to request a subset of the upstream table's
columns, but will not be permitted to include user-specified column types, since these will
be determined by the upstream source details.

A new `CreateTableFromSource` statement will be introduced that includes the source reference
`T::ItemName` and the external reference `UnresolvedItemName`.

We would then introduce a new `TableDataSource` enum and add a field to the `Table` objects
used for the in-memory catalog and SQL planning, that optionally includes details
on a per-source basis in an `ExportDetails` field:
of an upstream data source in the `DataSource` variant, which will contain the existing
`DataSourceDesc` structs used in sources:

```rust
pub enum TableDataSource {
/// The table owns data created via INSERT/UPDATE/DELETE statements.
TableWrites,

/// The table receives its data from the identified ingestion, specifically
/// the upstream object identified by `external_reference`. The `details`
/// field contains any options necessary to map the upstream relation to this
/// relation.
/// The table receives its data from the identified data source.
/// This table type does not support INSERT/UPDATE/DELETE statements.
IngestionExport {
ingestion_id: GlobalId,
external_reference: UnresolvedItemName,
details: ExportDetails,
}
DataSource(DataSourceDesc)
}

pub struct Table {
Expand All @@ -222,9 +231,31 @@ pub struct Table {
...
pub data_source: TableDataSource
}
```

The `DataSourceDesc` struct's `IngestionExport` variant will be extended to
include a `details: SourceExportDetails` field on each export, such that all
the individual details necessary to render that export are stored here and
not on the top-level `Ingestion`:

```rust
pub enum DataSourceDesc {
/// Receives data from an external system.
Ingestion(Ingestion),
/// This source receives its data from the identified ingestion,
/// specifically the output identified by `external_reference`.
IngestionExport {
ingestion_id: GlobalId,
external_reference: UnresolvedItemName,
details: SourceExportDetails,
},

...
}


/// this is an example and these enum structs would likely be their own types
pub enum ExportDetails {
pub enum SourceExportDetails {
Kafka,
Postgres {
column_casts: ...,
Expand All @@ -242,11 +273,12 @@ pub enum ExportDetails {
```

The planning for `CREATE TABLE` will be adjusted to include a purification step
akin to the purification in `purify_alter_source`. This will also map any
specified columns into a projection of the upstream table's column order.
This will also verify the upstream permissions, schema, etc of the upstream table,
and generate a new `details` for the top-level source, that is merged with the
top-level source during sequencing (how alter source works now).
akin to the purification in `purify_alter_source`.
This will verify the upstream permissions, schema, etc of the upstream table, and
create the `details` necessary for this new export to be rendered as part of the
source. It will not need to be merged with the top-level source during sequencing
since its own details are entirely self-contained, unlike existing `CREATE SUBSOURCE`
statements.

#### Storage Collection Coordination

Expand Down Expand Up @@ -306,13 +338,11 @@ Then inside each source implementation, we currently assume that each
corresponds to the same index of the upstream table in the source's upstream `details`.

Since the top-level source `details` will no longer contain the `tables` field, the
output index will be determined by the ordering of the `IngestionDescription::source_exports` `BTreeMap`. Each `SourceExport` will output its own stream from the ingestion using
output index will be determined by the ordering of the `IngestionDescription::source_exports` `BTreeMap`.
Each `SourceExport` will output its own stream from the ingestion using
the `details` it contains. It will be up to each source implementation to map the
relevant upstream table to the correct `SourceExport`s using their `external_reference`.

The `build_ingestion_dataflow` method then demuxes the output collection of
each source by each `output_index` and pushes each data stream to the appropriate
`persist_sink`.
relevant upstream table to the correct `SourceExport`s using the `SourceExportDetails`
and output to the correct `output_index`.

#### Migration of source statements and collections

Expand Down

0 comments on commit 66a76ac

Please sign in to comment.