Skip to content

Commit

Permalink
Lookup joins (#821)
Browse files: browse the repository at this point in the history
  • Loading branch information
mwylde authored Jan 13, 2025
1 parent 4014db4 commit 81bc429
Show file tree
Hide file tree
Showing 56 changed files with 1,854 additions and 655 deletions.
96 changes: 89 additions & 7 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 12 additions & 0 deletions crates/arroyo-api/src/connection_tables.rs
Original file line number Diff line number Diff line change
Expand Up @@ -509,6 +509,9 @@ async fn expand_avro_schema(
ConnectionType::Sink => {
// don't fetch schemas for sinks for now
}
ConnectionType::Lookup => {
todo!("lookup tables cannot be created via the UI")
}
}
}

Expand All @@ -521,6 +524,9 @@ async fn expand_avro_schema(
schema.inferred = Some(true);
Ok(schema)
}
ConnectionType::Lookup => {
todo!("lookup tables cannot be created via the UI")
}
};
};

Expand Down Expand Up @@ -596,6 +602,9 @@ async fn expand_proto_schema(
ConnectionType::Sink => {
// don't fetch schemas for sinks for now
}
ConnectionType::Lookup => {
todo!("lookup tables cannot be created via the UI")
}
}
}

Expand Down Expand Up @@ -697,6 +706,9 @@ async fn expand_json_schema(
// don't fetch schemas for sinks for now until we're better able to conform our output to the schema
schema.inferred = Some(true);
}
ConnectionType::Lookup => {
todo!("lookup tables cannot be created via the UI")
}
}
}

Expand Down
2 changes: 1 addition & 1 deletion crates/arroyo-connectors/src/filesystem/sink/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ impl<V: LocalWriter + Send + 'static> TwoPhaseCommitter for LocalFileSystemWrite

let storage_provider = StorageProvider::for_url(&self.final_dir).await?;

let schema = Arc::new(ctx.in_schemas[0].clone());
let schema = ctx.in_schemas[0].clone();

self.commit_state = Some(match self.file_settings.commit_style.unwrap() {
CommitStyle::DeltaLake => CommitState::DeltaLake {
Expand Down
3 changes: 1 addition & 2 deletions crates/arroyo-connectors/src/filesystem/sink/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1564,8 +1564,7 @@ impl<R: MultiPartWriter + Send + 'static> TwoPhaseCommitter for FileSystemSink<R
ctx: &mut OperatorContext,
data_recovery: Vec<Self::DataRecovery>,
) -> Result<()> {
self.start(Arc::new(ctx.in_schemas.first().unwrap().clone()))
.await?;
self.start(ctx.in_schemas.first().unwrap().clone()).await?;

let mut max_file_index = 0;
let mut recovered_files = Vec::new();
Expand Down
1 change: 1 addition & 0 deletions crates/arroyo-connectors/src/filesystem/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ impl FileSystemSourceFunc {
self.format.clone(),
self.framing.clone(),
self.bad_data.clone(),
&[],
);
let parallelism = ctx.task_info.parallelism;
let task_index = ctx.task_info.task_index;
Expand Down
1 change: 1 addition & 0 deletions crates/arroyo-connectors/src/fluvio/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ impl SourceOperator for FluvioSourceFunc {
self.format.clone(),
self.framing.clone(),
self.bad_data.clone(),
&[],
);

match self.run_int(ctx, collector).await {
Expand Down
1 change: 1 addition & 0 deletions crates/arroyo-connectors/src/impulse/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ pub fn impulse_schema() -> ConnectionSchema {
],
definition: None,
inferred: None,
primary_keys: Default::default(),
}
}

Expand Down
24 changes: 14 additions & 10 deletions crates/arroyo-connectors/src/kafka/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -641,14 +641,14 @@ impl KafkaTester {
let mut deserializer = ArrowDeserializer::with_schema_resolver(
format.clone(),
None,
aschema.clone(),
Arc::new(aschema),
&schema.metadata_fields(),
BadData::Fail {},
Arc::new(schema_resolver),
);
let mut builders = aschema.builders();

let mut error = deserializer
.deserialize_slice(&mut builders, &msg, SystemTime::now(), None)
.deserialize_slice(&msg, SystemTime::now(), None)
.await
.into_iter()
.next();
Expand All @@ -663,14 +663,14 @@ impl KafkaTester {
let aschema: ArroyoSchema = schema.clone().into();
let mut deserializer = ArrowDeserializer::new(
format.clone(),
aschema.clone(),
Arc::new(aschema),
&schema.metadata_fields(),
None,
BadData::Fail {},
);
let mut builders = aschema.builders();

let mut error = deserializer
.deserialize_slice(&mut builders, &msg, SystemTime::now(), None)
.deserialize_slice(&msg, SystemTime::now(), None)
.await
.into_iter()
.next();
Expand Down Expand Up @@ -699,12 +699,16 @@ impl KafkaTester {
}
Format::Protobuf(_) => {
let aschema: ArroyoSchema = schema.clone().into();
let mut deserializer =
ArrowDeserializer::new(format.clone(), aschema.clone(), None, BadData::Fail {});
let mut builders = aschema.builders();
let mut deserializer = ArrowDeserializer::new(
format.clone(),
Arc::new(aschema),
&schema.metadata_fields(),
None,
BadData::Fail {},
);

let mut error = deserializer
.deserialize_slice(&mut builders, &msg, SystemTime::now(), None)
.deserialize_slice(&msg, SystemTime::now(), None)
.await
.into_iter()
.next();
Expand Down
2 changes: 1 addition & 1 deletion crates/arroyo-connectors/src/kafka/sink/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ impl KafkaTopicTester {
None,
command_tx,
1,
vec![ArroyoSchema::new_unkeyed(schema(), 0)],
vec![Arc::new(ArroyoSchema::new_unkeyed(schema(), 0))],
None,
HashMap::new(),
)
Expand Down
4 changes: 3 additions & 1 deletion crates/arroyo-connectors/src/kafka/source/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -173,13 +173,15 @@ impl KafkaSourceFunc {
self.format.clone(),
self.framing.clone(),
self.bad_data.clone(),
&self.metadata_fields,
schema_resolver.clone(),
);
} else {
collector.initialize_deserializer(
self.format.clone(),
self.framing.clone(),
self.bad_data.clone(),
&self.metadata_fields,
);
}

Expand All @@ -201,7 +203,7 @@ impl KafkaSourceFunc {
let connector_metadata = if !self.metadata_fields.is_empty() {
let mut connector_metadata = HashMap::new();
for f in &self.metadata_fields {
connector_metadata.insert(&f.field_name, match f.key.as_str() {
connector_metadata.insert(f.field_name.as_str(), match f.key.as_str() {
"offset_id" => FieldValueType::Int64(msg.offset()),
"partition" => FieldValueType::Int32(msg.partition()),
"topic" => FieldValueType::String(topic),
Expand Down
Loading

0 comments on commit 81bc429

Please sign in to comment.