Skip to content

Commit

Permalink
Address some design comments
Browse files Browse the repository at this point in the history
  • Loading branch information
rjobanp committed Jul 16, 2024
1 parent b4dba2d commit a820cec
Showing 1 changed file with 138 additions and 91 deletions.
Original file line number Diff line number Diff line change
@@ -1,58 +1,71 @@
# Subsource deprecation & source blue-green schema changes
# Source Versioning / Tables from Sources

This design encompases 2 existing proposals also known as
'Subsource Demuxing' and 'Subsources as Tables'.

Associated:
- https://github.com/MaterializeInc/materialize/issues/20208
- https://github.com/MaterializeInc/materialize/issues/24843
- https://github.com/MaterializeInc/materialize/issues/15897
- https://github.com/MaterializeInc/materialize/issues/17021

- https://github.com/MaterializeInc/materialize/issues/20208
- https://github.com/MaterializeInc/materialize/issues/24843
- https://github.com/MaterializeInc/materialize/issues/15897
- https://github.com/MaterializeInc/materialize/issues/17021

## The Problems

TLDR:
- Subsources are confusing and not implemented uniformly across source types
- It's impossible to do a blue-green workflow in sources for handling
schema-changes in upstream systems

Users currently have to learn about *sources*, *subsources*, *tables*,
*materialized views*, *indexes*, and *sinks* when onboarding to Materialize.
- Subsources are confusing and not implemented uniformly across source types
- It's impossible to do a blue-green workflow in sources for handling
schema-changes in upstream systems, without creating another top-level source
and reingesting all your data / losing your reclocking information
- Since subsources aren't created using explicit statements its difficult to
manage them using dbt

Users currently have to learn about _sources_, _subsources_, _tables_,
_materialized views_, _indexes_, and _sinks_ when onboarding to Materialize.

If they'd like to ingest data from both an upstream Postgres/MySQL database
and a Kafka topic, they also need to learn the fragmented user interface
for dealing with data ingested from upstream data sources. Our existing
model of a *source* does not provide a consistent interface among source types.
model of a _source_ does not provide a consistent interface among source types.

Single-output Kafka sources put their data under the source's primary relation,
whereas multi-output Postgres/MySQL sources do not allow querying the source's primary
relation and instead contain *subsources*, an entirely separate concept.
Single-output sources like Kafka and webhook sources put their data under the
source's primary relation, whereas multi-output sources like PostgreSQL and
MySQL sources do not allow querying the source's primary relation and instead
multiplex into _subsources_, an entirely separate concept. In addition to these
core source types, we also support the concept of _push sources_ (e.g. the
Materialize Fivetran connector), which write data to tables.

Our existing *source* models also make it difficult to deal with upstream schema changes,
such that users need to drop and recreate a *source* or *subsource* and any dependent objects
Our existing _source_ models also make it difficult to deal with upstream schema changes,
such that users need to drop and recreate a _source_ or _subsource_ and any dependent objects
to begin ingesting data with a new schema. This can cause long-periods of downtime if the
new source needs to reingest the entire upstream history. This can also cause them to
lose their _reclocking_ information which makes it impossible to implement a correct
end-to-end *source* -> *sink* pipeline.
end-to-end _source_ -> _sink_ pipeline.

This change aims to simplify the user experience and create a more unified
model for managing data ingested from an upstream source, while providing
more flexibility in handling upstream schema changes.
Since subsources are created automagically as the result of a `CREATE SOURCE ..` statement
it is difficult to manage them with tools like dbt, which require all objects to be
created and dropped using explicit statements per object.

This change aims to simplify the user experience and create a more unified
model for managing data ingested from an upstream external system, while providing
more flexibility in handling upstream schema changes and managing sources in tools like dbt.

## Success Criteria

- The mental overhead & learning time for users to ingest data from an upstream system
is decreased.
- All source types use a consistent interface to retrieve progress information and
ingested data. The primary *source* object represents the same concept across
all source types.
- The same upstream table/topic can be ingested into multiple collections in Materialize,
each with a potentially different projection of the upstream schema. This enables
a user to do blue-green schema-change workflows without unavailability.
- *sources* can maintain their progress collection while the set of collections being
ingested are modified, allowing an end-to-end *source* -> *sink* schema-change workflow
that does not emit redundant data (reusable reclocking).
- The mental overhead & learning time for users to ingest data from an upstream system
is decreased.
- All source types use a consistent interface to retrieve progress information and
ingested data. The primary _source_ object represents the same concept across
all source types.
- The same upstream table/topic can be ingested into multiple collections in Materialize,
each with a potentially different projection of the upstream schema. This enables
a user to do blue-green schema-change workflows without unavailability.
- _sources_ can maintain their progress collection while the set of collections being
ingested are modified, allowing an end-to-end _source_ -> _sink_ schema-change workflow
that does not emit redundant data (reusable reclocking).
- The object representing each upstream table/topic in materialize is created and dropped
explicitly, such that they can be managed as models directly in dbt.

## Out of Scope

Expand All @@ -64,40 +77,39 @@ It's important to be clear about what parts of a problem we won't be solving
and why. This leads to crisper designs, and it aids in focusing the reviewer.
-->

- Accomplishing 'subsource demuxing' using the concept of *subsources* in-use today.
The goals of https://github.com/MaterializeInc/materialize/issues/24843 will be
accomplished during the process of replacing *subsources* with *tables* (see
solution proposal below).

- Dealing with Webhook sources. These are fundamentally different than the other
storage-owned source types whose operators implement the `SourceRender` trait,
and they do not contain a 'progress' collection. While they may also be
converted to use 'tables' in the future, that work is not in-scope for this proposal.
- Accomplishing 'subsource demuxing' using the concept of _subsources_ in-use today.
The goals of https://github.com/MaterializeInc/materialize/issues/24843 will be
accomplished during the process of replacing _subsources_ with _tables_ (see
solution proposal below).

- Dealing with Webhook sources. These are fundamentally different than the other
storage-owned source types whose operators implement the `SourceRender` trait,
and they do not contain a 'progress' collection. While they may also be
converted to use 'tables' in the future, that work is not in-scope for this proposal.

## Solution Proposal

The concept of a *source* will be unified around a single relation representing
the *progress* of that source's ingestion pipeline. A query of any *source*
The concept of a _source_ will be unified around a single relation representing
the _progress_ of that source's ingestion pipeline. A query of any _source_
object (of any type) will return the data that is currently in the
`<source>_progress` collection.

We have been investing in the user experience around *tables*, such that
replacing *subsources* with *tables* would reduce the overall
We have been investing in the user experience around _tables_, such that
replacing _subsources_ with _tables_ would reduce the overall
mental overhead for new users.

By allowing tables to be marked as *read-only*, we can easily migrate
By allowing tables to be marked as _read-only_, we can easily migrate
existing subsources to be tables without needing to figure out how to merge
`INSERT/UPDATE/DELETE` statements with the data written by their corresponding source.

The default `CREATE SOURCE ..` statement will just create a top-level source object
that represents a single ingestion of data from an upstream system. To actually
ingest data from that upstream system into a persist collection, the user will
use something akin to
`CREATE TABLE <table> (<cols>) OF SOURCE <source> WITH (<upstream_ref>)`.
use a `CREATE TABLE .. ` statement with an option that references the source
and the external reference.

We will allow more than one table to reference the same `<upstream_ref>`, using a
potentially different set of columns. This allows a user to handle an upstream
We will allow more than one table to reference the same external upstream reference,
using a potentially different set of columns. This allows a user to handle an upstream
schema change by creating a new table for the same upstream table and then performing
a blue-green swap operation to switch their downstream dependencies to the new table,
and then drop the old one.
Expand All @@ -109,19 +121,19 @@ explicitly referenced in a `CREATE TABLE` statement. While this may seem controv
more often than not these options cause upstream data that does not need to be
brought into Materialize to be ingested, and by using explicit statements for each
table to be ingested it makes Materialize configuration much more amenable to
object<->state mappings in tools like DBT and Terraform.
object<->state mappings in tools like dbt and Terraform.

We can eventually reintroduce syntactic sugar to perform a similar function to
`FOR ALL TABLES` and `FOR SCHEMAS` if necessary.

`FOR ALL TABLES` and `FOR SCHEMAS` if necessary, or introduce a new SQL command
that returns all known upstream tables and columns for a source.

### Implementation Plan

1. Separate the planning of *sources* and *subsources* such that sources are fully
1. Separate the planning of _sources_ and _subsources_ such that sources are fully
planned before any linked subsources. Part of this work has been prototyped in:
https://github.com/MaterializeInc/materialize/pull/27320

2. Introduce the concept of *read-only tables* that link to a source ingestion
2. Introduce the concept of _read-only tables_ that link to a source ingestion
and an upstream 'reference' specifying which upstream table to ingest into this table.

3. Update the CREATE TABLE statement to allow creation of a read-only table
Expand All @@ -132,59 +144,58 @@ We can eventually reintroduce syntactic sugar to perform a similar function to
purification for `ALTER SOURCE .. ADD SUBSOURCE`. This will verify the upstream
permissions, schema, etc for the newly added source-fed table.

5. Update the underlying *source* (ingestion) and source_export/ingestion_export
structures to allow more than one *source_export* to refer to an upstream
reference, and each *source_export* to have its own projection of the upstream data
5. Update the underlying _source_ (ingestion) and source*export/ingestion_export
structures to allow more than one \_source_export* to refer to an upstream
reference, and each _source_export_ to have its own projection of the upstream data

5. Update the storage controller to use both *subsources* and *read-only tables*
as *source_exports* for existing multi-output sources (postgres & mysql)
6. Update the storage controller to use both _subsources_ and _read-only tables_
as _source_exports_ for existing multi-output sources (postgres & mysql).

6. Update the source rendering operators to handle the new structures and allow
7. Update the source rendering operators to handle the new structures and allow
outputting the same upstream table to more than one souce export.

7. Migrate existing sources to the new source model (make all sources 'multi-output' sources):
- For existing sources where the primary relation has the data (e.g. Kafka sources):
- Create a read-only table for the current primary relation called `<source>`
- Rename the source itself `<source>_progress` and drop the existing
`<source>_progress` object
- For existing multi-output sources (e.g. Postgres & MySQL sources):
- Convert subsources to read-only tables with the same name
- Rename the source itself `<source>_progress` and drop the existing
`<source>_progress` object

8. Remove subsource purification logic from `purify_create_source`, subsource statement parsing,
and related planning code
8. Migrate existing sources to the new source model (make all sources 'multi-output' sources)
and preserve the original names tied to each collection such that downstream object
references don't need to change.

9. Remove subsource purification logic from `purify_create_source`, subsource statement parsing,
and related planning code.

### Core Implementation Details

#### SQL Parsing & Planning

Our existing `CREATE SUBSOURCE` statements look like this:

```sql
CREATE SUBSOURCE <name> (<cols>) OF SOURCE <source_name> WITH (EXTERNAL REFERENCE = <upstream table name>)
```

though these are mostly hidden from users, they are available if a user runs `SHOW CREATE SOURCE <existing_subsource>`

Our existing `CREATE TABLE` statement looks like:

```sql
CREATE TABLE <name> (<cols>) WITH (<options>)
```

We would update the `CREATE TABLE` statement to look similar to `CREATE SUBSOURCE` and
be able to optionally reference an upstream source and reference using `OF SOURCE` and `EXTERNAL REFERENCE`:
We would update the `CREATE TABLE` statement to be able to optionally reference an upstream
source and reference using `FROM SOURCE <source> (REFERENCE <upstream reference>)`:

```sql
CREATE TABLE <name> (<cols>) OF SOURCE <source_name> WITH (EXTERNAL REFERENCE = <upstream table name>)
CREATE TABLE <name> (<cols>) FROM SOURCE <source_name> (REFERENCE = <upstream name>)
```

`<cols>` can be optionally specified by the user to request a subset of the upstream table's
columns, but will not be permitted to include user-specified column types, since these will
be determined by the upstream source details.

`CreateTableStatement` would be updated to include the new option type `CreateTableOptionName::ExternalReference` and a `pub of_source: Option<T::ItemName>`
field to reference the optional upstream source.
A new `CreateTableFromSource` statement will be introduced that includes the source reference
`T::ItemName` and the external reference `UnresolvedItemName`.

We would then introduce a new `TableDataSource` enum and add a field to the `Table` objects
used for the in-memory catalog and SQL planning:

```rust
pub enum TableDataSource {
/// The table owns data created via INSERT/UPDATE/DELETE statements.
Expand Down Expand Up @@ -216,7 +227,6 @@ This will also verify the upstream permissions, schema, etc of the upstream tabl
and generate a new `details` for the top-level source, that is merged with the
top-level source during sequencing (how alter source works now).


#### Storage Collection Coordination

The coordinator's `bootstrap_storage_collections` will be updated to handle
Expand All @@ -226,6 +236,7 @@ rather than the existing `DataSourceOther::TableWrites` value used for all table

The storage controller's `DataSource::IngestionExport` struct will be updated to
include an optional projection, populated based on the table projection mentioned above:

```rust
IngestionExport {
ingestion_id: GlobalId,
Expand All @@ -246,6 +257,7 @@ reference should be mapped to a specific output collection.

The storage `SourceExport` struct will be updated to include an optional
projection field, populated from the `IngestionExport`'s `external_projection`:

```rust
pub struct SourceExport<O: proptest::prelude::Arbitrary, S = ()> {
/// Which output from the ingestion this source refers to.
Expand Down Expand Up @@ -284,24 +296,56 @@ each source by each `output_index` and pushes each data stream to the appropriat
`persist_sink`. We would update this method so that it applies the `output_projection`
for each `SourceExport` on each data stream before being pushed to its `persist_sink`.

#### Migration of source names (progress collections) & kafka source -> table migration

TODO
#### Migration of source statements and collections

We would generate a catalog migration to generate new `CREATE TABLE` statements where
necessary and shuffle the collections being pointed at by each statement. The intent
is to preserve the names associated with each collection even if the statement that
identifies each collection is updated. The migration would do the following:

- For existing sources where the primary relation has the data (e.g. Kafka sources):
- Generate a new `CREATE TABLE` statement that is tied to the current primary collection
and name it `<source>`
- Change the `CREATE SOURCE` statement to point to the current progress collection and
change the source name to `<source>_progress`
- Drop the existing `CREATE SUBSOURCE <source>_progress` statement since this will no
longer have a collection to own
- For existing multi-output sources (e.g. Postgres & MySQL sources):
- Convert subsources to `CREATE TABLE` statements with the same name
- Change the `CREATE SOURCE` statement to point to the current progress collection and
change the source name to `<source>_progress`
- Drop the existing `CREATE SUBSOURCE <source>_progress` statement since this will no
longer have a collection to own
- Drop the existing top-level collection tied to the source, since this is already
unused and will no longer be owned by any statement

## Minimal Viable Prototype


## Alternatives

<!--
What other solutions were considered, and why weren't they chosen?
This is your chance to demonstrate that you've fully discovered the problem.
Alternative solutions can come from many places, like: you or your Materialize
team members, our customers, our prospects, academic research, prior art, or
competitive research. One of our company values is to "do the reading" and
to "write things down." This is your opportunity to demonstrate both!
-->
These are not 'full' alternatives but are each solutions to one or more of the
problems outlined above:

1. Introduce a new `FOR TABLES ()` on existing multi-output source types to enable
workflows with dbt. We could allow creating a postgres or mysql source with
zero tables, and then have dbt use the existing `ALTER SOURCE .. ADD SUBSOURCE`
and `DROP SOURCE` statements to manage creating/removing subsources. While
this would be a simpler implementation, it doesn't work for single-output
sources (e.g. Kafka).

2. Implement in-place schema changes on subsources. Support for in-place schema
changes is predicated on evolving a collection's underlying persist
schema, which isn't currently possible but is being worked on in the effort
to use parquet column-level encoding for persist. Additionally, since subsources
are not used for single-output sources, this would add to the inconsistent
interface among source types (in-place schema modification statements would
have to be implemented for both top-level sources and subsources).

3. Implement demuxing of upstream tables using existing subsource model
(https://github.com/MaterializeInc/materialize/issues/24843). This would be an
easier effort to implement, but rather than doing this work in the short term
for it to be later ripped out, it is preferable to implement the interface
that solves both this need and the other problems outlined above.

## Open questions

Expand All @@ -315,3 +359,6 @@ process, you are responsible for getting answers to these open
questions. All open questions should be answered by the time a design
document is merged.
-->

1. How does this affect the system catalog, and source-specific introspection?
(e.g. `mz_source_statistics`)

0 comments on commit a820cec

Please sign in to comment.