Skip to content

Commit

Permalink
Update implementation details based on recent work and learnings
Browse files Browse the repository at this point in the history
  • Loading branch information
rjobanp committed Jul 22, 2024
1 parent 6095f67 commit 46fd1e1
Showing 1 changed file with 56 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -132,33 +132,35 @@ that returns all known upstream tables and columns for a source.
1. Separate the planning of _sources_ and _subsources_ such that sources are fully
planned before any linked subsources. Part of this work has been prototyped in:
https://github.com/MaterializeInc/materialize/pull/27320
Update: completed in https://github.com/MaterializeInc/materialize/pull/28310

2. Introduce the concept of _read-only tables_ that link to a source ingestion
and an upstream 'reference' specifying which upstream table to ingest into this table.

3. Update the CREATE TABLE statement to allow creation of a read-only table
2. Update the CREATE TABLE statement to allow creation of a read-only table
with a reference to a source and an upstream-reference:
(`CREATE TABLE <new_table> FROM SOURCE <source> (upstream_reference)`)
Update: statement introduced in https://github.com/MaterializeInc/materialize/pull/28125

4. Update planning for `CREATE TABLE` to include a purification step akin to the
purification for `ALTER SOURCE .. ADD SUBSOURCE`. This will verify the upstream
permissions, schema, etc for the newly added source-fed table.
3. Update the underlying `SourceDesc`/`IngestionDesc` and
`SourceExport`/`IngestionExport` structs to include each export's specific
details and options on its own struct, rather than using an implicit mapping into
the top-level source's options.
This may involve moving options such as `TEXT COLUMNS` to be stored on subsource
statements rather than top-level source statements instead.

5. Update the underlying _source_ (ingestion) and source*export/ingestion_export
structures to allow more than one \_source_export* to refer to an upstream
reference, and each _source_export_ to have its own projection of the upstream data
4. Implement planning for `CREATE TABLE .. FROM SOURCE` to include a purification
step akin to the purification for `ALTER SOURCE .. ADD SUBSOURCE`. This will verify
the upstream permissions, schema, etc for the newly added source-fed table.

6. Update the storage controller to use both _subsources_ and _read-only tables_
5. Update the storage controller to use both _subsources_ and _read-only tables_
as _source_exports_ for existing multi-output sources (postgres & mysql).

7. Update the source rendering operators to handle the new structures and allow
6. Update the source rendering operators to handle the new structures and allow
outputting the same upstream table to more than one souce export.

8. Migrate existing sources to the new source model (make all sources 'multi-output' sources)
7. Migrate existing sources to the new source model (make all sources 'multi-output' sources)
and preserve the original names tied to each collection such that downstream object
references don't need to change.

9. Remove subsource purification logic from `purify_create_source`, subsource statement parsing,
8. Remove subsource purification logic from `purify_create_source`, subsource statement parsing,
and related planning code.

### Core Implementation Details
Expand Down Expand Up @@ -194,21 +196,23 @@ A new `CreateTableFromSource` statement will be introduced that includes the sou
`T::ItemName` and the external reference `UnresolvedItemName`.

We would then introduce a new `TableDataSource` enum and add a field to the `Table` objects
used for the in-memory catalog and SQL planning:
used for the in-memory catalog and SQL planning, that optionally includes details
on a per-source basis in an `ExportDetails` field:

```rust
pub enum TableDataSource {
/// The table owns data created via INSERT/UPDATE/DELETE statements.
TableWrites,

/// The table receives its data from the identified ingestion, specifically
/// the output identified by `external_reference`, using the projection
/// `external_projection` to map the upstream relation to this relation.
/// the upstream object identified by `external_reference`. The `details`
/// field contains any options necessary to map the upstream relation to this
/// relation.
/// This table type does not support INSERT/UPDATE/DELETE statements.
IngestionExport {
ingestion_id: GlobalId,
external_reference: UnresolvedItemName,
external_projection: Vec<u8>,
details: ExportDetails,
}
}

Expand All @@ -218,6 +222,23 @@ pub struct Table {
...
pub data_source: TableDataSource
}

/// this is an example and these enum structs would likely be their own types
pub enum ExportDetails {
Kafka,
Postgres {
column_casts: ...,
table: PostgresTableDesc,
},
MySql {
table: MySqlTableDesc,
text_columns: Vec<String>,
ignore_columns: Vec<String>,
},
LoadGenerator {
...
}
}
```

The planning for `CREATE TABLE` will be adjusted to include a purification step
Expand All @@ -235,17 +256,13 @@ a storage controller `CollectionDescription` with a `DataSource::IngestionExport
rather than the existing `DataSourceOther::TableWrites` value used for all tables.

The storage controller's `DataSource::IngestionExport` struct will be updated to
include an optional projection, populated based on the table projection mentioned above:
include the export details:

```rust
IngestionExport {
ingestion_id: GlobalId,
// This is an `UnresolvedItemName` because the structure works for PG,
// MySQL, and load generator sources. However, in the future, it should
// be sufficiently genericized to support all multi-output sources we
// support.
external_reference: UnresolvedItemName,
external_projection: Option<Vec<u8>>,
details: ExportDetails,
},
```

Expand All @@ -255,8 +272,8 @@ inserts this into the appropriate `IngestionDescription` of the top-level
source. This is used in source rendering to figure out what upstream
reference should be mapped to a specific output collection.

The storage `SourceExport` struct will be updated to include an optional
projection field, populated from the `IngestionExport`'s `external_projection`:
The storage `SourceExport` struct will be updated to include the `details`
field, populated from the `IngestionExport`'s `details`:

```rust
pub struct SourceExport<O: proptest::prelude::Arbitrary, S = ()> {
Expand All @@ -265,10 +282,14 @@ pub struct SourceExport<O: proptest::prelude::Arbitrary, S = ()> {
/// The collection metadata needed to write the exported data
pub storage_metadata: S,
/// How to project the ingestion output to this export's collection.
pub output_projection: Option<Vec<u8>>,
pub details: ExportDetails,
}
```

Then the subsource-specific values currently stored in `GenericSourceConnection`
such as the Postgres and MySQL `tables` vecs will be removed, since each
'source export' would contain its own information.

#### Source Rendering

Currently, when an `IngestionDescription` is used inside `build_ingestion_dataflow`
Expand All @@ -277,24 +298,21 @@ it determines the `output_index` of each collection to be output by the ingestio
by calling the `IngestionDescription::source_exports_with_output_indices` method.

This method maps the `external_reference` (the upstream table name) of each `SourceExport`
to the index of that upstream table in the source's `details` struct (which is created
during purification -- its a vector of upstream table descriptions).
to the index of that upstream table in the source's `details` struct (which is currently
created during purification -- a vector of upstream table descriptions).

Then inside each source implementation, we currently assume that each
`SourceExport` uniqely corresponds to a single `output_index`, and that each `output_index`
corresponds to the same index of the upstream table in the source's upstream `details`.

This assumption is part of what prevents an upstream table from being ingested into
more than one collection.

We would update each source implementation to assume that more than one `SourceExport`
can share an `output_index` and we would update the `source_exports` map created
by `source_exports_with_output_indices` to include the `details` index explicitly.
Since the top-level source `details` will no longer contain the `tables` field, the
output index will be determined by the ordering of the `IngestionDescription::source_exports` `BTreeMap`. Each `SourceExport` will output its own stream from the ingestion using
the `details` it contains. It will be up to each source implementation to map the
relevant upstream table to the correct `SourceExport`s using their `external_reference`.

The `build_ingestion_dataflow` method currently demuxes the output collection of
The `build_ingestion_dataflow` method then demuxes the output collection of
each source by each `output_index` and pushes each data stream to the appropriate
`persist_sink`. We would update this method so that it applies the `output_projection`
for each `SourceExport` on each data stream before being pushed to its `persist_sink`.
`persist_sink`.

#### Migration of source statements and collections

Expand Down

0 comments on commit 46fd1e1

Please sign in to comment.