Skip to content

Commit

Permalink
Initial write support (#122)
Browse files Browse the repository at this point in the history
* Zigzag and varint encoders

* Short repeat and direct RLEv2 writers

* Minor refactoring

* Signed msb encoder

* Refacftor RLE delta+patched base to be more functional

* Minor refactoring

* Byte RLE writer

* Remove unused error types

* Initial version of ORC writer, supporting only float

* Int8 array write support

* Integer RLEv2 Delta writing

* Minor optimization

* Abstract bits_used functionality to common NInt function

* Remove overcomplicated AbsVarint code, replace with i64/u64 in delta encoding

* Minor fixes

* Initial RLEv2 encoder base

* Remove u64 impl of NInt in favour of generic to determine sign

* Simplify getting RLE reader

* Fix percentile bit calculation encoding/decoding

* Patched base writing

* Support writing int arrays

* Multi stripe write test case

* Reduce duplication for primitive ColumnStripeEncoders

* Introduce EstimateMemory trait to standardize

* Comment

* Remove need for seek from writer + minor PR comments

* Rename s_type to kind

* Deduplicate get_closest_fixed_bits

* Fix comments

* Switch arrow writer tests to be in-memory instead of writing to disk

* Fix writing arrays with nulls

* Add license to new files
  • Loading branch information
Jefffrey authored Aug 21, 2024
1 parent 0560c10 commit 9dd640c
Show file tree
Hide file tree
Showing 27 changed files with 3,404 additions and 576 deletions.
3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ license = "Apache-2.0"
description = "Implementation of Apache ORC file format using Apache Arrow in-memory format"
keywords = ["arrow", "orc", "arrow-rs", "datafusion"]
include = ["src/**/*.rs", "Cargo.toml"]
rust-version = "1.70"
rust-version = "1.73"

[dependencies]
arrow = { version = "52", features = ["prettyprint", "chrono-tz"] }
Expand Down Expand Up @@ -57,6 +57,7 @@ arrow-json = "52.0.0"
criterion = { version = "0.5", default-features = false, features = ["async_tokio"] }
opendal = { version = "0.48", default-features = false, features = ["services-memory"] }
pretty_assertions = "1.3.0"
proptest = "1.0.0"
serde_json = { version = "1.0", default-features = false, features = ["std"] }

[features]
Expand Down
8 changes: 4 additions & 4 deletions src/array_decoder/list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,17 +25,17 @@ use snafu::ResultExt;
use crate::array_decoder::{derive_present_vec, populate_lengths_with_nulls};
use crate::column::{get_present_vec, Column};
use crate::proto::stream::Kind;
use crate::reader::decode::get_rle_reader;

use crate::error::{ArrowSnafu, Result};
use crate::reader::decode::get_unsigned_rle_reader;
use crate::stripe::Stripe;

use super::{array_decoder_factory, ArrayBatchDecoder};

pub struct ListArrayDecoder {
inner: Box<dyn ArrayBatchDecoder>,
present: Option<Box<dyn Iterator<Item = bool> + Send>>,
lengths: Box<dyn Iterator<Item = Result<u64>> + Send>,
lengths: Box<dyn Iterator<Item = Result<i64>> + Send>,
field: FieldRef,
}

Expand All @@ -48,7 +48,7 @@ impl ListArrayDecoder {
let inner = array_decoder_factory(child, field.clone(), stripe)?;

let reader = stripe.stream_map().get(column, Kind::Length);
let lengths = get_rle_reader(column, reader)?;
let lengths = get_unsigned_rle_reader(column, reader);

Ok(Self {
inner,
Expand Down Expand Up @@ -83,7 +83,7 @@ impl ArrayBatchDecoder for ListArrayDecoder {
elements_to_fetch,
"less lengths than expected in ListArray"
);
let total_length: u64 = lengths.iter().sum();
let total_length: i64 = lengths.iter().sum();
// Fetch child array as one Array with total_length elements
let child_array = self.inner.next_batch(total_length as usize, None)?;
let lengths = populate_lengths_with_nulls(lengths, batch_size, &present);
Expand Down
8 changes: 4 additions & 4 deletions src/array_decoder/map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use crate::array_decoder::{derive_present_vec, populate_lengths_with_nulls};
use crate::column::{get_present_vec, Column};
use crate::error::{ArrowSnafu, Result};
use crate::proto::stream::Kind;
use crate::reader::decode::get_rle_reader;
use crate::reader::decode::get_unsigned_rle_reader;
use crate::stripe::Stripe;

use super::{array_decoder_factory, ArrayBatchDecoder};
Expand All @@ -35,7 +35,7 @@ pub struct MapArrayDecoder {
keys: Box<dyn ArrayBatchDecoder>,
values: Box<dyn ArrayBatchDecoder>,
present: Option<Box<dyn Iterator<Item = bool> + Send>>,
lengths: Box<dyn Iterator<Item = Result<u64>> + Send>,
lengths: Box<dyn Iterator<Item = Result<i64>> + Send>,
fields: Fields,
}

Expand All @@ -56,7 +56,7 @@ impl MapArrayDecoder {
let values = array_decoder_factory(values_column, values_field.clone(), stripe)?;

let reader = stripe.stream_map().get(column, Kind::Length);
let lengths = get_rle_reader(column, reader)?;
let lengths = get_unsigned_rle_reader(column, reader);

let fields = Fields::from(vec![keys_field, values_field]);

Expand Down Expand Up @@ -94,7 +94,7 @@ impl ArrayBatchDecoder for MapArrayDecoder {
elements_to_fetch,
"less lengths than expected in MapArray"
);
let total_length: u64 = lengths.iter().sum();
let total_length: i64 = lengths.iter().sum();
// Fetch key and value arrays, each with total_length elements
// Fetch child array as one Array with total_length elements
let keys_array = self.keys.next_batch(total_length as usize, None)?;
Expand Down
10 changes: 5 additions & 5 deletions src/array_decoder/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use std::sync::Arc;

use arrow::array::{ArrayRef, BooleanArray, BooleanBuilder, PrimitiveArray, PrimitiveBuilder};
use arrow::buffer::NullBuffer;
use arrow::datatypes::{ArrowPrimitiveType, Decimal128Type, UInt64Type};
use arrow::datatypes::{ArrowPrimitiveType, Decimal128Type};
use arrow::datatypes::{DataType as ArrowDataType, Field};
use arrow::datatypes::{
Date32Type, Float32Type, Float64Type, Int16Type, Int32Type, Int64Type, Int8Type, SchemaRef,
Expand All @@ -33,7 +33,7 @@ use crate::error::{
};
use crate::proto::stream::Kind;
use crate::reader::decode::boolean_rle::BooleanIter;
use crate::reader::decode::byte_rle::ByteRleIter;
use crate::reader::decode::byte_rle::ByteRleReader;
use crate::reader::decode::float::FloatIter;
use crate::reader::decode::get_rle_reader;
use crate::schema::DataType;
Expand Down Expand Up @@ -119,7 +119,6 @@ impl<T: ArrowPrimitiveType> ArrayBatchDecoder for PrimitiveArrayDecoder<T> {
}
}

type UInt64ArrayDecoder = PrimitiveArrayDecoder<UInt64Type>;
type Int64ArrayDecoder = PrimitiveArrayDecoder<Int64Type>;
type Int32ArrayDecoder = PrimitiveArrayDecoder<Int32Type>;
type Int16ArrayDecoder = PrimitiveArrayDecoder<Int16Type>;
Expand Down Expand Up @@ -260,7 +259,7 @@ fn derive_present_vec(

/// Fix the lengths to account for nulls (represented as 0 length)
fn populate_lengths_with_nulls(
lengths: Vec<u64>,
lengths: Vec<i64>,
batch_size: usize,
present: &Option<Vec<bool>>,
) -> Vec<usize> {
Expand Down Expand Up @@ -365,7 +364,8 @@ pub fn array_decoder_factory(
}
);
let iter = stripe.stream_map().get(column, Kind::Data);
let iter = Box::new(ByteRleIter::new(iter).map(|value| value.map(|value| value as i8)));
let iter =
Box::new(ByteRleReader::new(iter).map(|value| value.map(|value| value as i8)));
let present = get_present_vec(column, stripe)?
.map(|iter| Box::new(iter.into_iter()) as Box<dyn Iterator<Item = bool> + Send>);
Box::new(Int8ArrayDecoder::new(iter, present))
Expand Down
25 changes: 12 additions & 13 deletions src/array_decoder/string.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,32 +30,31 @@ use crate::column::{get_present_vec, Column};
use crate::error::{ArrowSnafu, IoSnafu, Result};
use crate::proto::column_encoding::Kind as ColumnEncodingKind;
use crate::proto::stream::Kind;
use crate::reader::decode::{get_rle_reader, RleVersion};
use crate::reader::decode::get_unsigned_rle_reader;
use crate::reader::decompress::Decompressor;
use crate::stripe::Stripe;

use super::{ArrayBatchDecoder, UInt64ArrayDecoder};
use super::{ArrayBatchDecoder, Int64ArrayDecoder};

// TODO: reduce duplication with string below
pub fn new_binary_decoder(column: &Column, stripe: &Stripe) -> Result<Box<dyn ArrayBatchDecoder>> {
let present = get_present_vec(column, stripe)?
.map(|iter| Box::new(iter.into_iter()) as Box<dyn Iterator<Item = bool> + Send>);

let lengths = stripe.stream_map().get(column, Kind::Length);
let lengths = get_rle_reader::<u64, _>(column, lengths)?;
let lengths = get_unsigned_rle_reader(column, lengths);

let bytes = Box::new(stripe.stream_map().get(column, Kind::Data));
Ok(Box::new(BinaryArrayDecoder::new(bytes, lengths, present)))
}

pub fn new_string_decoder(column: &Column, stripe: &Stripe) -> Result<Box<dyn ArrayBatchDecoder>> {
let kind = column.encoding().kind();
let rle_version = RleVersion::from(kind);
let present = get_present_vec(column, stripe)?
.map(|iter| Box::new(iter.into_iter()) as Box<dyn Iterator<Item = bool> + Send>);

let lengths = stripe.stream_map().get(column, Kind::Length);
let lengths = rle_version.get_unsigned_rle_reader(lengths);
let lengths = get_unsigned_rle_reader(column, lengths);

match kind {
ColumnEncodingKind::Direct | ColumnEncodingKind::DirectV2 => {
Expand All @@ -74,8 +73,8 @@ pub fn new_string_decoder(column: &Column, stripe: &Stripe) -> Result<Box<dyn Ar
let dictionary_strings = Arc::new(dictionary_strings);

let indexes = stripe.stream_map().get(column, Kind::Data);
let indexes = rle_version.get_unsigned_rle_reader(indexes);
let indexes = UInt64ArrayDecoder::new(indexes, present);
let indexes = get_unsigned_rle_reader(column, indexes);
let indexes = Int64ArrayDecoder::new(indexes, present);

Ok(Box::new(DictionaryStringArrayDecoder::new(
indexes,
Expand All @@ -91,15 +90,15 @@ pub type BinaryArrayDecoder = GenericByteArrayDecoder<GenericBinaryType<i32>>;

pub struct GenericByteArrayDecoder<T: ByteArrayType> {
bytes: Box<Decompressor>,
lengths: Box<dyn Iterator<Item = Result<u64>> + Send>,
lengths: Box<dyn Iterator<Item = Result<i64>> + Send>,
present: Option<Box<dyn Iterator<Item = bool> + Send>>,
phantom: PhantomData<T>,
}

impl<T: ByteArrayType> GenericByteArrayDecoder<T> {
fn new(
bytes: Box<Decompressor>,
lengths: Box<dyn Iterator<Item = Result<u64>> + Send>,
lengths: Box<dyn Iterator<Item = Result<i64>> + Send>,
present: Option<Box<dyn Iterator<Item = bool> + Send>>,
) -> Self {
Self {
Expand Down Expand Up @@ -133,12 +132,12 @@ impl<T: ByteArrayType> GenericByteArrayDecoder<T> {
elements_to_fetch,
"less lengths than expected in ByteArray"
);
let total_length: u64 = lengths.iter().sum();
let total_length: i64 = lengths.iter().sum();
// Fetch all data bytes at once
let mut bytes = Vec::with_capacity(total_length as usize);
self.bytes
.by_ref()
.take(total_length)
.take(total_length as u64)
.read_to_end(&mut bytes)
.context(IoSnafu)?;
let bytes = Buffer::from(bytes);
Expand All @@ -165,12 +164,12 @@ impl<T: ByteArrayType> ArrayBatchDecoder for GenericByteArrayDecoder<T> {
}

pub struct DictionaryStringArrayDecoder {
indexes: UInt64ArrayDecoder,
indexes: Int64ArrayDecoder,
dictionary: Arc<StringArray>,
}

impl DictionaryStringArrayDecoder {
fn new(indexes: UInt64ArrayDecoder, dictionary: Arc<StringArray>) -> Result<Self> {
fn new(indexes: Int64ArrayDecoder, dictionary: Arc<StringArray>) -> Result<Self> {
Ok(Self {
indexes,
dictionary,
Expand Down
4 changes: 2 additions & 2 deletions src/array_decoder/timestamp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use crate::{
column::{get_present_vec, Column},
error::{MismatchedSchemaSnafu, Result},
proto::stream::Kind,
reader::decode::{get_rle_reader, timestamp::TimestampIterator},
reader::decode::{get_rle_reader, get_unsigned_rle_reader, timestamp::TimestampIterator},
stripe::Stripe,
};
use arrow::array::ArrayRef;
Expand Down Expand Up @@ -54,7 +54,7 @@ macro_rules! decoder_for_time_unit {
let data = get_rle_reader(column, data)?;

let secondary = stripe.stream_map().get(column, Kind::Secondary);
let secondary = get_rle_reader(column, secondary)?;
let secondary = get_unsigned_rle_reader(column, secondary);

let present = get_present_vec(column, stripe)?
.map(|iter| Box::new(iter.into_iter()) as Box<dyn Iterator<Item = bool> + Send>);
Expand Down
4 changes: 2 additions & 2 deletions src/array_decoder/union.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use crate::column::{get_present_vec, Column};
use crate::error::ArrowSnafu;
use crate::error::Result;
use crate::proto::stream::Kind;
use crate::reader::decode::byte_rle::ByteRleIter;
use crate::reader::decode::byte_rle::ByteRleReader;
use crate::stripe::Stripe;

use super::{array_decoder_factory, derive_present_vec, ArrayBatchDecoder};
Expand All @@ -47,7 +47,7 @@ impl UnionArrayDecoder {
.map(|iter| Box::new(iter.into_iter()) as Box<dyn Iterator<Item = bool> + Send>);

let tags = stripe.stream_map().get(column, Kind::Data);
let tags = Box::new(ByteRleIter::new(tags));
let tags = Box::new(ByteRleReader::new(tags));

let variants = column
.children()
Expand Down
Loading

0 comments on commit 9dd640c

Please sign in to comment.