Skip to content

Commit

Permalink
configuration and tests
Browse files Browse the repository at this point in the history
  • Loading branch information
mwylde committed Jan 12, 2025
1 parent 7388440 commit bcf7d54
Show file tree
Hide file tree
Showing 19 changed files with 219 additions and 41 deletions.
96 changes: 89 additions & 7 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/arroyo-connectors/src/filesystem/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ impl FileSystemSourceFunc {
self.format.clone(),
self.framing.clone(),
self.bad_data.clone(),
&[],
);
let parallelism = ctx.task_info.parallelism;
let task_index = ctx.task_info.task_index;
Expand Down
1 change: 1 addition & 0 deletions crates/arroyo-connectors/src/fluvio/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ impl SourceOperator for FluvioSourceFunc {
self.format.clone(),
self.framing.clone(),
self.bad_data.clone(),
&[],
);

match self.run_int(ctx, collector).await {
Expand Down
1 change: 1 addition & 0 deletions crates/arroyo-connectors/src/kafka/source/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,7 @@ impl KafkaSourceFunc {
self.format.clone(),
self.framing.clone(),
self.bad_data.clone(),
&self.metadata_fields,
);
}

Expand Down
1 change: 1 addition & 0 deletions crates/arroyo-connectors/src/kinesis/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,7 @@ impl SourceOperator for KinesisSourceFunc {
self.format.clone(),
self.framing.clone(),
self.bad_data.clone(),
&[],
);

match self.run_int(ctx, collector).await {
Expand Down
1 change: 1 addition & 0 deletions crates/arroyo-connectors/src/mqtt/source/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ impl MqttSourceFunc {
self.format.clone(),
self.framing.clone(),
self.bad_data.clone(),
&self.metadata_fields,
);

if ctx.task_info.task_index > 0 {
Expand Down
1 change: 1 addition & 0 deletions crates/arroyo-connectors/src/nats/source/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -333,6 +333,7 @@ impl NatsSourceFunc {
self.format.clone(),
self.framing.clone(),
self.bad_data.clone(),
&[],
);

let nats_client = get_nats_client(&self.connection)
Expand Down
1 change: 1 addition & 0 deletions crates/arroyo-connectors/src/polling_http/operator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,7 @@ impl PollingHttpSourceFunc {
self.format.clone(),
self.framing.clone(),
self.bad_data.clone(),
&[],
);

// since there's no way to partition across an http source, only read on the first task
Expand Down
1 change: 1 addition & 0 deletions crates/arroyo-connectors/src/rabbitmq/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ impl SourceOperator for RabbitmqStreamSourceFunc {
self.format.clone(),
self.framing.clone(),
self.bad_data.clone(),
&[],
);

match self.run_int(ctx, collector).await {
Expand Down
1 change: 1 addition & 0 deletions crates/arroyo-connectors/src/single_file/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ impl SourceOperator for SingleFileSourceFunc {
self.format.clone(),
self.framing.clone(),
self.bad_data.clone(),
&[],
);

let state: &mut arroyo_state::tables::global_keyed_map::GlobalKeyedView<String, usize> =
Expand Down
1 change: 1 addition & 0 deletions crates/arroyo-connectors/src/sse/operator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ impl SSESourceFunc {
self.format.clone(),
self.framing.clone(),
self.bad_data.clone(),
&[],
);

let mut client = eventsource_client::ClientBuilder::for_url(&self.url).unwrap();
Expand Down
1 change: 1 addition & 0 deletions crates/arroyo-connectors/src/websocket/operator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ impl SourceOperator for WebsocketSourceFunc {
self.format.clone(),
self.framing.clone(),
self.bad_data.clone(),
&[],
);

match self.run_int(ctx, collector).await {
Expand Down
29 changes: 21 additions & 8 deletions crates/arroyo-formats/src/de.rs
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,7 @@ impl ArrowDeserializer {
pub fn new(
format: Format,
schema: Arc<ArroyoSchema>,
metadata_fields: &[MetadataField],
framing: Option<Framing>,
bad_data: BadData,
) -> Self {
Expand All @@ -278,7 +279,7 @@ impl ArrowDeserializer {
Arc::new(FailingSchemaResolver::new()) as Arc<dyn SchemaResolver + Sync>
};

Self::with_schema_resolver(format, framing, schema, &[], bad_data, resolver)
Self::with_schema_resolver(format, framing, schema, metadata_fields, bad_data, resolver)
}

pub fn with_schema_resolver(
Expand Down Expand Up @@ -729,12 +730,13 @@ mod tests {
use arrow::datatypes::Int32Type;
use arrow_array::cast::AsArray;
use arrow_array::types::{GenericBinaryType, Int64Type, TimestampNanosecondType};
use arrow_schema::{Schema, TimeUnit};
use arrow_schema::{DataType, Schema, TimeUnit};
use arroyo_rpc::df::ArroyoSchema;
use arroyo_rpc::formats::{
BadData, Format, Framing, FramingMethod, JsonFormat, NewlineDelimitedFraming,
RawBytesFormat,
};
use arroyo_rpc::MetadataField;
use arroyo_types::{to_nanos, SourceError};
use serde_json::json;
use std::sync::Arc;
Expand Down Expand Up @@ -832,6 +834,7 @@ mod tests {
timestamp_format: Default::default(),
}),
schema,
&[],
None,
bad_data,
)
Expand Down Expand Up @@ -913,6 +916,7 @@ mod tests {
let mut deserializer = ArrowDeserializer::new(
Format::RawBytes(RawBytesFormat {}),
arroyo_schema,
&[],
None,
BadData::Fail {},
);
Expand Down Expand Up @@ -941,7 +945,7 @@ mod tests {
}

#[tokio::test]
async fn test_additional_fields_deserialisation() {
async fn test_additional_fields_deserialization() {
let schema = Arc::new(Schema::new(vec![
arrow_schema::Field::new("x", arrow_schema::DataType::Int64, true),
arrow_schema::Field::new("y", arrow_schema::DataType::Int32, true),
Expand All @@ -965,17 +969,26 @@ mod tests {
timestamp_format: Default::default(),
}),
arroyo_schema,
&[
MetadataField {
field_name: "y".to_string(),
key: "y".to_string(),
data_type: Some(DataType::Int64),
},
MetadataField {
field_name: "z".to_string(),
key: "z".to_string(),
data_type: Some(DataType::Utf8),
},
],
None,
BadData::Drop {},
);

let time = SystemTime::now();
let mut additional_fields = std::collections::HashMap::new();
let binding = "y".to_string();
additional_fields.insert(binding.as_str(), FieldValueType::Int32(5));
let z_value = "hello".to_string();
let binding = "z".to_string();
additional_fields.insert(binding.as_str(), FieldValueType::String(&z_value));
additional_fields.insert("y", FieldValueType::Int32(5));
additional_fields.insert("z", FieldValueType::String("hello"));

let result = deserializer
.deserialize_slice(
Expand Down
2 changes: 2 additions & 0 deletions crates/arroyo-operator/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -320,6 +320,7 @@ impl SourceCollector {
format: Format,
framing: Option<Framing>,
bad_data: Option<BadData>,
metadata_fields: &[MetadataField],
) {
if self.deserializer.is_some() {
panic!("Deserialize already initialized");
Expand All @@ -328,6 +329,7 @@ impl SourceCollector {
self.deserializer = Some(ArrowDeserializer::new(
format,
self.out_schema.clone(),
metadata_fields,
framing,
bad_data.unwrap_or_default(),
));
Expand Down
Loading

0 comments on commit bcf7d54

Please sign in to comment.