Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Commit

Permalink
Added support for interval to parquet (#1122)
Browse files Browse the repository at this point in the history
  • Loading branch information
jorgecarleitao authored Jun 28, 2022
1 parent 2ee4cb0 commit f57dbd5
Show file tree
Hide file tree
Showing 8 changed files with 176 additions and 61 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ jobs:
- name: Run
# --skip io: miri can't handle opening of files, so we skip those
run: cargo miri test --features full -- --skip io::parquet --skip io::ipc --skip io::flight
run: cargo miri test --tests --features compute,chrono-tz

miri-checks-io:
name: MIRI on IO IPC
Expand Down
40 changes: 21 additions & 19 deletions examples/parquet_write_parallel/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,6 @@ fn parallel_write(path: &str, schema: Schema, chunks: &[Chunk]) -> Result<()> {

let encoding_map = |data_type: &DataType| {
match data_type.to_physical_type() {
// let's be fancy and use delta-encoding for binary fields
PhysicalType::Binary
| PhysicalType::LargeBinary
| PhysicalType::Utf8
| PhysicalType::LargeUtf8 => Encoding::DeltaLengthByteArray,
// remaining is plain
_ => Encoding::Plain,
}
Expand All @@ -70,7 +65,7 @@ fn parallel_write(path: &str, schema: Schema, chunks: &[Chunk]) -> Result<()> {
// derive the parquet schema (physical types) from arrow's schema.
let parquet_schema = to_parquet_schema(&schema)?;

let row_groups = chunks.iter().map(|batch| {
let row_groups = chunks.iter().map(|chunk| {
// write batch to pages; parallelized by rayon
let columns = chunk
.columns()
Expand Down Expand Up @@ -106,7 +101,7 @@ fn parallel_write(path: &str, schema: Schema, chunks: &[Chunk]) -> Result<()> {
});

// Create a new empty file
let file = std::fs::File::create(path)?;
let file = std::io::BufWriter::new(std::fs::File::create(path)?);

let mut writer = FileWriter::try_new(file, schema, options)?;

Expand All @@ -123,7 +118,7 @@ fn create_batch(size: usize) -> Result<Chunk> {
let c1: Int32Array = (0..size)
.map(|x| if x % 9 == 0 { None } else { Some(x as i32) })
.collect();
let c2: Utf8Array<i32> = (0..size)
let c2: Utf8Array<i64> = (0..size)
.map(|x| {
if x % 8 == 0 {
None
Expand All @@ -133,18 +128,25 @@ fn create_batch(size: usize) -> Result<Chunk> {
})
.collect();

Chunk::try_new(vec![c1.boxed(), c2.boxed()])
Chunk::try_new(vec![
c1.clone().boxed(),
c1.clone().boxed(),
c1.boxed(),
c2.boxed(),
])
}

fn main() -> Result<()> {
let schema = Schema {
fields: vec![
Field::new("c1", DataType::Int32, true),
Field::new("c2", DataType::Utf8, true),
],
metadata: Default::default(),
};
let batch = create_batch(5_000_000)?;

parallel_write("example.parquet", schema, &[batch.clone(), batch])
let fields = vec![
Field::new("c1", DataType::Int32, true),
Field::new("c2", DataType::Int32, true),
Field::new("c3", DataType::Int32, true),
Field::new("c4", DataType::LargeUtf8, true),
];
let batch = create_batch(100_000_000)?;

let start = std::time::SystemTime::now();
parallel_write("example.parquet", fields.into(), &[batch])?;
println!("took: {} ms", start.elapsed().unwrap().as_millis());
Ok(())
}
71 changes: 55 additions & 16 deletions src/io/parquet/read/deserialize/simple.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use crate::{
array::{Array, BinaryArray, DictionaryKey, MutablePrimitiveArray, PrimitiveArray, Utf8Array},
datatypes::{DataType, IntervalUnit, TimeUnit},
error::{Error, Result},
types::NativeType,
types::{days_ms, NativeType},
};

use super::super::{ArrayIter, DataPages};
Expand Down Expand Up @@ -115,9 +115,12 @@ pub fn page_iter_to_arrays<'a, I: 'a + DataPages>(
chunk_size,
|x: i32| x as i16,
))),
Int32 | Date32 | Time32(_) | Interval(IntervalUnit::YearMonth) => dyn_iter(iden(
primitive::Iter::new(pages, data_type, chunk_size, |x: i32| x as i32),
)),
Int32 | Date32 | Time32(_) => dyn_iter(iden(primitive::Iter::new(
pages,
data_type,
chunk_size,
|x: i32| x as i32,
))),

Timestamp(time_unit, _) => {
let time_unit = *time_unit;
Expand All @@ -133,6 +136,50 @@ pub fn page_iter_to_arrays<'a, I: 'a + DataPages>(

FixedSizeBinary(_) => dyn_iter(fixed_size_binary::Iter::new(pages, data_type, chunk_size)),

Interval(IntervalUnit::YearMonth) => {
let n = 12;
let pages =
fixed_size_binary::Iter::new(pages, DataType::FixedSizeBinary(n), chunk_size);

let pages = pages.map(move |maybe_array| {
let array = maybe_array?;
let values = array
.values()
.chunks_exact(n)
.map(|value: &[u8]| i32::from_le_bytes(value[..4].try_into().unwrap()))
.collect::<Vec<_>>();
let validity = array.validity().cloned();

PrimitiveArray::<i32>::try_new(data_type.clone(), values.into(), validity)
});

let arrays = pages.map(|x| x.map(|x| x.boxed()));

Box::new(arrays) as _
}

Interval(IntervalUnit::DayTime) => {
let n = 12;
let pages =
fixed_size_binary::Iter::new(pages, DataType::FixedSizeBinary(n), chunk_size);

let pages = pages.map(move |maybe_array| {
let array = maybe_array?;
let values = array
.values()
.chunks_exact(n)
.map(super::super::convert_days_ms)
.collect::<Vec<_>>();
let validity = array.validity().cloned();

PrimitiveArray::<days_ms>::try_new(data_type.clone(), values.into(), validity)
});

let arrays = pages.map(|x| x.map(|x| x.boxed()));

Box::new(arrays) as _
}

Decimal(_, _) => match physical_type {
PhysicalType::Int32 => dyn_iter(iden(primitive::Iter::new(
pages,
Expand All @@ -146,14 +193,14 @@ pub fn page_iter_to_arrays<'a, I: 'a + DataPages>(
chunk_size,
|x: i64| x as i128,
))),
&PhysicalType::FixedLenByteArray(n) if n > 16 => {
PhysicalType::FixedLenByteArray(n) if *n > 16 => {
return Err(Error::NotYetImplemented(format!(
"Can't decode Decimal128 type from Fixed Size Byte Array of len {:?}",
n
)))
}
&PhysicalType::FixedLenByteArray(n) => {
let n = n as usize;
PhysicalType::FixedLenByteArray(n) => {
let n = *n;

let pages =
fixed_size_binary::Iter::new(pages, DataType::FixedSizeBinary(n), chunk_size);
Expand All @@ -163,15 +210,7 @@ pub fn page_iter_to_arrays<'a, I: 'a + DataPages>(
let values = array
.values()
.chunks_exact(n)
.map(|value: &[u8]| {
// Copy the fixed-size byte value to the start of a 16 byte stack
// allocated buffer, then use an arithmetic right shift to fill in
// MSBs, which accounts for leading 1's in negative (two's complement)
// values.
let mut bytes = [0u8; 16];
bytes[..n].copy_from_slice(value);
i128::from_be_bytes(bytes) >> (8 * (16 - n))
})
.map(|value: &[u8]| super::super::convert_i128(value, n))
.collect::<Vec<_>>();
let validity = array.validity().cloned();

Expand Down
17 changes: 17 additions & 0 deletions src/io/parquet/read/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,3 +66,20 @@ pub async fn read_metadata_async<R: AsyncRead + AsyncSeek + Send + Unpin>(
) -> Result<FileMetaData> {
Ok(_read_metadata_async(reader).await?)
}

fn convert_days_ms(value: &[u8]) -> crate::types::days_ms {
crate::types::days_ms(
i32::from_le_bytes(value[4..8].try_into().unwrap()),
i32::from_le_bytes(value[8..12].try_into().unwrap()),
)
}

fn convert_i128(value: &[u8], n: usize) -> i128 {
// Copy the fixed-size byte value to the start of a 16 byte stack
// allocated buffer, then use an arithmetic right shift to fill in
// MSBs, which accounts for leading 1's in negative (two's complement)
// values.
let mut bytes = [0u8; 16];
bytes[..n].copy_from_slice(value);
i128::from_be_bytes(bytes) >> (8 * (16 - n))
}
61 changes: 50 additions & 11 deletions src/io/parquet/read/statistics/fixlen.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,9 @@ use parquet2::statistics::{FixedLenStatistics, Statistics as ParquetStatistics};

use crate::array::*;
use crate::error::Result;
use crate::types::days_ms;

fn convert(value: &[u8], n: usize) -> i128 {
// Copy the fixed-size byte value to the start of a 16 byte stack
// allocated buffer, then use an arithmetic right shift to fill in
// MSBs, which accounts for leading 1's in negative (two's complement)
// values.
let mut bytes = [0u8; 16];
bytes[..n].copy_from_slice(value);
i128::from_be_bytes(bytes) >> (8 * (16 - n))
}
use super::super::{convert_days_ms, convert_i128};

pub(super) fn push_i128(
from: Option<&dyn ParquetStatistics>,
Expand All @@ -29,8 +22,8 @@ pub(super) fn push_i128(
.unwrap();
let from = from.map(|s| s.as_any().downcast_ref::<FixedLenStatistics>().unwrap());

min.push(from.and_then(|s| s.min_value.as_deref().map(|x| convert(x, n))));
max.push(from.and_then(|s| s.max_value.as_deref().map(|x| convert(x, n))));
min.push(from.and_then(|s| s.min_value.as_deref().map(|x| convert_i128(x, n))));
max.push(from.and_then(|s| s.max_value.as_deref().map(|x| convert_i128(x, n))));

Ok(())
}
Expand All @@ -53,3 +46,49 @@ pub(super) fn push(
max.push(from.and_then(|s| s.max_value.as_ref()));
Ok(())
}

fn convert_year_month(value: &[u8]) -> i32 {
i32::from_le_bytes(value[..4].try_into().unwrap())
}

pub(super) fn push_year_month(
from: Option<&dyn ParquetStatistics>,
min: &mut dyn MutableArray,
max: &mut dyn MutableArray,
) -> Result<()> {
let min = min
.as_mut_any()
.downcast_mut::<MutablePrimitiveArray<i32>>()
.unwrap();
let max = max
.as_mut_any()
.downcast_mut::<MutablePrimitiveArray<i32>>()
.unwrap();
let from = from.map(|s| s.as_any().downcast_ref::<FixedLenStatistics>().unwrap());

min.push(from.and_then(|s| s.min_value.as_deref().map(convert_year_month)));
max.push(from.and_then(|s| s.max_value.as_deref().map(convert_year_month)));

Ok(())
}

pub(super) fn push_days_ms(
from: Option<&dyn ParquetStatistics>,
min: &mut dyn MutableArray,
max: &mut dyn MutableArray,
) -> Result<()> {
let min = min
.as_mut_any()
.downcast_mut::<MutablePrimitiveArray<days_ms>>()
.unwrap();
let max = max
.as_mut_any()
.downcast_mut::<MutablePrimitiveArray<days_ms>>()
.unwrap();
let from = from.map(|s| s.as_any().downcast_ref::<FixedLenStatistics>().unwrap());

min.push(from.and_then(|s| s.min_value.as_deref().map(convert_days_ms)));
max.push(from.and_then(|s| s.max_value.as_deref().map(convert_days_ms)));

Ok(())
}
6 changes: 3 additions & 3 deletions src/io/parquet/read/statistics/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -405,9 +405,9 @@ fn push(
Boolean => boolean::push(from, min, max),
Int8 => primitive::push(from, min, max, |x: i32| Ok(x as i8)),
Int16 => primitive::push(from, min, max, |x: i32| Ok(x as i16)),
Date32 | Time32(_) | Interval(IntervalUnit::YearMonth) => {
primitive::push(from, min, max, |x: i32| Ok(x as i32))
}
Date32 | Time32(_) => primitive::push(from, min, max, |x: i32| Ok(x as i32)),
Interval(IntervalUnit::YearMonth) => fixlen::push_year_month(from, min, max),
Interval(IntervalUnit::DayTime) => fixlen::push_days_ms(from, min, max),
UInt8 => primitive::push(from, min, max, |x: i32| Ok(x as u8)),
UInt16 => primitive::push(from, min, max, |x: i32| Ok(x as u16)),
UInt32 => match physical_type {
Expand Down
4 changes: 2 additions & 2 deletions src/io/parquet/write/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,7 @@ pub fn array_to_page_simple(
array.values().iter().for_each(|x| {
let bytes = &x.to_le_bytes();
values.extend_from_slice(bytes);
values.resize(values.len() + 8, 0);
values.extend_from_slice(&[0; 8]);
});
let array = FixedSizeBinaryArray::new(
DataType::FixedSizeBinary(12),
Expand All @@ -283,7 +283,7 @@ pub fn array_to_page_simple(
let mut values = Vec::<u8>::with_capacity(12 * array.len());
array.values().iter().for_each(|x| {
let bytes = &x.to_le_bytes();
values.resize(values.len() + 4, 0); // months
values.extend_from_slice(&[0; 4]); // months
values.extend_from_slice(bytes); // days and seconds
});
let array = FixedSizeBinaryArray::from_data(
Expand Down
Loading

0 comments on commit f57dbd5

Please sign in to comment.