Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merge debezium outputs #762

Merged
merged 13 commits into from
Oct 22, 2024
Merged

Merge debezium outputs #762

merged 13 commits into from
Oct 22, 2024

Conversation

mwylde
Copy link
Member

@mwylde mwylde commented Oct 21, 2024

This PR reworks several details of Arroyo's updating SQL support to improve the UX and set up for supporting more complex updating queries.

Internally, Arroyo represents updates as a flat event with a "retract" field that may be set to true or false. However, Debezium formatted data (whether being consumed or produced) has a nested structure with a "before", "after", and "op" field. When we consume debezium, we need to first "unroll" it into our flattened format, so each update becomes a sequence of deletes and creates.

Similarly, when an updating operator (like an updating aggregate) produce an update for a row, they cannot produce an update (because we don't have a native way to represent that) but only a sequence of retract and create rows.

Currently, a query like

SELECT count(*) from table;

will produce a Debezium result stream like this

image

reflecting the underlying representation.

However, this is undeseriable—we've turned atomic updates into non-atomic delete/create pairs, leading to the potential for data loss in the consuming system if a delete gets consumed but not the corresponding create. It's also double the events and extra cognitive load for what is truly an update.

With this PR, we now merge the deletes and creates into updates, so we get this:

image

There is also a small breaking change; in order to support more efficient updating operations off of Debezium sources, we now require that sources be annotated with at least one PRIMARY KEY field:

CREATE TABLE debezium_source (
    id INT PRIMARY KEY,
    customer_id INT,
    price FLOAT,
    order_date TIMESTAMP,
    status TEXT
) WITH (
    connector = 'kafka',
    format = 'debezium_json',
    type = 'source',
    ...
);

This must reflect the primary keys in the underlying tables.

@mwylde mwylde force-pushed the merge_updates branch 2 times, most recently from f07c33b to 399fbf6 Compare October 22, 2024 01:06
@mwylde mwylde enabled auto-merge (squash) October 22, 2024 01:07
@mwylde mwylde merged commit 843e327 into master Oct 22, 2024
6 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

1 participant