Non-data fields - Metadata fields to Kafka Connector #765

Merged · 42 commits · Oct 30, 2024
Commits
6658bb9
adding metadata fn
vaibhawvipul Oct 19, 2024
3bc6929
Adding check if metadata column fn
vaibhawvipul Oct 21, 2024
abff63a
remove log for debugs
vaibhawvipul Oct 21, 2024
440f700
updating schema to columns
vaibhawvipul Oct 23, 2024
3abecab
Adding validations and data to kafka messages
vaibhawvipul Oct 24, 2024
b5ac3d4
Merge branch 'master' into kafka-metadata
vaibhawvipul Oct 24, 2024
cc8faaf
fmt fixes
vaibhawvipul Oct 24, 2024
52041c4
fmt errors
vaibhawvipul Oct 24, 2024
616b793
fix clippy errors
vaibhawvipul Oct 24, 2024
bdcad60
fix clippy errors
vaibhawvipul Oct 24, 2024
74c4043
fix clippy
vaibhawvipul Oct 24, 2024
e342ed7
fmt fixes
vaibhawvipul Oct 24, 2024
167d3ac
adding a check for json only
vaibhawvipul Oct 24, 2024
ec5c023
removing double serialization and adding values using array builders
vaibhawvipul Oct 25, 2024
4f39469
taking kafka metadata from configs
vaibhawvipul Oct 25, 2024
dc59fb6
Merge branch 'ArroyoSystems:master' into kafka-metadata
vaibhawvipul Oct 25, 2024
dc59444
Merge branch 'kafka-metadata' of github.com:vaibhawvipul/arroyo into …
vaibhawvipul Oct 25, 2024
4f754a6
adding enable_metadata flag to deser
vaibhawvipul Oct 25, 2024
53b6b83
fmt and clippy fixes
vaibhawvipul Oct 25, 2024
fc8b8cf
fmt fixes
vaibhawvipul Oct 25, 2024
b4c4815
fmt fixes in planner
vaibhawvipul Oct 25, 2024
c5af394
fmt fixes in tables.rs
vaibhawvipul Oct 25, 2024
9cd7c04
clippy fixes
vaibhawvipul Oct 25, 2024
fca90ce
adding structs for better readability
vaibhawvipul Oct 27, 2024
111e67c
code refactor
vaibhawvipul Oct 27, 2024
efb3fbb
code reformatting to address generic functionality
vaibhawvipul Oct 28, 2024
00afc89
Merge branch 'master' into kafka-metadata
vaibhawvipul Oct 28, 2024
3f3fe61
bug fix
vaibhawvipul Oct 28, 2024
65ea896
remove clippy warn hints
vaibhawvipul Oct 28, 2024
737536a
adding validations during planning
vaibhawvipul Oct 28, 2024
6e047e8
checking metadata columns have only a metadata function in them
vaibhawvipul Oct 29, 2024
c468d00
simplifying using if let in kafka connector
vaibhawvipul Oct 29, 2024
2dfe404
fix clippy errors
vaibhawvipul Oct 29, 2024
16e76ef
code reformatting for if let deserializer
vaibhawvipul Oct 29, 2024
b3cd088
adding if let in deserialize_single fn
vaibhawvipul Oct 29, 2024
bcff0f0
code reformat
vaibhawvipul Oct 29, 2024
970c090
Merge branch 'master' into kafka-metadata
vaibhawvipul Oct 29, 2024
174835b
adding tests
vaibhawvipul Oct 29, 2024
77f6a90
adding validations for datatypes
vaibhawvipul Oct 29, 2024
66e996a
bug fix in test
vaibhawvipul Oct 29, 2024
fa2d089
adding validations for datatypes in connector metadata udf
vaibhawvipul Oct 30, 2024
c38bc8b
Merge branch 'master' into kafka-metadata
vaibhawvipul Oct 30, 2024
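
Taken together, these commits thread optional connector metadata ("non-data" fields) through the stack: every connector's `from_options` and `from_config` gain a `metadata_fields: Option<HashMap<String, (String, DataType)>>` parameter, `OperatorConfig` gains an `additional_fields` map, `deserialize_slice` takes an extra optional argument, and the Kafka connector validates which metadata fields may be requested. Judging by the validation code in the Kafka diff below, the map goes from SQL column name to (metadata key, expected Arrow type); the commit history suggests the SQL side declares these columns via a connector metadata UDF. A minimal sketch of the map's shape — the column names here are hypothetical, while the keys and types mirror `allowed_metadata_udf_args`:

```rust
use std::collections::HashMap;

use arrow::datatypes::DataType;

// Hypothetical mapping of SQL column name -> (Kafka metadata key, expected type),
// matching the shape of the `metadata_fields` parameter added in this PR.
fn example_metadata_fields() -> Option<HashMap<String, (String, DataType)>> {
    let mut fields = HashMap::new();
    fields.insert("msg_offset".into(), ("offset_id".into(), DataType::Int64));
    fields.insert("msg_partition".into(), ("partition".into(), DataType::Int32));
    fields.insert("msg_topic".into(), ("topic".into(), DataType::Utf8));
    Some(fields)
}
```

Connectors that don't support metadata simply accept and forward `None`, which is why most of the diff below is signature plumbing.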
1 change: 1 addition & 0 deletions crates/arroyo-api/src/pipelines.rs
@@ -159,6 +159,7 @@ async fn compile_sql<'a>(
.unwrap_or(json!({})),
&table.config,
Some(&table.schema),
None,
)
.map_err(log_and_map)?;

6 changes: 5 additions & 1 deletion crates/arroyo-connectors/src/blackhole/mod.rs
@@ -1,5 +1,6 @@
use crate::blackhole::operator::BlackholeSinkFunc;
use anyhow::anyhow;
use arrow::datatypes::DataType;
use arroyo_operator::connector::{Connection, Connector};
use arroyo_operator::operator::OperatorNode;
use arroyo_rpc::api_types::connections::{
@@ -78,8 +79,9 @@ impl Connector for BlackholeConnector {
_options: &mut HashMap<String, String>,
schema: Option<&ConnectionSchema>,
_profile: Option<&ConnectionProfile>,
_metadata_fields: Option<HashMap<String, (String, DataType)>>,
) -> anyhow::Result<Connection> {
self.from_config(None, name, EmptyConfig {}, EmptyConfig {}, schema)
self.from_config(None, name, EmptyConfig {}, EmptyConfig {}, schema, None)
}

fn from_config(
@@ -89,6 +91,7 @@ impl Connector for BlackholeConnector {
config: Self::ProfileT,
table: Self::TableT,
s: Option<&ConnectionSchema>,
_metadata_fields: Option<HashMap<String, (String, DataType)>>,
) -> anyhow::Result<Connection> {
let description = "Blackhole".to_string();

@@ -99,6 +102,7 @@
format: None,
bad_data: None,
framing: None,
additional_fields: None,
};

Ok(Connection {
7 changes: 5 additions & 2 deletions crates/arroyo-connectors/src/confluent/mod.rs
@@ -3,6 +3,7 @@ use crate::kafka::{
};
use crate::{kafka, pull_opt};
use anyhow::anyhow;
use arrow::datatypes::DataType;
use arroyo_operator::connector::{Connection, Connector};
use arroyo_operator::operator::OperatorNode;
use arroyo_rpc::api_types::connections::{
@@ -161,6 +162,7 @@ impl Connector for ConfluentConnector {
options: &mut HashMap<String, String>,
schema: Option<&ConnectionSchema>,
profile: Option<&ConnectionProfile>,
_metadata_fields: Option<HashMap<String, (String, DataType)>>,
) -> anyhow::Result<Connection> {
let connection = profile
.map(|p| {
@@ -172,7 +174,7 @@

let table = KafkaConnector::table_from_options(options)?;

self.from_config(None, name, connection, table, schema)
self.from_config(None, name, connection, table, schema, None)
}

fn from_config(
@@ -182,11 +184,12 @@ impl Connector for ConfluentConnector {
config: Self::ProfileT,
mut table: Self::TableT,
schema: Option<&ConnectionSchema>,
_metadata_fields: Option<HashMap<String, (String, DataType)>>,
) -> anyhow::Result<Connection> {
table
.client_configs
.insert("client.id".to_string(), CLIENT_ID.to_string());
KafkaConnector {}.from_config(id, name, config.into(), table, schema)
KafkaConnector {}.from_config(id, name, config.into(), table, schema, None)
}

fn make_operator(
6 changes: 5 additions & 1 deletion crates/arroyo-connectors/src/filesystem/delta.rs
@@ -1,4 +1,5 @@
use anyhow::{anyhow, bail};
use arrow::datatypes::DataType;
use arroyo_operator::connector::Connection;
use arroyo_storage::BackendConfig;
use std::collections::HashMap;
@@ -77,6 +78,7 @@ impl Connector for DeltaLakeConnector {
config: Self::ProfileT,
table: Self::TableT,
schema: Option<&ConnectionSchema>,
_metadata_fields: Option<HashMap<String, (String, DataType)>>,
) -> anyhow::Result<arroyo_operator::connector::Connection> {
let TableType::Sink {
write_path,
@@ -123,6 +125,7 @@
format: Some(format),
bad_data: schema.bad_data.clone(),
framing: schema.framing.clone(),
additional_fields: None,
};

Ok(Connection {
@@ -142,10 +145,11 @@
options: &mut HashMap<String, String>,
schema: Option<&ConnectionSchema>,
_profile: Option<&ConnectionProfile>,
_metadata_fields: Option<HashMap<String, (String, DataType)>>,
) -> anyhow::Result<Connection> {
let table = file_system_sink_from_options(options, schema, CommitStyle::DeltaLake)?;

self.from_config(None, name, EmptyConfig {}, table, schema)
self.from_config(None, name, EmptyConfig {}, table, schema, None)
}

fn make_operator(
7 changes: 6 additions & 1 deletion crates/arroyo-connectors/src/filesystem/mod.rs
@@ -3,6 +3,7 @@ mod sink;
mod source;

use anyhow::{anyhow, bail, Result};
use arrow::datatypes::DataType;
use arroyo_storage::BackendConfig;
use regex::Regex;
use std::collections::HashMap;
@@ -114,6 +115,7 @@ impl Connector for FileSystemConnector {
config: Self::ProfileT,
table: Self::TableT,
schema: Option<&ConnectionSchema>,
_metadata_fields: Option<HashMap<String, (String, DataType)>>,
) -> anyhow::Result<Connection> {
let (description, connection_type) = match table.table_type {
TableType::Source { .. } => ("FileSystem".to_string(), ConnectionType::Source),
@@ -168,6 +170,7 @@ impl Connector for FileSystemConnector {
format: Some(format),
bad_data: schema.bad_data.clone(),
framing: schema.framing.clone(),
additional_fields: None,
};

Ok(Connection {
@@ -187,6 +190,7 @@ impl Connector for FileSystemConnector {
options: &mut HashMap<String, String>,
schema: Option<&ConnectionSchema>,
_profile: Option<&ConnectionProfile>,
_metadata_fields: Option<HashMap<String, (String, DataType)>>,
) -> anyhow::Result<Connection> {
match options.remove("type") {
Some(t) if t == "source" => {
@@ -210,12 +214,13 @@
},
},
schema,
None,
)
}
Some(t) if t == "sink" => {
let table = file_system_sink_from_options(options, schema, CommitStyle::Direct)?;

self.from_config(None, name, EmptyConfig {}, table, schema)
self.from_config(None, name, EmptyConfig {}, table, schema, None)
}
Some(t) => bail!("unknown type: {}", t),
None => bail!("must have type set"),
2 changes: 1 addition & 1 deletion crates/arroyo-connectors/src/filesystem/source.rs
@@ -367,7 +367,7 @@ impl FileSystemSourceFunc {
line = line_reader.next() => {
match line.transpose()? {
Some(line) => {
ctx.deserialize_slice(line.as_bytes(), SystemTime::now()).await?;
ctx.deserialize_slice(line.as_bytes(), SystemTime::now(), None).await?;
records_read += 1;
if ctx.should_flush() {
ctx.flush_buffer().await?;
6 changes: 5 additions & 1 deletion crates/arroyo-connectors/src/fluvio/mod.rs
@@ -1,4 +1,5 @@
use anyhow::{anyhow, bail};
use arrow::datatypes::DataType;
use arroyo_formats::ser::ArrowSerializer;
use arroyo_operator::connector::{Connection, Connector};
use arroyo_operator::operator::OperatorNode;
@@ -88,6 +89,7 @@ impl Connector for FluvioConnector {
options: &mut HashMap<String, String>,
schema: Option<&ConnectionSchema>,
_profile: Option<&ConnectionProfile>,
_metadata_fields: Option<HashMap<String, (String, DataType)>>,
) -> anyhow::Result<Connection> {
let endpoint = options.remove("endpoint");
let topic = pull_opt("topic", options)?;
@@ -116,7 +118,7 @@
type_: table_type,
};

Self::from_config(self, None, name, EmptyConfig {}, table, schema)
Self::from_config(self, None, name, EmptyConfig {}, table, schema, None)
}

fn from_config(
@@ -126,6 +128,7 @@ impl Connector for FluvioConnector {
config: EmptyConfig,
table: FluvioTable,
schema: Option<&ConnectionSchema>,
_metadata_fields: Option<HashMap<String, (String, DataType)>>,
) -> anyhow::Result<Connection> {
let (typ, desc) = match table.type_ {
TableType::Source { .. } => (
@@ -154,6 +157,7 @@
format: Some(format),
bad_data: schema.bad_data.clone(),
framing: schema.framing.clone(),
additional_fields: None,
};

Ok(Connection {
2 changes: 1 addition & 1 deletion crates/arroyo-connectors/src/fluvio/source.rs
@@ -166,7 +166,7 @@ impl FluvioSourceFunc {
match message {
Some((_, Ok(msg))) => {
let timestamp = from_millis(msg.timestamp().max(0) as u64);
ctx.deserialize_slice(msg.value(), timestamp).await?;
ctx.deserialize_slice(msg.value(), timestamp, None).await?;

if ctx.should_flush() {
ctx.flush_buffer().await?;
5 changes: 5 additions & 0 deletions crates/arroyo-connectors/src/impulse/mod.rs
@@ -1,6 +1,7 @@
mod operator;

use anyhow::{anyhow, bail};
use arrow::datatypes::DataType;
use arroyo_operator::connector::{Connection, Connector};
use arroyo_operator::operator::OperatorNode;
use arroyo_rpc::api_types::connections::FieldType::Primitive;
@@ -101,6 +102,7 @@ impl Connector for ImpulseConnector {
options: &mut HashMap<String, String>,
schema: Option<&ConnectionSchema>,
_profile: Option<&ConnectionProfile>,
_metadata_fields: Option<HashMap<String, (String, DataType)>>,
) -> anyhow::Result<Connection> {
let event_rate = f64::from_str(&pull_opt("event_rate", options)?)
.map_err(|_| anyhow!("invalid value for event_rate; expected float"))?;
@@ -134,6 +136,7 @@
message_count,
},
None,
None,
)
}

@@ -144,6 +147,7 @@
config: Self::ProfileT,
table: Self::TableT,
_: Option<&ConnectionSchema>,
_metadata_fields: Option<HashMap<String, (String, DataType)>>,
) -> anyhow::Result<Connection> {
let description = format!(
"{}Impulse<{} eps{}>",
@@ -166,6 +170,7 @@
format: None,
bad_data: None,
framing: None,
additional_fields: None,
};

Ok(Connection {
50 changes: 46 additions & 4 deletions crates/arroyo-connectors/src/kafka/mod.rs
@@ -1,4 +1,5 @@
use anyhow::{anyhow, bail};
use arrow::datatypes::DataType;
use arroyo_formats::de::ArrowDeserializer;
use arroyo_formats::ser::ArrowSerializer;
use arroyo_operator::connector::Connection;
@@ -188,6 +189,7 @@ impl Connector for KafkaConnector {
config: KafkaConfig,
table: KafkaTable,
schema: Option<&ConnectionSchema>,
metadata_fields: Option<HashMap<String, (String, DataType)>>,
) -> anyhow::Result<Connection> {
let (typ, desc) = match table.type_ {
TableType::Source { .. } => (
@@ -207,13 +209,21 @@
.map(|t| t.to_owned())
.ok_or_else(|| anyhow!("'format' must be set for Kafka connection"))?;

let metadata_fields = metadata_fields.map(|fields| {
fields
.into_iter()
.map(|(k, (v, _))| (k, v))
.collect::<HashMap<String, String>>()
});

let config = OperatorConfig {
connection: serde_json::to_value(config).unwrap(),
table: serde_json::to_value(table).unwrap(),
rate_limit: None,
format: Some(format),
bad_data: schema.bad_data.clone(),
framing: schema.framing.clone(),
additional_fields: metadata_fields,
};

Ok(Connection {
@@ -312,6 +322,7 @@ impl Connector for KafkaConnector {
options: &mut HashMap<String, String>,
schema: Option<&ConnectionSchema>,
profile: Option<&ConnectionProfile>,
metadata_fields: Option<HashMap<String, (String, DataType)>>,
) -> anyhow::Result<Connection> {
let connection = profile
.map(|p| {
@@ -323,7 +334,37 @@

let table = Self::table_from_options(options)?;

Self::from_config(self, None, name, connection, table, schema)
let allowed_metadata_udf_args: HashMap<&str, DataType> = [
("offset_id", DataType::Int64),
("partition", DataType::Int32),
("topic", DataType::Utf8),
]
.iter()
.cloned()
.collect();

if let Some(fields) = &metadata_fields {
for (field_name, data_type) in fields.values() {
match allowed_metadata_udf_args.get(field_name.as_str()) {
Some(expected_type) => {
if expected_type != data_type {
return Err(anyhow!(
"Invalid datatype for metadata field '{}': expected '{:?}', found '{:?}'",
field_name, expected_type, data_type
));
}
}
None => {
return Err(anyhow!(
"Invalid metadata field name for Kafka connector: '{}'",
field_name
));
}
}
}
}

Self::from_config(self, None, name, connection, table, schema, metadata_fields)
}

fn make_operator(
@@ -383,6 +424,7 @@
.unwrap_or(u32::MAX),
)
.unwrap(),
metadata_fields: config.additional_fields,
})))
}
TableType::Sink {
@@ -622,7 +664,7 @@ impl KafkaTester {
let mut builders = aschema.builders();

let mut error = deserializer
.deserialize_slice(&mut builders, &msg, SystemTime::now())
.deserialize_slice(&mut builders, &msg, SystemTime::now(), None)
.await
.into_iter()
.next();
@@ -644,7 +686,7 @@
let mut builders = aschema.builders();

let mut error = deserializer
.deserialize_slice(&mut builders, &msg, SystemTime::now())
.deserialize_slice(&mut builders, &msg, SystemTime::now(), None)
.await
.into_iter()
.next();
@@ -678,7 +720,7 @@
let mut builders = aschema.builders();

let mut error = deserializer
.deserialize_slice(&mut builders, &msg, SystemTime::now())
.deserialize_slice(&mut builders, &msg, SystemTime::now(), None)
.await
.into_iter()
.next();
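
For illustration, here is a standalone sketch of the metadata validation that `from_options` performs above. The free function and error wording are hypothetical; the allowed keys and their Arrow types mirror `allowed_metadata_udf_args` from the diff:

```rust
use std::collections::HashMap;

use anyhow::bail;
use arrow::datatypes::DataType;

// Reject any requested metadata field the Kafka connector doesn't expose,
// or whose declared SQL type doesn't match the type Kafka provides.
fn check_metadata_fields(fields: &HashMap<String, (String, DataType)>) -> anyhow::Result<()> {
    let allowed: HashMap<&str, DataType> = [
        ("offset_id", DataType::Int64),
        ("partition", DataType::Int32),
        ("topic", DataType::Utf8),
    ]
    .into_iter()
    .collect();

    for (column, (key, data_type)) in fields {
        match allowed.get(key.as_str()) {
            Some(expected) if expected == data_type => {}
            Some(expected) => bail!(
                "invalid datatype for metadata field '{key}' on column '{column}': expected {expected:?}, found {data_type:?}"
            ),
            None => bail!("invalid metadata field name for Kafka connector: '{key}'"),
        }
    }
    Ok(())
}
```

A call like `check_metadata_fields(&example_metadata_fields().unwrap())` on the earlier sketch would pass, while requesting, say, `("timestamp", DataType::Int64)` would be rejected: as of this PR only `offset_id`, `partition`, and `topic` are exposed.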