Skip to content

Commit

Permalink
cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
jdarais committed Mar 2, 2025
1 parent 1749ceb commit e2b0d22
Show file tree
Hide file tree
Showing 9 changed files with 222 additions and 107 deletions.
156 changes: 99 additions & 57 deletions avro/benches/serde.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ const RAW_SMALL_SCHEMA: &str = r#"

#[derive(Serialize, Clone)]
struct SmallRecord {
field: String
field: String,
}

const RAW_BIG_SCHEMA: &str = r#"
Expand Down Expand Up @@ -112,20 +112,20 @@ const RAW_BIG_SCHEMA: &str = r#"

#[derive(Serialize, Clone)]
struct MailingAddress {
street: String,
city: String,
state_prov: String,
country: String,
zip: String
street: String,
city: String,
state_prov: String,
country: String,
zip: String,
}

#[derive(Serialize, Clone)]
struct BigRecord {
username: String,
age: i32,
phone: String,
housenum: String,
address: MailingAddress
username: String,
age: i32,
phone: String,
housenum: String,
address: MailingAddress,
}

const RAW_ADDRESS_SCHEMA: &str = r#"
Expand Down Expand Up @@ -173,11 +173,11 @@ fn make_small_record() -> anyhow::Result<(Schema, Value)> {
}

fn make_small_record_ser() -> anyhow::Result<(Schema, SmallRecord)> {
let small_schema = Schema::parse_str(RAW_SMALL_SCHEMA)?;
let small_record = SmallRecord {
field: String::from("foo")
};
Ok((small_schema, small_record))
let small_schema = Schema::parse_str(RAW_SMALL_SCHEMA)?;
let small_record = SmallRecord {
field: String::from("foo"),
};
Ok((small_schema, small_record))
}

fn make_big_record() -> anyhow::Result<(Schema, Value)> {
Expand All @@ -204,29 +204,29 @@ fn make_big_record() -> anyhow::Result<(Schema, Value)> {
}

fn make_big_record_ser() -> anyhow::Result<(Schema, BigRecord)> {
let big_schema = Schema::parse_str(RAW_BIG_SCHEMA)?;
let big_record = BigRecord {
username: String::from("username"),
age: 10,
phone: String::from("000000000"),
housenum: String::from("0000"),
address: MailingAddress {
street: String::from("street"),
city: String::from("city"),
state_prov: String::from("state_prov"),
country: String::from("country"),
zip: String::from("zip")
}
};
Ok((big_schema, big_record))
let big_schema = Schema::parse_str(RAW_BIG_SCHEMA)?;
let big_record = BigRecord {
username: String::from("username"),
age: 10,
phone: String::from("000000000"),
housenum: String::from("0000"),
address: MailingAddress {
street: String::from("street"),
city: String::from("city"),
state_prov: String::from("state_prov"),
country: String::from("country"),
zip: String::from("zip"),
},
};
Ok((big_schema, big_record))
}

fn make_records(record: Value, count: usize) -> Vec<Value> {
std::iter::repeat(record).take(count).collect()
}

fn make_records_ser<T: Serialize + Clone>(record: T, count: usize) -> Vec<T> {
std::iter::repeat(record).take(count).collect()
std::iter::repeat(record).take(count).collect()
}

fn write(schema: &Schema, records: &[Value]) -> AvroResult<Vec<u8>> {
Expand All @@ -236,9 +236,9 @@ fn write(schema: &Schema, records: &[Value]) -> AvroResult<Vec<u8>> {
}

fn write_ser<T: Serialize>(schema: &Schema, records: &[T]) -> AvroResult<Vec<u8>> {
let mut writer = Writer::new(schema, Vec::new());
writer.extend_ser(records).unwrap();
writer.into_inner()
let mut writer = Writer::new(schema, Vec::new());
writer.extend_ser(records).unwrap();
writer.into_inner()
}

fn read(schema: &Schema, bytes: &[u8]) -> anyhow::Result<()> {
Expand Down Expand Up @@ -272,15 +272,15 @@ fn bench_write(
}

fn bench_write_ser<T: Serialize + Clone>(
c: &mut Criterion,
make_record: impl Fn() -> anyhow::Result<(Schema, T)>,
n_records: usize,
name: &str
c: &mut Criterion,
make_record: impl Fn() -> anyhow::Result<(Schema, T)>,
n_records: usize,
name: &str,
) -> anyhow::Result<()> {
let (schema, record) = make_record()?;
let records = make_records_ser(record, n_records);
c.bench_function(name, |b| b.iter(|| write_ser(&schema, &records)));
Ok(())
let (schema, record) = make_record()?;
let records = make_records_ser(record, n_records);
c.bench_function(name, |b| b.iter(|| write_ser(&schema, &records)));
Ok(())
}

fn bench_read(
Expand All @@ -307,15 +307,27 @@ fn bench_small_schema_write_1_record(c: &mut Criterion) {
}

fn bench_small_schema_write_1_record_ser(c: &mut Criterion) {
bench_write_ser(c, make_small_record_ser, 1, "small schema, write 1 record (serde way)").unwrap();
bench_write_ser(
c,
make_small_record_ser,
1,
"small schema, write 1 record (serde way)",
)
.unwrap();
}

fn bench_small_schema_write_100_record(c: &mut Criterion) {
bench_write(c, make_small_record, 100, "small schema, write 100 records").unwrap();
}

fn bench_small_schema_write_100_record_ser(c: &mut Criterion) {
bench_write_ser(c, make_small_record_ser, 100, "small schema, write 100 records (serde way)").unwrap();
bench_write_ser(
c,
make_small_record_ser,
100,
"small schema, write 100 records (serde way)",
)
.unwrap();
}

fn bench_small_schema_write_10_000_record(c: &mut Criterion) {
Expand All @@ -329,7 +341,13 @@ fn bench_small_schema_write_10_000_record(c: &mut Criterion) {
}

fn bench_small_schema_write_10_000_record_ser(c: &mut Criterion) {
bench_write_ser(c, make_small_record_ser, 10_000, "small schema, write 10k records (serde way)").unwrap()
bench_write_ser(
c,
make_small_record_ser,
10_000,
"small schema, write 10k records (serde way)",
)
.unwrap()
}

fn bench_small_schema_read_1_record(c: &mut Criterion) {
Expand All @@ -355,23 +373,41 @@ fn bench_big_schema_write_1_record(c: &mut Criterion) {
}

fn bench_big_schema_write_1_record_ser(c: &mut Criterion) {
bench_write_ser(c, make_big_record_ser, 1, "big schema, write 1 record (serde way)").unwrap();
bench_write_ser(
c,
make_big_record_ser,
1,
"big schema, write 1 record (serde way)",
)
.unwrap();
}

fn bench_big_schema_write_100_record(c: &mut Criterion) {
bench_write(c, make_big_record, 100, "big schema, write 100 records").unwrap();
bench_write(c, make_big_record, 100, "big schema, write 100 records").unwrap();
}

fn bench_big_schema_write_100_record_ser(c: &mut Criterion) {
bench_write_ser(c, make_big_record_ser, 100, "big schema, write 100 records (serde way)").unwrap();
bench_write_ser(
c,
make_big_record_ser,
100,
"big schema, write 100 records (serde way)",
)
.unwrap();
}

fn bench_big_schema_write_10_000_record(c: &mut Criterion) {
bench_write(c, make_big_record, 10_000, "big schema, write 10k records").unwrap();
bench_write(c, make_big_record, 10_000, "big schema, write 10k records").unwrap();
}

fn bench_big_schema_write_10_000_record_ser(c: &mut Criterion) {
bench_write_ser(c, make_big_record_ser, 10_000, "big scheam, write 10k records (serde way)").unwrap();
bench_write_ser(
c,
make_big_record_ser,
10_000,
"big scheam, write 10k records (serde way)",
)
.unwrap();
}

fn bench_big_schema_read_1_record(c: &mut Criterion) {
Expand Down Expand Up @@ -411,11 +447,11 @@ criterion_group!(
);

criterion_group!(
benches_ser,
bench_small_schema_write_1_record_ser,
bench_small_schema_write_100_record_ser,
bench_big_schema_write_1_record_ser,
bench_big_schema_write_100_record_ser,
benches_ser,
bench_small_schema_write_1_record_ser,
bench_small_schema_write_100_record_ser,
bench_big_schema_write_1_record_ser,
bench_big_schema_write_100_record_ser,
);

criterion_group!(
Expand Down Expand Up @@ -444,4 +480,10 @@ criterion_group!(
bench_big_schema_read_100_000_record,
);

criterion_main!(benches, benches_ser, long_benches, long_benches_ser, very_long_benches);
criterion_main!(
benches,
benches_ser,
long_benches,
long_benches_ser,
very_long_benches
);
7 changes: 4 additions & 3 deletions avro/src/bigdecimal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,9 @@ pub(crate) fn big_decimal_as_bytes(decimal: &BigDecimal) -> Vec<u8> {
let mut buffer: Vec<u8> = Vec::new();
let (big_int, exponent): (BigInt, i64) = decimal.as_bigint_and_exponent();
let big_endian_value: Vec<u8> = big_int.to_signed_bytes_be();
encode_bytes(&big_endian_value, &mut buffer);
encode_long(exponent, &mut buffer);
encode_bytes(&big_endian_value, &mut buffer)
.expect("Writing to a Vec<u8> expected to not fail");
encode_long(exponent, &mut buffer).expect("Writing to a Vec<u8> expected to not fail");

buffer
}
Expand All @@ -41,7 +42,7 @@ pub(crate) fn serialize_big_decimal(decimal: &BigDecimal) -> Vec<u8> {

// encode global size and content
let mut final_buffer: Vec<u8> = Vec::new();
encode_bytes(&buffer, &mut final_buffer);
encode_bytes(&buffer, &mut final_buffer).expect("Writing to a Vec<u8> expected to not fail");
final_buffer
}

Expand Down
41 changes: 29 additions & 12 deletions avro/src/de.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
//! Logic for serde-compatible deserialization.
use crate::{bytes::DE_BYTES_BORROWED, types::Value, Error};
use serde::{
de::{self, DeserializeSeed, Visitor, Deserializer as _},
de::{self, DeserializeSeed, Deserializer as _, Visitor},
forward_to_deserialize_any, Deserialize,
};
use std::{
Expand Down Expand Up @@ -57,7 +57,7 @@ pub struct EnumDeserializer<'de> {

struct UnionDeserializer<'de> {
input: &'static str,
value: &'de Value
value: &'de Value,
}

impl<'de> Deserializer<'de> {
Expand Down Expand Up @@ -247,8 +247,14 @@ impl<'de> de::EnumAccess<'de> for UnionDeserializer<'de> {

fn variant_seed<V>(self, seed: V) -> Result<(V::Value, Self::Variant), Self::Error>
where
V: DeserializeSeed<'de> {
Ok((seed.deserialize(StringDeserializer { input: String::from(self.input) })?, self))
V: DeserializeSeed<'de>,
{
Ok((
seed.deserialize(StringDeserializer {
input: String::from(self.input),
})?,
self,
))
}
}

Expand All @@ -258,19 +264,21 @@ impl<'de> de::VariantAccess<'de> for UnionDeserializer<'de> {
fn unit_variant(self) -> Result<(), Self::Error> {
match self.value {
Value::Null => Ok(()),
_ => Err(Error::GetNull(self.value.clone()))
_ => Err(Error::GetNull(self.value.clone())),
}
}

fn newtype_variant_seed<T>(self, seed: T) -> Result<T::Value, Self::Error>
where
T: DeserializeSeed<'de> {
T: DeserializeSeed<'de>,
{
seed.deserialize(&Deserializer::new(self.value))
}

fn tuple_variant<V>(self, len: usize, visitor: V) -> Result<V::Value, Self::Error>
where
V: Visitor<'de> {
V: Visitor<'de>,
{
Deserializer::new(self.value).deserialize_tuple(len, visitor)
}

Expand All @@ -280,7 +288,8 @@ impl<'de> de::VariantAccess<'de> for UnionDeserializer<'de> {
visitor: V,
) -> Result<V::Value, Self::Error>
where
V: Visitor<'de> {
V: Visitor<'de>,
{
let des = Deserializer::new(self.value);
des.deserialize_struct(self.input, fields, visitor)
}
Expand Down Expand Up @@ -593,10 +602,18 @@ impl<'de> de::Deserializer<'de> for &Deserializer<'de> {
// This branch can be anything...
Value::Record(ref fields) => visitor.visit_enum(EnumDeserializer::new(fields)),
Value::String(ref field) => visitor.visit_enum(EnumUnitDeserializer::new(field)),
Value::Union(idx, ref inner) => if (idx as usize) < variants.len() {
visitor.visit_enum(UnionDeserializer::new(variants[idx as usize], inner.as_ref()))
} else {
Err(Error::GetUnionVariant { index: idx as i64, num_variants: variants.len() })
Value::Union(idx, ref inner) => {
if (idx as usize) < variants.len() {
visitor.visit_enum(UnionDeserializer::new(
variants[idx as usize],
inner.as_ref(),
))
} else {
Err(Error::GetUnionVariant {
index: idx as i64,
num_variants: variants.len(),
})
}
}
// This has to be a unit Enum
Value::Enum(_index, ref field) => visitor.visit_enum(EnumUnitDeserializer::new(field)),
Expand Down
2 changes: 1 addition & 1 deletion avro/src/decode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ pub(crate) fn decode_internal<R: Read, S: Borrow<Schema>>(

// use a Vec to be able re-read the bytes more than once if needed
let mut reader = Vec::with_capacity(len + 1);
encode_long(len as i64, &mut reader);
encode_long(len as i64, &mut reader)?;
reader.extend_from_slice(&bytes);

let decode_from_string = |reader| match decode_internal(
Expand Down
Loading

0 comments on commit e2b0d22

Please sign in to comment.