
Lookup joins #821

Merged: 14 commits into master on Jan 13, 2025
Conversation

@mwylde (Member) commented Jan 13, 2025

This PR introduces a new type of table, connector, and join to support use cases where we wish to enrich a stream by performing ad-hoc queries against another system (typically a DB or cache). We call these lookup joins, since we join a stream to another table via a query mechanism. The PR also includes an initial Redis lookup connector.

It looks like this:

CREATE TABLE events (
    event_id TEXT,
    timestamp TIMESTAMP,
    customer_id TEXT,
    event_type TEXT
) WITH (
    connector = 'kafka',
    topic = 'events',
    type = 'source',
    format = 'json',
    bootstrap_servers = 'broker:9092'
);

CREATE TEMPORARY TABLE customers (
    customer_id TEXT GENERATED ALWAYS AS (metadata('key')) STORED,
    name TEXT,
    plan TEXT
) WITH (
    connector = 'redis',
    address = 'redis://localhost:6379',
    format = 'json',
    'lookup.cache.max_bytes' = '1000000',
    'lookup.cache.ttl' = '5 second'
);

SELECT e.event_id, e.timestamp, e.customer_id, e.event_type, c.name, c.plan
FROM events e
LEFT JOIN customers c
ON concat('customer.', e.customer_id) = c.customer_id
WHERE c.plan = 'Premium';

Here, we create a "temporary table"—implying that it's not fully materialized—backed by a Redis cache, with the Redis key specified via the metadata syntax introduced in 0.13. We then perform a join (in this case a left join, although inner is also supported) against it, causing a lookup into Redis for the specified key.

Lookup joins also optionally include a cache, which can be configured by the user with a TTL and/or a max size in bytes.
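To make the cache semantics concrete, here is a minimal Python sketch of how a lookup join with a TTL- and byte-bounded cache might behave. This is a hypothetical illustration, not Arroyo's actual implementation; `fetch`, `LookupCache`, and `left_lookup_join` are invented names, and `fetch` stands in for the ad-hoc query against the external system (e.g. Redis).

```python
import time

class LookupCache:
    """Hypothetical TTL + max-bytes cache in front of an external lookup."""

    def __init__(self, max_bytes, ttl_seconds, fetch):
        self.max_bytes = max_bytes
        self.ttl = ttl_seconds
        self.fetch = fetch          # stands in for the query to Redis, a DB, etc.
        self.entries = {}           # key -> (value, inserted_at)
        self.size = 0

    def get(self, key):
        hit = self.entries.get(key)
        if hit is not None:
            value, inserted_at = hit
            if time.monotonic() - inserted_at < self.ttl:
                return value        # fresh cache hit: no external query
            self._evict(key)        # expired entry: fall through to a lookup
        value = self.fetch(key)     # ad-hoc query to the external system
        cost = len(repr(key)) + len(repr(value))
        # Stay under the byte budget by evicting oldest insertions first.
        while self.size + cost > self.max_bytes and self.entries:
            self._evict(next(iter(self.entries)))
        self.entries[key] = (value, time.monotonic())
        self.size += cost
        return value

    def _evict(self, key):
        value, _ = self.entries.pop(key)
        self.size -= len(repr(key)) + len(repr(value))

def left_lookup_join(events, cache, key_fn):
    # LEFT join semantics: every stream event is emitted even when the
    # lookup finds nothing (the joined columns become None).
    for event in events:
        joined = cache.get(key_fn(event)) or {"name": None, "plan": None}
        yield {**event, **joined}
```

With a fresh TTL, repeated lookups for the same key are served from the cache without touching the external system, which is the point of the `lookup.cache.*` options.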

This PR also includes a refactor of the deserialization system that reduces the number of independent codepaths handling metadata field deserialization and combines logic between the JSON and non-JSON paths. This fixed several corner cases where additional fields were not being injected properly in certain formats.
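The idea behind that unification can be sketched roughly as follows. This is an invented illustration (the function and field names are hypothetical, not Arroyo's API): format-specific decoding produces a record, and a single shared step injects metadata-derived columns, so no format can miss the injection.

```python
import json

def deserialize(payload: bytes, fmt: str, metadata: dict) -> dict:
    """Decode a payload per-format, then inject metadata fields on one shared path."""
    if fmt == "json":
        record = json.loads(payload)
    elif fmt == "raw_string":
        record = {"value": payload.decode("utf-8")}
    else:
        raise ValueError(f"unsupported format: {fmt}")
    # Shared injection path: every format gets metadata-derived columns the
    # same way, instead of each format re-implementing (and missing) this step.
    generated_columns = {"customer_id": "key"}  # column -> metadata source
    for column, source_key in generated_columns.items():
        record[column] = metadata.get(source_key)
    return record
```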

This addresses #820

@mwylde enabled auto-merge (squash) January 13, 2025 22:24
@mwylde merged commit 81bc429 into master Jan 13, 2025
6 checks passed