From 05133e18fbbdf46ecf484a165edd842f5f6e78a3 Mon Sep 17 00:00:00 2001
From: "Jorge C. Leitao"
Date: Tue, 28 Jun 2022 17:25:17 +0000
Subject: [PATCH] Added support for interval to parquet

---
 examples/parquet_write_parallel/src/main.rs | 40 ++++++------
 src/io/parquet/read/deserialize/simple.rs   | 71 ++++++++++++++++-----
 src/io/parquet/read/mod.rs                  | 17 +++++
 src/io/parquet/read/statistics/fixlen.rs    | 61 ++++++++++++++----
 src/io/parquet/read/statistics/mod.rs       |  6 +-
 src/io/parquet/write/mod.rs                 |  4 +-
 tests/it/io/parquet/mod.rs                  | 36 ++++++++---
 7 files changed, 175 insertions(+), 60 deletions(-)

diff --git a/examples/parquet_write_parallel/src/main.rs b/examples/parquet_write_parallel/src/main.rs
index 980c817e422..4973d0e7a77 100644
--- a/examples/parquet_write_parallel/src/main.rs
+++ b/examples/parquet_write_parallel/src/main.rs
@@ -51,11 +51,6 @@ fn parallel_write(path: &str, schema: Schema, chunks: &[Chunk]) -> Result<()> {

     let encoding_map = |data_type: &DataType| {
         match data_type.to_physical_type() {
-            // let's be fancy and use delta-encoding for binary fields
-            PhysicalType::Binary
-            | PhysicalType::LargeBinary
-            | PhysicalType::Utf8
-            | PhysicalType::LargeUtf8 => Encoding::DeltaLengthByteArray,
             // remaining is plain
             _ => Encoding::Plain,
         }
@@ -70,7 +65,7 @@ fn parallel_write(path: &str, schema: Schema, chunks: &[Chunk]) -> Result<()> {
     // derive the parquet schema (physical types) from arrow's schema.
     let parquet_schema = to_parquet_schema(&schema)?;

-    let row_groups = chunks.iter().map(|batch| {
+    let row_groups = chunks.iter().map(|chunk| {
         // write batch to pages; parallelized by rayon
         let columns = chunk
             .columns()
@@ -106,7 +101,7 @@ fn parallel_write(path: &str, schema: Schema, chunks: &[Chunk]) -> Result<()> {
     });

     // Create a new empty file
-    let file = std::fs::File::create(path)?;
+    let file = std::io::BufWriter::new(std::fs::File::create(path)?);

     let mut writer = FileWriter::try_new(file, schema, options)?;

@@ -123,7 +118,7 @@ fn create_batch(size: usize) -> Result<Chunk> {
     let c1: Int32Array = (0..size)
         .map(|x| if x % 9 == 0 { None } else { Some(x as i32) })
         .collect();
-    let c2: Utf8Array<i32> = (0..size)
+    let c2: Utf8Array<i64> = (0..size)
         .map(|x| {
             if x % 8 == 0 {
                 None
@@ -133,18 +128,25 @@ fn create_batch(size: usize) -> Result<Chunk> {
         })
         .collect();

-    Chunk::try_new(vec![c1.boxed(), c2.boxed()])
+    Chunk::try_new(vec![
+        c1.clone().boxed(),
+        c1.clone().boxed(),
+        c1.boxed(),
+        c2.boxed(),
+    ])
 }

 fn main() -> Result<()> {
-    let schema = Schema {
-        fields: vec![
-            Field::new("c1", DataType::Int32, true),
-            Field::new("c2", DataType::Utf8, true),
-        ],
-        metadata: Default::default(),
-    };
-    let batch = create_batch(5_000_000)?;
-
-    parallel_write("example.parquet", schema, &[batch.clone(), batch])
+    let fields = vec![
+        Field::new("c1", DataType::Int32, true),
+        Field::new("c2", DataType::Int32, true),
+        Field::new("c3", DataType::Int32, true),
+        Field::new("c4", DataType::LargeUtf8, true),
+    ];
+    let batch = create_batch(100_000_000)?;
+
+    let start = std::time::SystemTime::now();
+    parallel_write("example.parquet", fields.into(), &[batch])?;
+    println!("took: {} ms", start.elapsed().unwrap().as_millis());
+    Ok(())
 }
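Note on the example change above: the output file is now wrapped in std::io::BufWriter, so the writer's many small page and header writes are coalesced in memory instead of each reaching the OS individually. A minimal standalone sketch of the same pattern, not part of the patch and using only the standard library (the path is illustrative):

    use std::io::{BufWriter, Write};

    fn main() -> std::io::Result<()> {
        // Unbuffered, each write_all below would be its own syscall.
        let file = std::fs::File::create("/tmp/example.bin")?;
        // Buffered, writes accumulate in an in-memory buffer first.
        let mut writer = BufWriter::new(file);
        for i in 0u32..1_000 {
            writer.write_all(&i.to_le_bytes())?;
        }
        writer.flush()?; // push the remaining buffered bytes to disk
        Ok(())
    }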
diff --git a/src/io/parquet/read/deserialize/simple.rs b/src/io/parquet/read/deserialize/simple.rs
index 20dba380653..e9257d045ee 100644
--- a/src/io/parquet/read/deserialize/simple.rs
+++ b/src/io/parquet/read/deserialize/simple.rs
@@ -9,7 +9,7 @@ use crate::{
     array::{Array, BinaryArray, DictionaryKey, MutablePrimitiveArray, PrimitiveArray, Utf8Array},
     datatypes::{DataType, IntervalUnit, TimeUnit},
     error::{Error, Result},
-    types::NativeType,
+    types::{days_ms, NativeType},
 };

 use super::super::{ArrayIter, DataPages};
@@ -115,9 +115,12 @@ pub fn page_iter_to_arrays<'a, I: 'a + DataPages>(
             chunk_size,
             |x: i32| x as i16,
         ))),
-        Int32 | Date32 | Time32(_) | Interval(IntervalUnit::YearMonth) => dyn_iter(iden(
-            primitive::Iter::new(pages, data_type, chunk_size, |x: i32| x as i32),
-        )),
+        Int32 | Date32 | Time32(_) => dyn_iter(iden(primitive::Iter::new(
+            pages,
+            data_type,
+            chunk_size,
+            |x: i32| x as i32,
+        ))),

         Timestamp(time_unit, _) => {
             let time_unit = *time_unit;
@@ -133,6 +136,50 @@ pub fn page_iter_to_arrays<'a, I: 'a + DataPages>(

         FixedSizeBinary(_) => dyn_iter(fixed_size_binary::Iter::new(pages, data_type, chunk_size)),

+        Interval(IntervalUnit::YearMonth) => {
+            let n = 12;
+            let pages =
+                fixed_size_binary::Iter::new(pages, DataType::FixedSizeBinary(n), chunk_size);
+
+            let pages = pages.map(move |maybe_array| {
+                let array = maybe_array?;
+                let values = array
+                    .values()
+                    .chunks_exact(n)
+                    .map(|value: &[u8]| i32::from_le_bytes(value[..4].try_into().unwrap()))
+                    .collect::<Vec<_>>();
+                let validity = array.validity().cloned();
+
+                PrimitiveArray::<i32>::try_new(data_type.clone(), values.into(), validity)
+            });
+
+            let arrays = pages.map(|x| x.map(|x| x.boxed()));
+
+            Box::new(arrays) as _
+        }
+
+        Interval(IntervalUnit::DayTime) => {
+            let n = 12;
+            let pages =
+                fixed_size_binary::Iter::new(pages, DataType::FixedSizeBinary(n), chunk_size);
+
+            let pages = pages.map(move |maybe_array| {
+                let array = maybe_array?;
+                let values = array
+                    .values()
+                    .chunks_exact(n)
+                    .map(super::super::convert_days_ms)
+                    .collect::<Vec<_>>();
+                let validity = array.validity().cloned();
+
+                PrimitiveArray::<days_ms>::try_new(data_type.clone(), values.into(), validity)
+            });
+
+            let arrays = pages.map(|x| x.map(|x| x.boxed()));
+
+            Box::new(arrays) as _
+        }
+
         Decimal(_, _) => match physical_type {
             PhysicalType::Int32 => dyn_iter(iden(primitive::Iter::new(
                 pages,
@@ -146,14 +193,14 @@ pub fn page_iter_to_arrays<'a, I: 'a + DataPages>(
                 chunk_size,
                 |x: i64| x as i128,
             ))),
-            &PhysicalType::FixedLenByteArray(n) if n > 16 => {
+            PhysicalType::FixedLenByteArray(n) if *n > 16 => {
                 return Err(Error::NotYetImplemented(format!(
                     "Can't decode Decimal128 type from Fixed Size Byte Array of len {:?}",
                     n
                 )))
             }
-            &PhysicalType::FixedLenByteArray(n) => {
-                let n = n as usize;
+            PhysicalType::FixedLenByteArray(n) => {
+                let n = *n;
                 let pages =
                     fixed_size_binary::Iter::new(pages, DataType::FixedSizeBinary(n), chunk_size);
@@ -163,15 +210,7 @@ pub fn page_iter_to_arrays<'a, I: 'a + DataPages>(
                     let array = maybe_array?;
                     let values = array
                         .values()
                         .chunks_exact(n)
-                        .map(|value: &[u8]| {
-                            // Copy the fixed-size byte value to the start of a 16 byte stack
-                            // allocated buffer, then use an arithmetic right shift to fill in
-                            // MSBs, which accounts for leading 1's in negative (two's complement)
-                            // values.
-                            let mut bytes = [0u8; 16];
-                            bytes[..n].copy_from_slice(value);
-                            i128::from_be_bytes(bytes) >> (8 * (16 - n))
-                        })
+                        .map(|value: &[u8]| super::super::convert_i128(value, n))
                        .collect::<Vec<_>>();
                     let validity = array.validity().cloned();
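For context on the two new match arms above: Parquet's INTERVAL logical type is a 12-byte fixed-length byte array laid out as three little-endian 32-bit integers, months, days, milliseconds. The reader decodes that buffer into Arrow's YearMonth interval (months only) and DayTime interval (days and milliseconds). A standalone sketch of the byte-level mapping, separate from the patch (function names are illustrative):

    // Decode the months field used by Interval(YearMonth).
    fn year_month_from_interval(value: &[u8; 12]) -> i32 {
        i32::from_le_bytes(value[..4].try_into().unwrap())
    }

    // Decode the days and milliseconds fields used by Interval(DayTime).
    fn day_time_from_interval(value: &[u8; 12]) -> (i32, i32) {
        let days = i32::from_le_bytes(value[4..8].try_into().unwrap());
        let millis = i32::from_le_bytes(value[8..12].try_into().unwrap());
        (days, millis)
    }

    fn main() {
        // 7 months, 3 days, 1500 milliseconds.
        let mut buf = [0u8; 12];
        buf[..4].copy_from_slice(&7i32.to_le_bytes());
        buf[4..8].copy_from_slice(&3i32.to_le_bytes());
        buf[8..12].copy_from_slice(&1500i32.to_le_bytes());
        assert_eq!(year_month_from_interval(&buf), 7);
        assert_eq!(day_time_from_interval(&buf), (3, 1500));
    }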
diff --git a/src/io/parquet/read/mod.rs b/src/io/parquet/read/mod.rs
index 4451c4d77b7..b7609602e03 100644
--- a/src/io/parquet/read/mod.rs
+++ b/src/io/parquet/read/mod.rs
@@ -66,3 +66,20 @@ pub async fn read_metadata_async<R: AsyncRead + AsyncSeek + Send + Unpin>(
 ) -> Result<FileMetaData> {
     Ok(_read_metadata_async(reader).await?)
 }
+
+fn convert_days_ms(value: &[u8]) -> crate::types::days_ms {
+    crate::types::days_ms(
+        i32::from_le_bytes(value[4..8].try_into().unwrap()),
+        i32::from_le_bytes(value[8..12].try_into().unwrap()),
+    )
+}
+
+fn convert_i128(value: &[u8], n: usize) -> i128 {
+    // Copy the fixed-size byte value to the start of a 16 byte stack
+    // allocated buffer, then use an arithmetic right shift to fill in
+    // MSBs, which accounts for leading 1's in negative (two's complement)
+    // values.
+    let mut bytes = [0u8; 16];
+    bytes[..n].copy_from_slice(value);
+    i128::from_be_bytes(bytes) >> (8 * (16 - n))
+}
diff --git a/src/io/parquet/read/statistics/fixlen.rs b/src/io/parquet/read/statistics/fixlen.rs
index b97f6458643..f90ca9f4f34 100644
--- a/src/io/parquet/read/statistics/fixlen.rs
+++ b/src/io/parquet/read/statistics/fixlen.rs
@@ -2,16 +2,9 @@ use parquet2::statistics::{FixedLenStatistics, Statistics as ParquetStatistics};

 use crate::array::*;
 use crate::error::Result;
+use crate::types::days_ms;

-fn convert(value: &[u8], n: usize) -> i128 {
-    // Copy the fixed-size byte value to the start of a 16 byte stack
-    // allocated buffer, then use an arithmetic right shift to fill in
-    // MSBs, which accounts for leading 1's in negative (two's complement)
-    // values.
-    let mut bytes = [0u8; 16];
-    bytes[..n].copy_from_slice(value);
-    i128::from_be_bytes(bytes) >> (8 * (16 - n))
-}
+use super::super::{convert_days_ms, convert_i128};

 pub(super) fn push_i128(
     from: Option<&dyn ParquetStatistics>,
@@ -29,8 +22,8 @@ pub(super) fn push_i128(
         .unwrap();
     let from = from.map(|s| s.as_any().downcast_ref::<FixedLenStatistics>().unwrap());

-    min.push(from.and_then(|s| s.min_value.as_deref().map(|x| convert(x, n))));
-    max.push(from.and_then(|s| s.max_value.as_deref().map(|x| convert(x, n))));
+    min.push(from.and_then(|s| s.min_value.as_deref().map(|x| convert_i128(x, n))));
+    max.push(from.and_then(|s| s.max_value.as_deref().map(|x| convert_i128(x, n))));

     Ok(())
 }
@@ -53,3 +46,49 @@ pub(super) fn push(
     min.push(from.and_then(|s| s.min_value.as_ref()));
     max.push(from.and_then(|s| s.max_value.as_ref()));
     Ok(())
 }
+
+fn convert_year_month(value: &[u8]) -> i32 {
+    i32::from_le_bytes(value[..4].try_into().unwrap())
+}
+
+pub(super) fn push_year_month(
+    from: Option<&dyn ParquetStatistics>,
+    min: &mut dyn MutableArray,
+    max: &mut dyn MutableArray,
+) -> Result<()> {
+    let min = min
+        .as_mut_any()
+        .downcast_mut::<MutablePrimitiveArray<i32>>()
+        .unwrap();
+    let max = max
+        .as_mut_any()
+        .downcast_mut::<MutablePrimitiveArray<i32>>()
+        .unwrap();
+    let from = from.map(|s| s.as_any().downcast_ref::<FixedLenStatistics>().unwrap());
+
+    min.push(from.and_then(|s| s.min_value.as_deref().map(convert_year_month)));
+    max.push(from.and_then(|s| s.max_value.as_deref().map(convert_year_month)));
+
+    Ok(())
+}
+
+pub(super) fn push_days_ms(
+    from: Option<&dyn ParquetStatistics>,
+    min: &mut dyn MutableArray,
+    max: &mut dyn MutableArray,
+) -> Result<()> {
+    let min = min
+        .as_mut_any()
+        .downcast_mut::<MutablePrimitiveArray<days_ms>>()
+        .unwrap();
+    let max = max
+        .as_mut_any()
+        .downcast_mut::<MutablePrimitiveArray<days_ms>>()
+        .unwrap();
+    let from = from.map(|s| s.as_any().downcast_ref::<FixedLenStatistics>().unwrap());
+
+    min.push(from.and_then(|s| s.min_value.as_deref().map(convert_days_ms)));
+    max.push(from.and_then(|s| s.max_value.as_deref().map(convert_days_ms)));
+
+    Ok(())
+}
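The convert_i128 helper factored out above widens an n-byte big-endian two's-complement value to i128: the bytes are copied to the front of a 16-byte buffer and an arithmetic right shift then propagates the sign bit through the high bytes. Restated in isolation with a worked example (the main function is illustrative, not part of the patch):

    fn convert_i128(value: &[u8], n: usize) -> i128 {
        // Place the n big-endian bytes at the most significant end...
        let mut bytes = [0u8; 16];
        bytes[..n].copy_from_slice(value);
        // ...then shift right arithmetically, filling the high bytes
        // with copies of the sign bit.
        i128::from_be_bytes(bytes) >> (8 * (16 - n))
    }

    fn main() {
        // Two-byte big-endian -1 stays -1 after widening to i128.
        assert_eq!(convert_i128(&[0xff, 0xff], 2), -1);
        // 0x0102 widens to 258.
        assert_eq!(convert_i128(&[0x01, 0x02], 2), 258);
    }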
diff --git a/src/io/parquet/read/statistics/mod.rs b/src/io/parquet/read/statistics/mod.rs
index 540dac4df97..a5916a98792 100644
--- a/src/io/parquet/read/statistics/mod.rs
+++ b/src/io/parquet/read/statistics/mod.rs
@@ -405,9 +405,9 @@ fn push(
         Boolean => boolean::push(from, min, max),
         Int8 => primitive::push(from, min, max, |x: i32| Ok(x as i8)),
         Int16 => primitive::push(from, min, max, |x: i32| Ok(x as i16)),
-        Date32 | Time32(_) | Interval(IntervalUnit::YearMonth) => {
-            primitive::push(from, min, max, |x: i32| Ok(x as i32))
-        }
+        Date32 | Time32(_) => primitive::push(from, min, max, |x: i32| Ok(x as i32)),
+        Interval(IntervalUnit::YearMonth) => fixlen::push_year_month(from, min, max),
+        Interval(IntervalUnit::DayTime) => fixlen::push_days_ms(from, min, max),
         UInt8 => primitive::push(from, min, max, |x: i32| Ok(x as u8)),
         UInt16 => primitive::push(from, min, max, |x: i32| Ok(x as u16)),
         UInt32 => match physical_type {
diff --git a/src/io/parquet/write/mod.rs b/src/io/parquet/write/mod.rs
index 5dfd8ae28b7..4fea3d9c4aa 100644
--- a/src/io/parquet/write/mod.rs
+++ b/src/io/parquet/write/mod.rs
@@ -260,7 +260,7 @@ pub fn array_to_page_simple(
             array.values().iter().for_each(|x| {
                 let bytes = &x.to_le_bytes();
                 values.extend_from_slice(bytes);
-                values.resize(values.len() + 8, 0);
+                values.extend_from_slice(&[0; 8]);
             });
             let array = FixedSizeBinaryArray::new(
                 DataType::FixedSizeBinary(12),
@@ -283,7 +283,7 @@ pub fn array_to_page_simple(
             let mut values = Vec::<u8>::with_capacity(12 * array.len());
             array.values().iter().for_each(|x| {
                 let bytes = &x.to_le_bytes();
-                values.resize(values.len() + 4, 0); // months
+                values.extend_from_slice(&[0; 4]); // months
                 values.extend_from_slice(bytes); // days and seconds
             });
             let array = FixedSizeBinaryArray::from_data(
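On the write side, the two hunks above pad each interval out to the full 12-byte months/days/milliseconds layout: a YearMonth value writes its 4 months bytes followed by 8 zero bytes, while a DayTime value (whose days_ms serializes as 8 bytes, days then milliseconds) is prefixed with 4 zero months bytes. A standalone sketch of the layouts being produced, not part of the patch (helper names are illustrative):

    // Layout for Interval(YearMonth): months, then zeroed days/millis.
    fn encode_year_month(months: i32) -> [u8; 12] {
        let mut out = [0u8; 12];
        out[..4].copy_from_slice(&months.to_le_bytes());
        out
    }

    // Layout for Interval(DayTime): zeroed months, then days and millis.
    fn encode_day_time(days: i32, millis: i32) -> [u8; 12] {
        let mut out = [0u8; 12];
        out[4..8].copy_from_slice(&days.to_le_bytes());
        out[8..12].copy_from_slice(&millis.to_le_bytes());
        out
    }

    fn main() {
        assert_eq!(&encode_year_month(7)[..4], &7i32.to_le_bytes()[..]);
        assert_eq!(&encode_day_time(3, 1500)[8..12], &1500i32.to_le_bytes()[..]);
    }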
Field::new("a13", array13.data_type().clone(), true), + Field::new("a14", array14.data_type().clone(), true), ]); - let batch = Chunk::try_new(vec![ + let chunk = Chunk::try_new(vec![ array1.boxed(), array2.boxed(), array3.boxed(), @@ -1034,14 +1050,16 @@ fn arrow_type() -> Result<()> { array10.boxed(), array11.boxed(), array12.boxed(), + array13.boxed(), + array14.boxed(), ])?; - let r = integration_write(&schema, &[batch.clone()])?; + let r = integration_write(&schema, &[chunk.clone()])?; - let (new_schema, new_batches) = integration_read(&r)?; + let (new_schema, new_chunks) = integration_read(&r)?; assert_eq!(new_schema, schema); - assert_eq!(new_batches, vec![batch]); + assert_eq!(new_chunks, vec![chunk]); Ok(()) }