Skip to content

Commit

Permalink
Enforce primary keys for lookup joins
Browse files Browse the repository at this point in the history
  • Loading branch information
mwylde committed Jan 13, 2025
1 parent 0aaf1a7 commit 8e23923
Show file tree
Hide file tree
Showing 9 changed files with 103 additions and 44 deletions.
1 change: 1 addition & 0 deletions crates/arroyo-connectors/src/impulse/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ pub fn impulse_schema() -> ConnectionSchema {
],
definition: None,
inferred: None,
primary_keys: Default::default(),
}
}

Expand Down
1 change: 1 addition & 0 deletions crates/arroyo-connectors/src/nexmark/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ pub fn nexmark_schema() -> ConnectionSchema {
.collect(),
definition: None,
inferred: None,
primary_keys: Default::default(),
}
}

Expand Down
20 changes: 17 additions & 3 deletions crates/arroyo-connectors/src/redis/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -300,9 +300,23 @@ impl Connector for RedisConnector {
}

let sink = match typ.as_str() {
"lookup" => TableType::Lookup {
lookup: Default::default(),
},
"lookup" => {
// for look-up tables, we require that there's a primary key metadata field
for f in &schema.fields {
if schema.primary_keys.contains(&f.field_name) {
if f.metadata_key.as_ref().map(|k| k != "key").unwrap_or(true) {
bail!(
"Redis lookup tables must have a PRIMARY KEY field defined as \
`field_name TEXT GENERATED ALWAYS AS (metadata('key')) STORED`"
);
}
}
}

TableType::Lookup {
lookup: Default::default(),
}
}
"sink" => {
let target = match pull_opt("target", options)?.as_str() {
"string" => Target::StringTable {
Expand Down
22 changes: 14 additions & 8 deletions crates/arroyo-planner/src/plan/join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -262,23 +262,29 @@ fn maybe_plan_lookup_join(join: &Join) -> Result<Option<LogicalPlan>> {
return plan_err!("filter join conditions are not supported for lookup joins; must have an equality condition");
}

let mut lookup = FindLookupExtension::default();
join.right.visit(&mut lookup)?;

let connector = lookup
.table
.expect("right side of join does not have lookup");

let on = join.on.iter().map(|(l, r)| {
match r {
Expr::Column(c) => Ok((l.clone(), c.clone())),
Expr::Column(c) => {
if !connector.primary_keys.contains(&c.name) {
plan_err!("the right-side of a look-up join condition must be a PRIMARY KEY column, but '{}' is not", c.name)
} else {
Ok((l.clone(), c.clone()))
}
},
e => {
plan_err!("invalid right-side condition for lookup join: `{}`; only column references are supported",
expr_to_sql(e).map(|e| e.to_string()).unwrap_or_else(|_| e.to_string()))
}
}
}).collect::<Result<_>>()?;

let mut lookup = FindLookupExtension::default();
join.right.visit(&mut lookup)?;

let connector = lookup
.table
.expect("right side of join does not have lookup");

let left_input = JoinRewriter::create_join_key_plan(
join.left.clone(),
join.on.iter().map(|(l, _)| l.clone()).collect(),
Expand Down
1 change: 1 addition & 0 deletions crates/arroyo-planner/src/tables.rs
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,7 @@ impl ConnectorTable {
schema_fields,
None,
Some(fields.is_empty()),
primary_keys.iter().cloned().collect(),
)
.map_err(|e| DataFusionError::Plan(format!("could not create connection schema: {}", e)))?;

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
--fail=the right-side of a look-up join condition must be a PRIMARY KEY column, but 'value' is not
-- Negative planner test: the join at the bottom matches on B.value, which is
-- not the PRIMARY KEY column of the right-side table, so the error named in
-- the --fail header above is the expected outcome.
-- Left side of the join: an impulse source; the query reads its `counter` column.
create table impulse with (
connector = 'impulse',
event_rate = '2'
);

-- Right side of the join: a temporary Redis-connector table. `key` is declared
-- PRIMARY KEY and is generated from metadata('key'); `value` and `len` are
-- ordinary (non-key) columns.
-- NOTE(review): `format` is given twice in the WITH options ('raw_string',
-- then 'json') -- confirm which one is intended.
create temporary table lookup (
key TEXT PRIMARY KEY GENERATED ALWAYS AS (metadata('key')) STORED,
value TEXT,
len INT
) with (
connector = 'redis',
format = 'raw_string',
address = 'redis://localhost:6379',
format = 'json',
'lookup.cache.max_bytes' = '100000'
);

-- The equality condition targets B.value rather than B.key; this is the
-- condition the expected failure message refers to.
select A.counter, B.key, B.value, len
from impulse A inner join lookup B
on cast((A.counter % 10) as TEXT) = B.value;
19 changes: 19 additions & 0 deletions crates/arroyo-planner/src/test/queries/error_missing_redis_key.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
--fail=Redis lookup tables must have a PRIMARY KEY field defined as `field_name TEXT GENERATED ALWAYS AS (metadata('key')) STORED`
-- Negative planner test: the Redis lookup table below declares a PRIMARY KEY
-- column that is NOT generated from metadata('key'), so the error named in the
-- --fail header above is the expected outcome.
-- Left side of the join: an impulse source; the query reads its `counter` column.
create table impulse with (
connector = 'impulse',
event_rate = '2'
);

-- Redis table with type = 'lookup'. `key` is a plain TEXT PRIMARY KEY with no
-- GENERATED ALWAYS AS (metadata('key')) clause -- the shape the expected
-- failure message says is required.
create table lookup (
key TEXT PRIMARY KEY,
value TEXT
) with (
connector = 'redis',
format = 'json',
address = 'redis://localhost:6379',
type = 'lookup'
);

-- The join condition itself is on the primary-key column (B.key); the expected
-- failure concerns how that column is defined, not the join condition.
select A.counter, B.key, B.value
from impulse A left join lookup B
on cast((A.counter % 10) as TEXT) = B.key;
52 changes: 22 additions & 30 deletions crates/arroyo-planner/src/test/queries/lookup_join.sql
Original file line number Diff line number Diff line change
@@ -1,39 +1,31 @@
CREATE TABLE orders (
order_id INT,
user_id INT,
product_id INT,
quantity INT,
order_timestamp TIMESTAMP
) with (
CREATE TABLE events (
event_id TEXT,
timestamp TIMESTAMP,
customer_id TEXT,
event_type TEXT
) WITH (
connector = 'kafka',
bootstrap_servers = 'localhost:9092',
topic = 'events',
type = 'source',
topic = 'orders',
format = 'json'
format = 'json',
bootstrap_servers = 'broker:9092'
);

CREATE TEMPORARY TABLE products (
key TEXT PRIMARY KEY,
product_name TEXT,
unit_price FLOAT,
category TEXT,
last_updated TIMESTAMP
create temporary table customers (
customer_id TEXT PRIMARY KEY GENERATED ALWAYS AS (metadata('key')) STORED,
customer_name TEXT,
plan TEXT
) with (
connector = 'redis',
format = 'raw_string',
address = 'redis://localhost:6379',
format = 'json',
type = 'lookup',
address = 'redis://localhost:6379'
'lookup.cache.max_bytes' = '1000000',
'lookup.cache.ttl' = '5 second'
);

SELECT
o.order_id,
o.user_id,
o.quantity,
o.order_timestamp,
p.product_name,
p.unit_price,
p.category,
(o.quantity * p.unit_price) as total_amount
FROM orders o
JOIN products p
ON concat('blah', o.product_id) = p.key;
SELECT e.event_id, e.timestamp, e.customer_id, e.event_type, c.customer_name, c.plan
FROM events e
LEFT JOIN customers c
ON e.customer_id = c.customer_id
WHERE c.plan = 'Premium';
10 changes: 7 additions & 3 deletions crates/arroyo-rpc/src/api_types/connections.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
use crate::df::{ArroyoSchema, ArroyoSchemaRef};
use crate::formats::{BadData, Format, Framing};
use crate::{primitive_to_sql, MetadataField};
use ahash::HashSet;
use anyhow::bail;
use arrow_schema::{DataType, Field, Fields, TimeUnit};
use arroyo_types::ArroyoExtensionType;
use serde::{Deserialize, Serialize};
use std::collections::{BTreeMap, HashMap};
use std::fmt::{Display, Formatter};
use std::sync::Arc;

use crate::df::{ArroyoSchema, ArroyoSchemaRef};
use arroyo_types::ArroyoExtensionType;
use utoipa::{IntoParams, ToSchema};

#[derive(Serialize, Deserialize, Clone, Debug, ToSchema)]
Expand Down Expand Up @@ -252,6 +252,8 @@ pub struct ConnectionSchema {
pub fields: Vec<SourceField>,
pub definition: Option<SchemaDefinition>,
pub inferred: Option<bool>,
#[serde(default)]
pub primary_keys: HashSet<String>,
}

impl ConnectionSchema {
Expand All @@ -263,6 +265,7 @@ impl ConnectionSchema {
fields: Vec<SourceField>,
definition: Option<SchemaDefinition>,
inferred: Option<bool>,
primary_keys: HashSet<String>,
) -> anyhow::Result<Self> {
let s = ConnectionSchema {
format,
Expand All @@ -272,6 +275,7 @@ impl ConnectionSchema {
fields,
definition,
inferred,
primary_keys,
};

s.validate()
Expand Down

0 comments on commit 8e23923

Please sign in to comment.