Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Avro codec enhancements + Avro Reader #6965

Draft
wants to merge 29 commits into
base: main
Choose a base branch
from
Draft
Changes from 1 commit
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
7d4e6fd
Added basic support for arrow -> avro codec along with beginnings of …
jecsand838 Dec 28, 2024
36d56a9
Added codec support + tests for:
jecsand838 Dec 30, 2024
36b4b73
Added reader record decoder support for non-null Enum, Map, and Decim…
jecsand838 Dec 31, 2024
9d0bf4c
Added reader record decoder support for non-null Enum, Map, and Decim…
jecsand838 Dec 31, 2024
082581a
Added reader record decoder support for non-null Enum, Map, and Decim…
jecsand838 Dec 31, 2024
6647b25
Added null support
jecsand838 Dec 31, 2024
9cfda09
* Reader decoder Support for nullable types.
jecsand838 Dec 31, 2024
84ffb62
* Minor Cleanup
jecsand838 Dec 31, 2024
8600680
* Added record decoder support for the following types:
jecsand838 Dec 31, 2024
2792037
Merge pull request #1 from elastiflow/avro-codec-improvements
svencowart Jan 3, 2025
3029efc
Merge branch 'apache:main' into avro-codec
jecsand838 Jan 3, 2025
01e5224
Merge branch 'apache:main' into avro-codec
jecsand838 Jan 6, 2025
8331058
Minor cleanup
jecsand838 Jan 6, 2025
c54de48
chore: import cleanup and formatting
svencowart Jan 7, 2025
fc696c8
chore: simplifies and cleans code
svencowart Jan 7, 2025
81d7bba
ran linter
jecsand838 Jan 10, 2025
03bd5f0
Merge branch 'apache:main' into avro-codec
jecsand838 Jan 10, 2025
eeabc49
Merge branch 'apache:main' into avro-codec
jecsand838 Jan 13, 2025
1bc9a51
Removed Avro writer module and Avro writer related logic from codec.rs
jecsand838 Jan 13, 2025
b157584
chore: clean up UUIDs and Durations
svencowart Jan 14, 2025
8d0fe77
chore: clean up UUIDs Durations
svencowart Jan 14, 2025
0496dcf
add aliases to record fields
svencowart Jan 14, 2025
7208f78
Merge branch 'apache:main' into avro-codec
jecsand838 Jan 14, 2025
92e105f
* Fixed size issue in codec.rs fixed type decimal
jecsand838 Jan 15, 2025
51586d8
* Removed Avro writer specific `maybe_add_namespace` function from co…
jecsand838 Jan 17, 2025
a51b202
Aligns mapping between avro and codec and tests all paths of arrow_fi…
svencowart Jan 17, 2025
5585a87
* Organized Types to match Avro spec ordering
jecsand838 Jan 18, 2025
26faf5b
* Fixed failing `test_enum_decoding` test due to constructing Codec:…
jecsand838 Jan 18, 2025
cf45fcf
* Optimized avro codec.rs data_type Self::Decimal case:
jecsand838 Jan 18, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
chore: clean up UUIDs and Durations
svencowart committed Jan 14, 2025
commit b1575847a95b88bf12025eea9762822db7a2c7cf
35 changes: 21 additions & 14 deletions arrow-avro/src/codec.rs
Original file line number Diff line number Diff line change
@@ -155,6 +155,8 @@ pub enum Codec {
Float64,
Binary,
Utf8,
Decimal(usize, Option<usize>, Option<usize>),
Uuid,
Date32,
TimeMillis,
TimeMicros,
@@ -165,11 +167,10 @@ pub enum Codec {
Fixed(i32),
List(Arc<AvroDataType>),
Struct(Arc<[AvroField]>),
Interval,
Duration,
/// In Arrow, use Dictionary(Int32, Utf8) for Enum.
Enum(Vec<String>),
Map(Arc<AvroDataType>),
Decimal(usize, Option<usize>, Option<usize>),
}

impl Codec {
@@ -184,6 +185,18 @@ impl Codec {
Self::Float64 => Float64,
Self::Binary => Binary,
Self::Utf8 => Utf8,
Self::Decimal(precision, scale, size) => match size {
Some(s) if *s > 16 => Decimal256(*precision as u8, scale.unwrap_or(0) as i8),
Some(s) => Decimal128(*precision as u8, scale.unwrap_or(0) as i8),
None if *precision <= DECIMAL128_MAX_PRECISION as usize
&& scale.unwrap_or(0) <= DECIMAL128_MAX_SCALE as usize =>
{
Decimal128(*precision as u8, scale.unwrap_or(0) as i8)
}
_ => Decimal256(*precision as u8, scale.unwrap_or(0) as i8),
},
// arrow-rs does not support the UUID Canonical Extension Type yet, so as a temporary workaround
// UUIDs are represented as 16-byte fixed-size binary values.
Self::Uuid => FixedSizeBinary(16),
Self::Date32 => Date32,
Self::TimeMillis => Time32(TimeUnit::Millisecond),
Self::TimeMicros => Time64(TimeUnit::Microsecond),
@@ -193,7 +206,7 @@ impl Codec {
Self::TimestampMicros(is_utc) => {
Timestamp(TimeUnit::Microsecond, is_utc.then(|| "+00:00".into()))
}
Self::Interval => Interval(IntervalUnit::MonthDayNano),
Self::Duration => Interval(IntervalUnit::MonthDayNano),
Self::Fixed(size) => FixedSizeBinary(*size),
Self::List(f) => List(Arc::new(f.field_with_name(Field::LIST_FIELD_DEFAULT_NAME))),
Self::Struct(f) => Struct(f.iter().map(|x| x.field()).collect()),
@@ -212,16 +225,6 @@ impl Codec {
)),
false,
),
Self::Decimal(precision, scale, size) => match size {
Some(s) if *s > 16 => Decimal256(*precision as u8, scale.unwrap_or(0) as i8),
Some(s) => Decimal128(*precision as u8, scale.unwrap_or(0) as i8),
None if *precision <= DECIMAL128_MAX_PRECISION as usize
&& scale.unwrap_or(0) <= DECIMAL128_MAX_SCALE as usize =>
{
Decimal128(*precision as u8, scale.unwrap_or(0) as i8)
}
_ => Decimal256(*precision as u8, scale.unwrap_or(0) as i8),
},
}
}
}
@@ -450,6 +453,7 @@ fn make_data_type<'a>(
None,
);
}
(Some("uuid"), c @ Codec::Utf8) => *c = Codec::Uuid,
(Some("date"), c @ Codec::Int32) => *c = Codec::Date32,
(Some("time-millis"), c @ Codec::Int32) => *c = Codec::TimeMillis,
(Some("time-micros"), c @ Codec::Int64) => *c = Codec::TimeMicros,
@@ -461,7 +465,7 @@ fn make_data_type<'a>(
(Some("local-timestamp-micros"), c @ Codec::Int64) => {
*c = Codec::TimestampMicros(false)
}
(Some("duration"), c @ Codec::Fixed(12)) => *c = Codec::Interval,
(Some("duration"), c @ Codec::Fixed(12)) => *c = Codec::Duration,
(Some(logical), _) => {
// Insert unrecognized logical type into metadata
field.metadata.insert("logicalType".into(), logical.into());
@@ -510,6 +514,9 @@ fn arrow_type_to_codec(dt: &DataType) -> Codec {
Float64 => Codec::Float64,
Utf8 => Codec::Utf8,
Binary | LargeBinary => Codec::Binary,
// arrow-rs does not support the UUID Canonical Extension Type yet, so this mapping is not possible.
// It is unsafe to assume all FixedSizeBinary(16) are UUIDs.
// Uuid => Codec::Uuid,
Date32 => Codec::Date32,
Time32(TimeUnit::Millisecond) => Codec::TimeMillis,
Time64(TimeUnit::Microsecond) => Codec::TimeMicros,
6 changes: 3 additions & 3 deletions arrow-avro/src/reader/record.rs
Original file line number Diff line number Diff line change
@@ -169,7 +169,7 @@ impl Decoder {
Self::TimestampMicros(*is_utc, Vec::with_capacity(DEFAULT_CAPACITY))
}
Codec::Fixed(n) => Self::Fixed(*n, Vec::with_capacity(DEFAULT_CAPACITY)),
Codec::Interval => Self::Interval(Vec::with_capacity(DEFAULT_CAPACITY)),
Codec::Duration => Self::Interval(Vec::with_capacity(DEFAULT_CAPACITY)),
Codec::List(item) => {
let item_decoder = Box::new(Self::try_new(item)?);
Self::List(
@@ -866,7 +866,7 @@ mod tests {
fn test_interval_decoding() {
// Avro `duration` logical type (decoded as an interval) => 12 bytes => [ months i32, days i32, ms i32 ]
// Decode 2 rows: row1 => months=1, days=2, ms=100; row2 => months=-1, days=10, ms=9999
let dt = AvroDataType::from_codec(Codec::Interval);
let dt = AvroDataType::from_codec(Codec::Duration);
let mut dec = Decoder::try_new(&dt).unwrap();
// row1 => months=1 => 01,00,00,00, days=2 => 02,00,00,00, ms=100 => 64,00,00,00
// row2 => months=-1 => 0xFF,0xFF,0xFF,0xFF, days=10 => 0x0A,0x00,0x00,0x00, ms=9999 => 0x0F,0x27,0x00,0x00
@@ -903,7 +903,7 @@ mod tests {
#[test]
fn test_interval_decoding_with_nulls() {
// Avro union => [ interval, null]
let dt = AvroDataType::from_codec(Codec::Interval);
let dt = AvroDataType::from_codec(Codec::Duration);
let child = Decoder::try_new(&dt).unwrap();
let mut dec = Decoder::Nullable(
Nullability::NullFirst,