Pose some potential complications regarding kafka sources
rjobanp committed Aug 21, 2024
1 parent 3362661 commit b268029
Showing 1 changed file with 40 additions and 0 deletions.
@@ -132,6 +132,15 @@
the web console to expose a guided 'create source' workflow for users where
they can select from a list of upstream tables and the console can generate
the appropriate `CREATE TABLE .. FROM SOURCE` statements for those selected.

Each `CREATE TABLE .. FROM SOURCE` statement will refer to a `SourceExport` for
a given source, and each newly-added `SourceExport` for a Source will attempt to ingest
a full snapshot of its relevant upstream data upon creation. This allows the
new `SourceExport` to correctly backfill any newly added columns or defaults set
in the upstream system, rather than returning Null values for any 'old' rows.
The aim is to maintain correctness and consistency with the upstream when a
query selects from new columns on rows that have not been updated since the
new `SourceExport` was added.

### Implementation Plan

1. Separate the planning of _sources_ and _subsources_ such that sources are fully

@@ -344,6 +353,37 @@
the `details` it contains. It will be up to each source implementation to map the
relevant upstream table to the correct `SourceExport`s using the `SourceExportDetails`
and output to the correct `output_index`.
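
A minimal sketch of that mapping, in Python for illustration only. The `upstream_table` field and the `build_routing` helper are assumptions made for this example and are not taken from the actual implementation; only the names `SourceExport`, `SourceExportDetails`, and `output_index` come from the text above.

```python
from collections import defaultdict
from dataclasses import dataclass


@dataclass(frozen=True)
class SourceExportDetails:
    # Hypothetical field: the upstream table this export reads from.
    upstream_table: str


@dataclass(frozen=True)
class SourceExport:
    details: SourceExportDetails
    output_index: int


def build_routing(exports):
    """Group output indexes by upstream table (hypothetical helper).

    One upstream table may feed several exports, so each table maps to
    a list of output indexes rather than a single index.
    """
    routing = defaultdict(list)
    for export in exports:
        routing[export.details.upstream_table].append(export.output_index)
    return dict(routing)


exports = [
    SourceExport(SourceExportDetails("public.orders"), 1),
    SourceExport(SourceExportDetails("public.orders"), 2),
    SourceExport(SourceExportDetails("public.users"), 3),
]
routing = build_routing(exports)
```

Under these assumptions, a row read from `public.orders` would be emitted to both output indexes 1 and 2.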

#### Kafka Sources

Kafka Sources will require more refactoring to move to a model where a single Kafka Source
can output to more than one `SourceExport`.

Since Kafka Sources only ever refer to a single Kafka Topic, a user will likely only ever
add a new 'Table' to a Kafka Source to handle a schema change in the messages published
to that topic.

When a new `SourceExport` is added to a Kafka Source, the new export needs to be hydrated
from the upstream topic to ensure it has a complete view of the topic contents.

This creates several potential issues regarding the semantics of Kafka Sources:

- If a new `SourceExport` is added, its resume upper will be the minimum value. If this
resume upper is merged with the existing resume upper for the source (which is what
happens in Postgres and MySQL sources), the consumer will begin reading
from each partition at the lowest available offset (or the 'start offset' defined
on the Kafka Source). We may then face a long rehydration to catch up to the
previous resume upper, during which existing `SourceExport`s will not
see any new data.

@benesch

benesch Aug 22, 2024

Member

I don't quite follow the hazard here. Sorry—it's my unfamiliarity with the current form of the source ingestion pipeline! Exactly which part of the pipeline stalls while waiting for the new subsource to rehydrate?

If I'm not mistaken, I think I've noticed this is a problem with ALTER SOURCE ... ADD TABLE on PostgreSQL/MySQL sources too. Adding a new table means that replication on all existing tables stalls until the new table is snapshotted, which is a pretty bad hit to availability.

So I guess it depends on whether the problem is worse with Kafka than it is for PostgreSQL/MySQL, and I don't have quite enough context at the moment to assess that. But assuming Kafka is no worse than PostgreSQL/MySQL here, I think there's no need to do anything special. We can bundle that problem up as something to solve for all sources later.

- There is one existing Consumer Group ID used by the Kafka Source, and the source
commits the consumer offset to the Kafka broker each time the progress collection
advances through reclocking. Since the Consumer may need to read from earlier
offsets to complete the snapshot of the new `SourceExport`, it's unclear how this
may affect external progress tracking and broker behavior.

@benesch

benesch Aug 22, 2024

Member

I think this one's easy—let's just somehow include the subsource's table's ID in the consumer group ID? Then each subsource will have its progress tracked separately, which seems exactly right.
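
A minimal sketch of the suggestion above, in Python for illustration only. The ID format, the `consumer_group_id` and `commit` helpers, and the example IDs are all hypothetical; the source does not specify how the group ID would actually be constructed.

```python
def consumer_group_id(base_group_id: str, export_id: str) -> str:
    """Derive a per-export consumer group ID (hypothetical naming scheme)."""
    return f"{base_group_id}-{export_id}"


# Stand-in for the broker's committed offsets: group ID -> {partition: offset}.
committed = {}


def commit(group_id: str, partition: int, offset: int) -> None:
    """Record a committed offset for one group and partition."""
    committed.setdefault(group_id, {})[partition] = offset


base = "materialize-source-u1"  # hypothetical base group ID
existing_group = consumer_group_id(base, "u10")
new_group = consumer_group_id(base, "u42")

# The existing export is far along; the new export starts from the beginning.
commit(existing_group, 0, 5000)
commit(new_group, 0, 0)
```

Because each export commits under its own group ID in this sketch, the new export's snapshot from offset 0 does not overwrite the offset committed for the existing export.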


Open Question:

- Do we need to do anything special to handle any of the above?
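
To make the first hazard above concrete, here is a minimal sketch in Python. It models a resume upper as a simple map from partition to offset and assumes the "minimum value" is offset 0; both are simplifications for illustration, and `merge_resume_uppers` is a hypothetical helper, not the actual implementation.

```python
def merge_resume_uppers(uppers):
    """Merge per-export resume uppers by taking the minimum offset per partition."""
    merged = {}
    for upper in uppers:
        for partition, offset in upper.items():
            merged[partition] = min(merged.get(partition, offset), offset)
    return merged


# Existing export has already progressed on both partitions.
existing_upper = {0: 5000, 1: 7200}
# Newly added export starts at the minimum (assumed here to be offset 0).
new_upper = {0: 0, 1: 0}

merged = merge_resume_uppers([existing_upper, new_upper])

# Messages that must be re-read before the existing export sees new data.
backlog = sum(existing_upper[p] - merged[p] for p in existing_upper)
```

In this example the merged resume upper falls back to offset 0 on every partition, so the consumer would re-read 12200 messages before reaching the point where the existing export left off.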

#### Migration of source statements and collections

We would generate a catalog migration to generate new `CREATE TABLE` statements where
