Skip to content

Commit

Permalink
feat: parse avro fields
Browse files Browse the repository at this point in the history
  • Loading branch information
andrewinci committed Oct 12, 2022
1 parent 9a90aa7 commit 1af8f8e
Show file tree
Hide file tree
Showing 4 changed files with 152 additions and 66 deletions.
19 changes: 19 additions & 0 deletions src-tauri/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions src-tauri/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ apache-avro = { version = "0.14.0" }
log = { version = "0.4" }
env_logger = { version = "0.9.0" }
async-trait = "0.1.57"
num-bigint = "0.4"
rust_decimal = "1.26"

[dev-dependencies]
httpmock = "0.6"
Expand Down
195 changes: 130 additions & 65 deletions src-tauri/src/lib/parser/avro_parser.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
use std::{ io::Cursor, sync::Arc };

use apache_avro::{ from_avro_datum, types::Value as AvroValue, Schema };
use serde_json::{ Map, Value as JsonValue };
use serde_json::{ Map, Value as JsonValue, json };
use num_bigint::{ BigInt };
use rust_decimal::{ Decimal };

use crate::lib::{ schema_registry::SchemaRegistryClient, error::{ Result, Error } };

Expand Down Expand Up @@ -53,17 +55,19 @@ fn get_schema_id(raw: &[u8]) -> Result<i32> {
fn map(a: &AvroValue) -> Result<JsonValue> {
match a {
AvroValue::Null => Ok(JsonValue::Null),
AvroValue::Boolean(v) => Ok(JsonValue::Bool(*v)),
AvroValue::Int(v) => Ok(JsonValue::Number(serde_json::Number::from(*v))),
AvroValue::Long(v) => Ok(JsonValue::Number(serde_json::Number::from(*v))),
AvroValue::Float(_v) => todo!(),
AvroValue::Double(_v) => todo!(),
AvroValue::Bytes(_) => todo!(),
AvroValue::String(v) => Ok(JsonValue::String(v.clone())),
AvroValue::Fixed(_, _) => todo!(),
AvroValue::Enum(_, _) => todo!(),
AvroValue::Union(_, v) => map(&**v),
AvroValue::Array(_) => todo!(),
AvroValue::Boolean(v) => Ok(json!(*v)),
AvroValue::Int(v) => Ok(json!(*v)),
AvroValue::Long(v) => Ok(json!(*v)),
AvroValue::Float(v) => Ok(json!(*v)),
AvroValue::Double(v) => Ok(json!(*v)),
AvroValue::String(v) => Ok(json!(*v)),
AvroValue::Array(v) => {
let mut json_vec = Vec::new();
for v in v.iter() {
json_vec.push(map(v)?);
}
Ok(JsonValue::Array(json_vec))
}
AvroValue::Map(vec) => {
//todo: DRY
let mut json_map = Map::new();
Expand All @@ -79,59 +83,120 @@ fn map(a: &AvroValue) -> Result<JsonValue> {
}
Ok(JsonValue::Object(json_map))
}
AvroValue::Date(_) => todo!(),
AvroValue::Decimal(_) => todo!(),
AvroValue::TimeMillis(_) => todo!(),
AvroValue::TimeMicros(_) => todo!(),
AvroValue::TimestampMillis(_) => todo!(),
AvroValue::TimestampMicros(_) => todo!(),
AvroValue::Duration(_) => todo!(),
AvroValue::Uuid(_) => todo!(),
AvroValue::Date(v) => Ok(json!(*v)),
AvroValue::TimeMillis(v) => Ok(json!(*v)),
AvroValue::TimeMicros(v) => Ok(json!(*v)),
AvroValue::TimestampMillis(v) => Ok(json!(*v)),
AvroValue::TimestampMicros(v) => Ok(json!(*v)),
AvroValue::Uuid(v) => Ok(json!(*v)),
//todo: WIP
AvroValue::Bytes(v) => Ok(json!(*v)), //todo: this should be like "\u00FF"
AvroValue::Decimal(v) => {
let arr = <Vec<u8>>::try_from(v).expect("Invalid decimal received");
let value = BigInt::from_signed_bytes_be(&arr);
//todo: need to inject the scale from the schema
let scale = 2;
let decimal = Decimal::new(i64::try_from(value).expect("Unable to cast to i64"), scale);
Ok(json!(decimal))
}
AvroValue::Duration(v) => {
//todo: check avro json representation
Ok(json!(format!("{:?} months {:?} days {:?} millis", v.months(), v.days(), v.millis())))
}
//todo: use avro-json format
AvroValue::Union(_, v) => map(&**v),
AvroValue::Enum(_, v) => Ok(json!(*v)),
AvroValue::Fixed(_, v) => Ok(json!(*v)), //todo: check representation in avro-json
}
}

// #[cfg(test)]
// mod tests {
// use apache_avro::{to_avro_datum, types::Record, Schema, Writer};

// use super::get_schema_id;

// #[test]
// fn poc_avro() {
// let raw_schema = r#"
// {
// "doc": "Sample schema to help you get started.",
// "fields": [
// {
// "doc": "The int type is a 32-bit signed integer.",
// "name": "my_field1",
// "type": "int"
// }
// ],
// "name": "sampleRecord",
// "namespace": "com.mycorp.mynamespace",
// "type": "record"
// }
// "#;
// let schema = Schema::parse_str(raw_schema).unwrap();
// let writer = Writer::new(&schema, Vec::new());
// let mut record = Record::new(writer.schema()).unwrap();
// record.put("my_field1", 123);
// let mut encoded = to_avro_datum(&schema, record).unwrap();
// // add 1 magic byte + 4 id bytes
// let mut with_header: Vec<u8> = vec![0x00, 0x00, 0x00, 0x00, 0x00];
// with_header.append(&mut encoded);
// //let res = parse_avro(&with_header[..], &schema).unwrap();
// // [0, 0, 1, 134, 197, 246, 1]
// // [0, 1, 134, 197] -> 0x01, 0x86, 0xC5 -> 100037
// // [0, 0, 0, 0, 246, 1]
// //assert_eq!(res, r#"{"my_field1":123}"#)
// }

// #[test]
// fn u8_array_to_i32() {
// let raw: Vec<u8> = vec![0x00, 0x00, 0x01, 0x86, 0xc5, 0x00, 0x00, 0x00];
// let id = get_schema_id(&raw).unwrap();
// assert_eq!(id, 100037)
// }
// }
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use apache_avro::{
        to_avro_datum,
        types::{ Record, Value as AvroValue },
        Decimal,
        Schema as ApacheAvroSchema,
        Writer,
    };
    use async_trait::async_trait;

    use crate::lib::schema_registry::{ Result, Schema, SchemaRegistryClient };

    use super::{ get_schema_id, AvroParser };

    /// Schema registry stub: every lookup by id yields the same schema text.
    struct MockSchemaRegistry {
        schema: String,
    }

    #[async_trait]
    impl SchemaRegistryClient for MockSchemaRegistry {
        // Left unimplemented in this mock.
        async fn list_subjects(&self) -> Result<Vec<String>> {
            todo!()
        }

        // Left unimplemented in this mock.
        async fn get_schema(&self, _: String) -> Result<Vec<Schema>> {
            todo!()
        }

        // The requested id is ignored; the canned schema is always returned.
        async fn get_schema_by_id(&self, _: i32) -> Result<String> {
            Ok(self.schema.clone())
        }
    }

    /// Builds the parser under test on top of a mock registry serving `schema`.
    fn get_sut(schema: String) -> AvroParser {
        let registry = MockSchemaRegistry { schema };
        AvroParser::new(Arc::new(registry))
    }

    /// Encodes a record holding one field per Avro primitive type and checks
    /// the JSON text the parser produces for it.
    #[tokio::test]
    async fn test_simple_types_parsing() {
        // The literal is kept flush-left so its bytes are left untouched.
        let raw_schema = r#"
{
"fields": [
{ "name": "null_field", "type": "null" },
{ "name": "boolean_field", "type": "boolean" },
{ "name": "int_field", "type": "int" },
{ "name": "long_field", "type": "long" },
{ "name": "float_field", "type": "float" },
{ "name": "double_field", "type": "double" },
{ "name": "bytes_field", "type": "bytes" },
{ "name": "string_field", "type": "string" }
],
"name": "sampleRecord",
"namespace": "com.mycorp.mynamespace",
"type": "record"
}
"#;
        let avro_schema = ApacheAvroSchema::parse_str(raw_schema).unwrap();
        let writer = Writer::new(&avro_schema, Vec::new());

        let mut record = Record::new(writer.schema()).unwrap();
        record.put("null_field", AvroValue::Null);
        record.put("boolean_field", true);
        record.put("int_field", 12);
        record.put("long_field", 12345667);
        record.put("float_field", 123.123f32);
        record.put("double_field", 12.12f64);
        record.put("bytes_field", AvroValue::Bytes(vec![0x01, 0x02, 0xaa]));
        record.put("string_field", "YO!! test");
        let datum = to_avro_datum(&avro_schema, record).unwrap();

        // Header of 1 magic byte + 4 schema id bytes (all zero), then the datum.
        let mut payload: Vec<u8> = vec![0x00; 5];
        payload.extend(datum);

        let parsed = get_sut(raw_schema.to_owned())
            .parse_payload(payload.as_slice()).await
            .unwrap();

        // Keys come out in alphabetical order and bytes as an array of numbers.
        let expected = r#"{"boolean_field":true,"bytes_field":[1,2,170],"double_field":12.12,"float_field":123.12300109863281,"int_field":12,"long_field":12345667,"null_field":null,"string_field":"YO!! test"}"#;
        assert_eq!(parsed, expected)
    }

    /// The four bytes after the leading byte decode to the schema id
    /// (0x00 0x01 0x86 0xc5 -> 100037).
    #[test]
    fn u8_array_to_i32() {
        let header = [0x00u8, 0x00, 0x01, 0x86, 0xc5, 0x00, 0x00, 0x00];
        assert_eq!(get_schema_id(&header).unwrap(), 100037)
    }
}
2 changes: 1 addition & 1 deletion src-tauri/src/lib/schema_registry/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,6 @@ mod error;
mod types;

pub use client::SchemaRegistryClient;
pub use error::SchemaRegistryError;
pub use error::{ SchemaRegistryError, Result };
pub use types::{ BasicAuth, Schema };
pub use client::CachedSchemaRegistry;

0 comments on commit 1af8f8e

Please sign in to comment.