From 8600680df4f75b5a937a5fc0ffae2f743705379b Mon Sep 17 00:00:00 2001 From: Connor Sanders Date: Tue, 31 Dec 2024 17:26:36 -0600 Subject: [PATCH] * Added record decoder support for the following types: - Fixed - Interval Signed-off-by: Connor Sanders --- arrow-avro/src/reader/record.rs | 381 +++++++++++++++++++++++--------- 1 file changed, 278 insertions(+), 103 deletions(-) diff --git a/arrow-avro/src/reader/record.rs b/arrow-avro/src/reader/record.rs index 500fe27fd53..87ae7e2426a 100644 --- a/arrow-avro/src/reader/record.rs +++ b/arrow-avro/src/reader/record.rs @@ -20,13 +20,13 @@ use crate::reader::block::{Block, BlockDecoder}; use crate::reader::cursor::AvroCursor; use crate::reader::header::Header; use crate::schema::*; +use arrow_array::builder::{Decimal128Builder, Decimal256Builder, PrimitiveBuilder}; use arrow_array::types::*; use arrow_array::*; -use arrow_array::builder::{Decimal128Builder, Decimal256Builder}; use arrow_buffer::*; use arrow_schema::{ - ArrowError, DataType, Field as ArrowField, FieldRef, Fields, Schema as ArrowSchema, SchemaRef, - TimeUnit, DECIMAL128_MAX_PRECISION, DECIMAL256_MAX_PRECISION, + ArrowError, DataType, Field as ArrowField, FieldRef, Fields, IntervalUnit, Schema as ArrowSchema, + SchemaRef, TimeUnit, DECIMAL128_MAX_PRECISION, DECIMAL256_MAX_PRECISION, }; use std::collections::HashMap; use std::io::Read; @@ -87,8 +87,6 @@ impl RecordDecoder { } /// Decoder for Avro data of various shapes. -/// -/// This is the “internal” representation used by [`RecordDecoder`]. #[derive(Debug)] enum Decoder { /// Avro `null` @@ -117,25 +115,19 @@ enum Decoder { Binary(OffsetBufferBuilder, Vec), /// Avro `string` => Arrow String String(OffsetBufferBuilder, Vec), + /// Avro `fixed(n)` => Arrow `FixedSizeBinaryArray` + Fixed(i32, Vec), + /// Avro `interval` => Arrow `IntervalMonthDayNanoType` (12 bytes) + Interval(Vec), /// Avro `array` - /// * `FieldRef` is the arrow field for the list - /// * `OffsetBufferBuilder` holds offsets into the child array - /// * The boxed `Decoder` decodes T itself List(FieldRef, OffsetBufferBuilder, Box), /// Avro `record` - /// * `Fields` is the Arrow schema of the record - /// * The `Vec` is one decoder per child field Record(Fields, Vec), - /// Avro union that includes `null` => decodes as a single arrow field + a null bit mask + /// Avro union that includes `null` Nullable(Nullability, NullBufferBuilder, Box), /// Avro `enum` => Dictionary(int32 -> string) Enum(Vec, Vec), /// Avro `map` - /// * The `FieldRef` is the arrow field for the map - /// * `key_offsets`, `map_offsets`: offset builders - /// * `key_data` accumulates the raw UTF8 for keys - /// * `values_decoder_inner` decodes the map’s value type - /// * `current_entry_count` how many (key,value) pairs total seen so far Map( FieldRef, OffsetBufferBuilder, @@ -145,19 +137,17 @@ enum Decoder { usize, ), /// Avro decimal => Arrow decimal - /// (precision, scale, size, builder) Decimal(usize, Option, Option, DecimalBuilder), } impl Decoder { - /// Checks if the Decoder is nullable, i.e. wrapped in [`Decoder::Nullable`]. + /// Checks if the Decoder is nullable, i.e. wrapped in `Nullable`. fn is_nullable(&self) -> bool { matches!(self, Decoder::Nullable(_, _, _)) } /// Create a `Decoder` from an [`AvroDataType`]. fn try_new(data_type: &AvroDataType) -> Result { - let not_implemented = |s: &str| Err(ArrowError::NotYetImplemented(s.to_string())); let decoder = match data_type.codec() { Codec::Null => Decoder::Null(0), Codec::Boolean => Decoder::Boolean(BooleanBufferBuilder::new(DEFAULT_CAPACITY)), @@ -182,8 +172,8 @@ impl Decoder { Codec::TimestampMicros(is_utc) => { Decoder::TimestampMicros(*is_utc, Vec::with_capacity(DEFAULT_CAPACITY)) } - Codec::Fixed(_) => return not_implemented("decoding Avro fixed-typed data"), - Codec::Interval => return not_implemented("decoding Avro interval"), + Codec::Fixed(n) => Decoder::Fixed(*n, Vec::with_capacity(DEFAULT_CAPACITY)), + Codec::Interval => Decoder::Interval(Vec::with_capacity(DEFAULT_CAPACITY)), Codec::List(item) => { let item_decoder = Box::new(Self::try_new(item)?); Decoder::List( @@ -192,17 +182,19 @@ impl Decoder { item_decoder, ) } - Codec::Struct(fields) => { - let mut arrow_fields = Vec::with_capacity(fields.len()); - let mut decoders = Vec::with_capacity(fields.len()); - for avro_field in fields.iter() { + Codec::Struct(avro_fields) => { + let mut arrow_fields = Vec::with_capacity(avro_fields.len()); + let mut decoders = Vec::with_capacity(avro_fields.len()); + for avro_field in avro_fields.iter() { let d = Self::try_new(avro_field.data_type())?; arrow_fields.push(avro_field.field()); decoders.push(d); } Decoder::Record(arrow_fields.into(), decoders) } - Codec::Enum(symbols) => Decoder::Enum(symbols.clone(), Vec::with_capacity(DEFAULT_CAPACITY)), + Codec::Enum(symbols) => { + Decoder::Enum(symbols.clone(), Vec::with_capacity(DEFAULT_CAPACITY)) + } Codec::Map(value_type) => { let map_field = Arc::new(ArrowField::new( "entries", @@ -226,6 +218,8 @@ impl Decoder { Decoder::Decimal(*precision, *scale, *size, builder) } }; + + // Wrap in Nullable if needed match data_type.nullability() { Some(nb) => Ok(Decoder::Nullable( nb, @@ -237,8 +231,6 @@ impl Decoder { } /// Append a null to this decoder. - /// - /// This must keep the “row counts” in sync across child buffers, etc. fn append_null(&mut self) { match self { Decoder::Null(n) => { @@ -265,6 +257,19 @@ impl Decoder { Decoder::Binary(off, _) | Decoder::String(off, _) => { off.push_length(0); } + Decoder::Fixed(fsize, buf) => { + // For a null, push `fsize` zeroed bytes + let n = *fsize as usize; + buf.extend(std::iter::repeat(0u8).take(n)); + } + Decoder::Interval(intervals) => { + // null => store a 12-byte zero => months=0, days=0, nanos=0 + intervals.push(IntervalMonthDayNano { + months: 0, + days: 0, + nanoseconds: 0, + }); + } Decoder::List(_, off, child) => { off.push_length(0); child.append_null(); @@ -277,58 +282,82 @@ impl Decoder { Decoder::Enum(_, indices) => { indices.push(0); } - Decoder::Map(_, key_off, map_off, _, _, entry_count) => { + Decoder::Map( + _, + key_off, + map_off, + _, + _, + entry_count, + ) => { key_off.push_length(0); map_off.push_length(*entry_count); } Decoder::Decimal(_, _, _, builder) => { let _ = builder.append_null(); } - Decoder::Nullable(_, _, _) => { /* The null mask is handled by the outer decoder */ } + Decoder::Nullable(_, _, _) => { /* The null bit is stored in the NullBufferBuilder */ } } } - /// Decode a single “row” of data from `buf`. + /// Decode a single row of data from `buf`. fn decode(&mut self, buf: &mut AvroCursor<'_>) -> Result<(), ArrowError> { match self { - Decoder::Null(n) => { - *n += 1; + Decoder::Null(count) => { + *count += 1; } - Decoder::Boolean(vals) => { - vals.append(buf.get_bool()?); + Decoder::Boolean(values) => { + values.append(buf.get_bool()?); } - Decoder::Int32(vals) => { - vals.push(buf.get_int()?); + Decoder::Int32(values) => { + values.push(buf.get_int()?); } - Decoder::Date32(vals) => { - vals.push(buf.get_int()?); + Decoder::Date32(values) => { + values.push(buf.get_int()?); } - Decoder::Int64(vals) => { - vals.push(buf.get_long()?); + Decoder::Int64(values) => { + values.push(buf.get_long()?); } - Decoder::TimeMillis(vals) => { - vals.push(buf.get_int()?); + Decoder::TimeMillis(values) => { + values.push(buf.get_int()?); } - Decoder::TimeMicros(vals) => { - vals.push(buf.get_long()?); + Decoder::TimeMicros(values) => { + values.push(buf.get_long()?); } - Decoder::TimestampMillis(_, vals) => { - vals.push(buf.get_long()?); + Decoder::TimestampMillis(_, values) => { + values.push(buf.get_long()?); } - Decoder::TimestampMicros(_, vals) => { - vals.push(buf.get_long()?); + Decoder::TimestampMicros(_, values) => { + values.push(buf.get_long()?); } - Decoder::Float32(vals) => { - vals.push(buf.get_float()?); + Decoder::Float32(values) => { + values.push(buf.get_float()?); } - Decoder::Float64(vals) => { - vals.push(buf.get_double()?); + Decoder::Float64(values) => { + values.push(buf.get_double()?); } Decoder::Binary(off, data) | Decoder::String(off, data) => { let bytes = buf.get_bytes()?; off.push_length(bytes.len()); data.extend_from_slice(bytes); } + Decoder::Fixed(fsize, accum) => { + let raw = buf.get_fixed(*fsize as usize)?; + accum.extend_from_slice(raw); + } + Decoder::Interval(intervals) => { + let raw = buf.get_fixed(12)?; + let months = i32::from_le_bytes(raw[0..4].try_into().unwrap()); + let days = i32::from_le_bytes(raw[4..8].try_into().unwrap()); + let millis = i32::from_le_bytes(raw[8..12].try_into().unwrap()); + let nanos = millis as i64 * 1_000_000; + let val = IntervalMonthDayNano { + months, + days, + nanoseconds: nanos, + }; + intervals.push(val); + } Decoder::List(_, off, child) => { let total_items = read_array_blocks(buf, |b| child.decode(b))?; off.push_length(total_items); @@ -338,17 +367,15 @@ impl Decoder { c.decode(buf)?; } } - Decoder::Nullable(_, null_buf, child) => { + Decoder::Nullable(_, nulls, child) => { let branch_index = buf.get_int()?; match branch_index { 0 => { - // child - null_buf.append(true); + nulls.append(true); child.decode(buf)?; } 1 => { - // null - null_buf.append(false); + nulls.append(false); child.append_null(); } other => { @@ -388,6 +415,7 @@ impl Decoder { /// Flush buffered data into an [`ArrayRef`], optionally applying `nulls`. fn flush(&mut self, nulls: Option) -> Result { match self { + // For a nullable wrapper => flush the child with the built null buffer Decoder::Nullable(_, nb, child) => { let mask = nb.finish(); child.flush(mask) @@ -461,6 +489,32 @@ impl Decoder { let values = flush_values(data).into(); Ok(Arc::new(StringArray::new(offsets, values, nulls))) } + // Avro fixed => FixedSizeBinaryArray + Decoder::Fixed(fsize, raw) => { + let size = *fsize; + let buf: Buffer = flush_values(raw).into(); + let total_len = buf.len() / (size as usize); + let array = FixedSizeBinaryArray::try_new(size, buf, nulls) + .map_err(|e| ArrowError::ParseError(e.to_string()))?; + Ok(Arc::new(array)) + } + // Avro interval => IntervalMonthDayNanoType + Decoder::Interval(vals) => { + let data_len = vals.len(); + let mut builder = PrimitiveBuilder::::with_capacity(data_len); + for v in vals.drain(..) { + builder.append_value(v); + } + let arr = builder.finish().with_data_type(DataType::Interval(IntervalUnit::MonthDayNano)); + if let Some(nb) = nulls { + // "merge" the newly built array with the nulls + let arr_data = arr.into_data().into_builder().nulls(Some(nb)); + let arr_data = unsafe { arr_data.build_unchecked() }; + Ok(Arc::new(PrimitiveArray::::from(arr_data))) + } else { + Ok(Arc::new(arr)) + } + } // Avro array => ListArray Decoder::List(field, off, item_dec) => { let child_arr = item_dec.flush(None)?; @@ -532,10 +586,7 @@ impl Decoder { } } -/// Helper to decode an Avro array in blocks until a 0 block_count signals end. -/// -/// Each block may be negative, in which case we read an extra “block size” `long`, -/// but typically ignore it unless we want to skip. This function invokes `decode_item` once per item. +/// Decode an Avro array in blocks until a 0 block_count signals end. fn read_array_blocks( buf: &mut AvroCursor, mut decode_item: impl FnMut(&mut AvroCursor) -> Result<(), ArrowError>, @@ -547,7 +598,7 @@ fn read_array_blocks( break; } else if block_count < 0 { let item_count = (-block_count) as usize; - let _block_size = buf.get_long()?; // read but ignore + let _block_size = buf.get_long()?; // “block size” is read but not used for _ in 0..item_count { decode_item(buf)?; } @@ -563,13 +614,10 @@ fn read_array_blocks( Ok(total_items) } -/// Helper to decode an Avro map in blocks until a 0 block_count signals end. -/// -/// For each entry in a block, we decode a key (bytes) + a value (`decode_value`). -/// Returns how many map entries were decoded. +/// Decode an Avro map in blocks until 0 block_count => end. fn read_map_blocks( buf: &mut AvroCursor, - mut decode_value: impl FnMut(&mut AvroCursor) -> Result<(), ArrowError>, + mut decode_entry: impl FnMut(&mut AvroCursor) -> Result<(), ArrowError>, ) -> Result { let block_count = buf.get_long()?; if block_count <= 0 { @@ -577,7 +625,7 @@ fn read_map_blocks( } else { let n = block_count as usize; for _ in 0..n { - decode_value(buf)?; + decode_entry(buf)?; } Ok(n) } @@ -592,13 +640,13 @@ fn flush_primitive( PrimitiveArray::new(flush_values(values).into(), nulls) } -/// Flush an [`OffsetBufferBuilder`], returning its completed offsets. +/// Flush an [`OffsetBufferBuilder`]. #[inline] fn flush_offsets(offsets: &mut OffsetBufferBuilder) -> OffsetBuffer { std::mem::replace(offsets, OffsetBufferBuilder::new(DEFAULT_CAPACITY)).finish() } -/// Remove and return the contents of `values`, replacing it with an empty buffer. +/// Take ownership of `values`. #[inline] fn flush_values(values: &mut Vec) -> Vec { std::mem::replace(values, Vec::with_capacity(DEFAULT_CAPACITY)) @@ -619,39 +667,25 @@ impl DecimalBuilder { size: Option, ) -> Result { match size { - Some(s) if s > 16 && s <= 32 => { - // decimal256 - Ok(Self::Decimal256( - Decimal256Builder::new().with_precision_and_scale( - precision as u8, - scale.unwrap_or(0) as i8, - )?, - )) - } - Some(s) if s <= 16 => { - // decimal128 - Ok(Self::Decimal128( - Decimal128Builder::new().with_precision_and_scale( - precision as u8, - scale.unwrap_or(0) as i8, - )?, - )) - } + Some(s) if s > 16 && s <= 32 => Ok(Self::Decimal256( + Decimal256Builder::new() + .with_precision_and_scale(precision as u8, scale.unwrap_or(0) as i8)?, + )), + Some(s) if s <= 16 => Ok(Self::Decimal128( + Decimal128Builder::new() + .with_precision_and_scale(precision as u8, scale.unwrap_or(0) as i8)?, + )), None => { - // infer from precision when fixed size is None + // infer from precision if precision <= DECIMAL128_MAX_PRECISION as usize { Ok(Self::Decimal128( - Decimal128Builder::new().with_precision_and_scale( - precision as u8, - scale.unwrap_or(0) as i8, - )?, + Decimal128Builder::new() + .with_precision_and_scale(precision as u8, scale.unwrap_or(0) as i8)?, )) } else if precision <= DECIMAL256_MAX_PRECISION as usize { Ok(Self::Decimal256( - Decimal256Builder::new().with_precision_and_scale( - precision as u8, - scale.unwrap_or(0) as i8, - )?, + Decimal256Builder::new() + .with_precision_and_scale(precision as u8, scale.unwrap_or(0) as i8)?, )) } else { Err(ArrowError::ParseError(format!( @@ -699,7 +733,7 @@ impl DecimalBuilder { Ok(()) } - /// Finish building this decimal array, returning an [`ArrayRef`]. + /// Finish building the decimal array, returning an [`ArrayRef`]. fn finish( self, nulls: Option, @@ -779,15 +813,17 @@ mod tests { use super::*; use arrow_array::{ cast::AsArray, Array, ArrayRef, Decimal128Array, Decimal256Array, DictionaryArray, - Int32Array, ListArray, MapArray, StringArray, StructArray, + FixedSizeBinaryArray, Int32Array, IntervalMonthDayNanoArray, ListArray, MapArray, + StringArray, StructArray, }; use arrow_buffer::Buffer; use arrow_schema::{DataType as ArrowDataType, Field as ArrowField}; use serde_json::json; + use std::iter; - // ------------------- - // Zig-Zag Encoding Helper Functions - // ------------------- + // --------------- + // Zig-Zag Helpers + // --------------- fn encode_avro_int(value: i32) -> Vec { let mut buf = Vec::new(); let mut v = (value << 1) ^ (value >> 31); @@ -816,6 +852,145 @@ mod tests { buf } + // ----------------- + // Test Fixed + // ----------------- + #[test] + fn test_fixed_decoding() { + // `fixed(4)` => Arrow FixedSizeBinary(4) + let dt = AvroDataType::from_codec(Codec::Fixed(4)); + let mut dec = Decoder::try_new(&dt).unwrap(); + // 2 rows, each row => 4 bytes + let row1 = [0xDE, 0xAD, 0xBE, 0xEF]; + let row2 = [0x01, 0x23, 0x45, 0x67]; + let mut data = Vec::new(); + data.extend_from_slice(&row1); + data.extend_from_slice(&row2); + let mut cursor = AvroCursor::new(&data); + dec.decode(&mut cursor).unwrap(); + dec.decode(&mut cursor).unwrap(); + let arr = dec.flush(None).unwrap(); + let fsb = arr.as_any().downcast_ref::().unwrap(); + assert_eq!(fsb.len(), 2); + assert_eq!(fsb.value_length(), 4); + assert_eq!(fsb.value(0), row1); + assert_eq!(fsb.value(1), row2); + } + + #[test] + fn test_fixed_with_nulls() { + // Avro union => [ fixed(2), null] + let dt = AvroDataType::from_codec(Codec::Fixed(2)); + let child = Decoder::try_new(&dt).unwrap(); + let mut dec = Decoder::Nullable( + Nullability::NullFirst, + NullBufferBuilder::new(DEFAULT_CAPACITY), + Box::new(child), + ); + // Decode 3 rows: row1 => branch=0 => [0x00], then 2 bytes + // row2 => branch=1 => null => [0x02] + // row3 => branch=0 => 2 bytes + let row1 = [0x11, 0x22]; + let row3 = [0x55, 0x66]; + let mut data = Vec::new(); + // row1 => union=0 => child => 2 bytes + data.extend_from_slice(&encode_avro_int(0)); + data.extend_from_slice(&row1); + // row2 => union=1 => null + data.extend_from_slice(&encode_avro_int(1)); + // row3 => union=0 => child => 2 bytes + data.extend_from_slice(&encode_avro_int(0)); + data.extend_from_slice(&row3); + let mut cursor = AvroCursor::new(&data); + dec.decode(&mut cursor).unwrap(); // row1 + dec.decode(&mut cursor).unwrap(); // row2 => null + dec.decode(&mut cursor).unwrap(); // row3 + let arr = dec.flush(None).unwrap(); + let fsb = arr.as_any().downcast_ref::().unwrap(); + assert_eq!(fsb.len(), 3); + assert!(fsb.is_valid(0)); + assert!(!fsb.is_valid(1)); + assert!(fsb.is_valid(2)); + assert_eq!(fsb.value_length(), 2); + assert_eq!(fsb.value(0), row1); + assert_eq!(fsb.value(2), row3); + } + + // ----------------- + // Test Interval + // ----------------- + #[test] + fn test_interval_decoding() { + // Avro interval => 12 bytes => [ months i32, days i32, ms i32 ] + // decode 2 rows => row1 => months=1, days=2, ms=100 => row2 => months=-1, days=10, ms=9999 + let dt = AvroDataType::from_codec(Codec::Interval); + let mut dec = Decoder::try_new(&dt).unwrap(); + // row1 => months=1 => 01,00,00,00, days=2 => 02,00,00,00, ms=100 => 64,00,00,00 + // row2 => months=-1 => 0xFF,0xFF,0xFF,0xFF, days=10 => 0x0A,0x00,0x00,0x00, ms=9999 => 0x0F,0x27,0x00,0x00 + let row1 = [0x01, 0x00, 0x00, 0x00, + 0x02, 0x00, 0x00, 0x00, + 0x64, 0x00, 0x00, 0x00]; + let row2 = [0xFF, 0xFF, 0xFF, 0xFF, + 0x0A, 0x00, 0x00, 0x00, + 0x0F, 0x27, 0x00, 0x00]; + let mut data = Vec::new(); + data.extend_from_slice(&row1); + data.extend_from_slice(&row2); + let mut cursor = AvroCursor::new(&data); + dec.decode(&mut cursor).unwrap(); + dec.decode(&mut cursor).unwrap(); + let arr = dec.flush(None).unwrap(); + let intervals = arr + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!(intervals.len(), 2); + // row0 => months=1, days=2, ms=100 => nanos=100_000_000 + // row1 => months=-1, days=10, ms=9999 => nanos=9999_000_000 + let val0 = intervals.value(0); + assert_eq!(val0.months, 1); + assert_eq!(val0.days, 2); + assert_eq!(val0.nanoseconds, 100_000_000); + let val1 = intervals.value(1); + assert_eq!(val1.months, -1); + assert_eq!(val1.days, 10); + assert_eq!(val1.nanoseconds, 9_999_000_000); + } + + #[test] + fn test_interval_decoding_with_nulls() { + // Avro union => [ interval, null] + let dt = AvroDataType::from_codec(Codec::Interval); + let child = Decoder::try_new(&dt).unwrap(); + let mut dec = Decoder::Nullable( + Nullability::NullFirst, + NullBufferBuilder::new(DEFAULT_CAPACITY), + Box::new(child), + ); + // We'll decode 2 rows: row1 => interval => months=2, days=3, ms=500 => row2 => null + // row1 => union=0 => child => 12 bytes + // row2 => union=1 => null => no data + let row1 = [0x02, 0x00, 0x00, 0x00, // months=2 + 0x03, 0x00, 0x00, 0x00, // days=3 + 0xF4, 0x01, 0x00, 0x00]; // ms=500 => nanos=500_000_000 + let mut data = Vec::new(); + data.extend_from_slice(&encode_avro_int(0)); // union=0 => child + data.extend_from_slice(&row1); + data.extend_from_slice(&encode_avro_int(1)); // union=1 => null + let mut cursor = AvroCursor::new(&data); + dec.decode(&mut cursor).unwrap(); // row1 + dec.decode(&mut cursor).unwrap(); // row2 => null + let arr = dec.flush(None).unwrap(); + let intervals = arr.as_any().downcast_ref::().unwrap(); + assert_eq!(intervals.len(), 2); + assert!(intervals.is_valid(0)); + assert!(!intervals.is_valid(1)); + let val0 = intervals.value(0); + assert_eq!(val0.months, 2); + assert_eq!(val0.days, 3); + assert_eq!(val0.nanoseconds, 500_000_000); + } + // ------------------- // Tests for Enum // -------------------