Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Initial write support #122

Merged
merged 35 commits into from
Aug 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
c8e8f97
Zigzag and varint encoders
Jefffrey May 25, 2024
43de08c
Short repeat and direct RLEv2 writers
Jefffrey May 25, 2024
487efe9
Minor refactoring
Jefffrey May 25, 2024
8c7bfb2
Signed msb encoder
Jefffrey May 25, 2024
27ac0a9
Refactor RLE delta+patched base to be more functional
Jefffrey May 25, 2024
aebbdf8
Minor refactoring
Jefffrey May 25, 2024
a2d98c3
Byte RLE writer
Jefffrey May 26, 2024
633d015
Remove unused error types
Jefffrey May 30, 2024
55739d5
Initial version of ORC writer, supporting only float
Jefffrey May 30, 2024
33c5273
Int8 array write support
Jefffrey May 31, 2024
787cdac
Integer RLEv2 Delta writing
Jefffrey Jun 1, 2024
9961f34
Minor optimization
Jefffrey Jun 1, 2024
05e9ee2
Abstract bits_used functionality to common NInt function
Jefffrey Jun 2, 2024
3cc2c9f
Remove overcomplicated AbsVarint code, replace with i64/u64 in delta …
Jefffrey Jun 3, 2024
a2826e5
Merge branch 'main' into initial-write-support
Jefffrey Jun 5, 2024
f1dcdfe
Minor fixes
Jefffrey Jun 5, 2024
210a04f
Initial RLEv2 encoder base
Jefffrey Jun 7, 2024
84fc965
Remove u64 impl of NInt in favour of generic to determine sign
Jefffrey Jun 8, 2024
f9ec0db
Simplify getting RLE reader
Jefffrey Jun 8, 2024
69d50f1
Fix percentile bit calculation encoding/decoding
Jefffrey Jul 2, 2024
fd7542b
Merge branch 'main' into initial-write-support
Jefffrey Aug 18, 2024
38cd2ba
Patched base writing
Jefffrey Aug 18, 2024
4f3ec1b
Support writing int arrays
Jefffrey Aug 18, 2024
b54ce9a
Multi stripe write test case
Jefffrey Aug 18, 2024
64ef1ee
Merge branch 'main' into initial-write-support
Jefffrey Aug 19, 2024
ef6b72f
Reduce duplication for primitive ColumnStripeEncoders
Jefffrey Aug 19, 2024
a2bed4e
Introduce EstimateMemory trait to standardize
Jefffrey Aug 19, 2024
5afcfe2
Comment
Jefffrey Aug 19, 2024
e663d8f
Remove need for seek from writer + minor PR comments
Jefffrey Aug 19, 2024
7c6f414
Rename s_type to kind
Jefffrey Aug 19, 2024
fc907cc
Deduplicate get_closest_fixed_bits
Jefffrey Aug 20, 2024
3d8bf26
Fix comments
Jefffrey Aug 20, 2024
3292f4f
Switch arrow writer tests to be in-memory instead of writing to disk
Jefffrey Aug 20, 2024
1703d87
Fix writing arrays with nulls
Jefffrey Aug 20, 2024
2d6e227
Add license to new files
Jefffrey Aug 20, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ license = "Apache-2.0"
description = "Implementation of Apache ORC file format using Apache Arrow in-memory format"
keywords = ["arrow", "orc", "arrow-rs", "datafusion"]
include = ["src/**/*.rs", "Cargo.toml"]
rust-version = "1.70"
rust-version = "1.73"

[dependencies]
arrow = { version = "52", features = ["prettyprint", "chrono-tz"] }
Expand Down Expand Up @@ -57,6 +57,7 @@ arrow-json = "52.0.0"
criterion = { version = "0.5", default-features = false, features = ["async_tokio"] }
opendal = { version = "0.48", default-features = false, features = ["services-memory"] }
pretty_assertions = "1.3.0"
proptest = "1.0.0"
serde_json = { version = "1.0", default-features = false, features = ["std"] }

[features]
Expand Down
8 changes: 4 additions & 4 deletions src/array_decoder/list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,17 +25,17 @@ use snafu::ResultExt;
use crate::array_decoder::{derive_present_vec, populate_lengths_with_nulls};
use crate::column::{get_present_vec, Column};
use crate::proto::stream::Kind;
use crate::reader::decode::get_rle_reader;

use crate::error::{ArrowSnafu, Result};
use crate::reader::decode::get_unsigned_rle_reader;
use crate::stripe::Stripe;

use super::{array_decoder_factory, ArrayBatchDecoder};

pub struct ListArrayDecoder {
inner: Box<dyn ArrayBatchDecoder>,
present: Option<Box<dyn Iterator<Item = bool> + Send>>,
lengths: Box<dyn Iterator<Item = Result<u64>> + Send>,
lengths: Box<dyn Iterator<Item = Result<i64>> + Send>,
field: FieldRef,
}

Expand All @@ -48,7 +48,7 @@ impl ListArrayDecoder {
let inner = array_decoder_factory(child, field.clone(), stripe)?;

let reader = stripe.stream_map().get(column, Kind::Length);
let lengths = get_rle_reader(column, reader)?;
let lengths = get_unsigned_rle_reader(column, reader);

Ok(Self {
inner,
Expand Down Expand Up @@ -83,7 +83,7 @@ impl ArrayBatchDecoder for ListArrayDecoder {
elements_to_fetch,
"less lengths than expected in ListArray"
);
let total_length: u64 = lengths.iter().sum();
let total_length: i64 = lengths.iter().sum();
// Fetch child array as one Array with total_length elements
let child_array = self.inner.next_batch(total_length as usize, None)?;
let lengths = populate_lengths_with_nulls(lengths, batch_size, &present);
Expand Down
8 changes: 4 additions & 4 deletions src/array_decoder/map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use crate::array_decoder::{derive_present_vec, populate_lengths_with_nulls};
use crate::column::{get_present_vec, Column};
use crate::error::{ArrowSnafu, Result};
use crate::proto::stream::Kind;
use crate::reader::decode::get_rle_reader;
use crate::reader::decode::get_unsigned_rle_reader;
use crate::stripe::Stripe;

use super::{array_decoder_factory, ArrayBatchDecoder};
Expand All @@ -35,7 +35,7 @@ pub struct MapArrayDecoder {
keys: Box<dyn ArrayBatchDecoder>,
values: Box<dyn ArrayBatchDecoder>,
present: Option<Box<dyn Iterator<Item = bool> + Send>>,
lengths: Box<dyn Iterator<Item = Result<u64>> + Send>,
lengths: Box<dyn Iterator<Item = Result<i64>> + Send>,
fields: Fields,
}

Expand All @@ -56,7 +56,7 @@ impl MapArrayDecoder {
let values = array_decoder_factory(values_column, values_field.clone(), stripe)?;

let reader = stripe.stream_map().get(column, Kind::Length);
let lengths = get_rle_reader(column, reader)?;
let lengths = get_unsigned_rle_reader(column, reader);

let fields = Fields::from(vec![keys_field, values_field]);

Expand Down Expand Up @@ -94,7 +94,7 @@ impl ArrayBatchDecoder for MapArrayDecoder {
elements_to_fetch,
"less lengths than expected in MapArray"
);
let total_length: u64 = lengths.iter().sum();
let total_length: i64 = lengths.iter().sum();
// Fetch key and value arrays, each with total_length elements
// Fetch child array as one Array with total_length elements
let keys_array = self.keys.next_batch(total_length as usize, None)?;
Expand Down
10 changes: 5 additions & 5 deletions src/array_decoder/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use std::sync::Arc;

use arrow::array::{ArrayRef, BooleanArray, BooleanBuilder, PrimitiveArray, PrimitiveBuilder};
use arrow::buffer::NullBuffer;
use arrow::datatypes::{ArrowPrimitiveType, Decimal128Type, UInt64Type};
use arrow::datatypes::{ArrowPrimitiveType, Decimal128Type};
use arrow::datatypes::{DataType as ArrowDataType, Field};
use arrow::datatypes::{
Date32Type, Float32Type, Float64Type, Int16Type, Int32Type, Int64Type, Int8Type, SchemaRef,
Expand All @@ -33,7 +33,7 @@ use crate::error::{
};
use crate::proto::stream::Kind;
use crate::reader::decode::boolean_rle::BooleanIter;
use crate::reader::decode::byte_rle::ByteRleIter;
use crate::reader::decode::byte_rle::ByteRleReader;
use crate::reader::decode::float::FloatIter;
use crate::reader::decode::get_rle_reader;
use crate::schema::DataType;
Expand Down Expand Up @@ -119,7 +119,6 @@ impl<T: ArrowPrimitiveType> ArrayBatchDecoder for PrimitiveArrayDecoder<T> {
}
}

type UInt64ArrayDecoder = PrimitiveArrayDecoder<UInt64Type>;
type Int64ArrayDecoder = PrimitiveArrayDecoder<Int64Type>;
type Int32ArrayDecoder = PrimitiveArrayDecoder<Int32Type>;
type Int16ArrayDecoder = PrimitiveArrayDecoder<Int16Type>;
Expand Down Expand Up @@ -260,7 +259,7 @@ fn derive_present_vec(

/// Fix the lengths to account for nulls (represented as 0 length)
fn populate_lengths_with_nulls(
lengths: Vec<u64>,
lengths: Vec<i64>,
batch_size: usize,
present: &Option<Vec<bool>>,
) -> Vec<usize> {
Expand Down Expand Up @@ -365,7 +364,8 @@ pub fn array_decoder_factory(
}
);
let iter = stripe.stream_map().get(column, Kind::Data);
let iter = Box::new(ByteRleIter::new(iter).map(|value| value.map(|value| value as i8)));
let iter =
Box::new(ByteRleReader::new(iter).map(|value| value.map(|value| value as i8)));
let present = get_present_vec(column, stripe)?
.map(|iter| Box::new(iter.into_iter()) as Box<dyn Iterator<Item = bool> + Send>);
Box::new(Int8ArrayDecoder::new(iter, present))
Expand Down
25 changes: 12 additions & 13 deletions src/array_decoder/string.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,32 +30,31 @@ use crate::column::{get_present_vec, Column};
use crate::error::{ArrowSnafu, IoSnafu, Result};
use crate::proto::column_encoding::Kind as ColumnEncodingKind;
use crate::proto::stream::Kind;
use crate::reader::decode::{get_rle_reader, RleVersion};
use crate::reader::decode::get_unsigned_rle_reader;
use crate::reader::decompress::Decompressor;
use crate::stripe::Stripe;

use super::{ArrayBatchDecoder, UInt64ArrayDecoder};
use super::{ArrayBatchDecoder, Int64ArrayDecoder};

// TODO: reduce duplication with string below
pub fn new_binary_decoder(column: &Column, stripe: &Stripe) -> Result<Box<dyn ArrayBatchDecoder>> {
let present = get_present_vec(column, stripe)?
.map(|iter| Box::new(iter.into_iter()) as Box<dyn Iterator<Item = bool> + Send>);

let lengths = stripe.stream_map().get(column, Kind::Length);
let lengths = get_rle_reader::<u64, _>(column, lengths)?;
let lengths = get_unsigned_rle_reader(column, lengths);

let bytes = Box::new(stripe.stream_map().get(column, Kind::Data));
Ok(Box::new(BinaryArrayDecoder::new(bytes, lengths, present)))
}

pub fn new_string_decoder(column: &Column, stripe: &Stripe) -> Result<Box<dyn ArrayBatchDecoder>> {
let kind = column.encoding().kind();
let rle_version = RleVersion::from(kind);
let present = get_present_vec(column, stripe)?
.map(|iter| Box::new(iter.into_iter()) as Box<dyn Iterator<Item = bool> + Send>);

let lengths = stripe.stream_map().get(column, Kind::Length);
let lengths = rle_version.get_unsigned_rle_reader(lengths);
let lengths = get_unsigned_rle_reader(column, lengths);

match kind {
ColumnEncodingKind::Direct | ColumnEncodingKind::DirectV2 => {
Expand All @@ -74,8 +73,8 @@ pub fn new_string_decoder(column: &Column, stripe: &Stripe) -> Result<Box<dyn Ar
let dictionary_strings = Arc::new(dictionary_strings);

let indexes = stripe.stream_map().get(column, Kind::Data);
let indexes = rle_version.get_unsigned_rle_reader(indexes);
let indexes = UInt64ArrayDecoder::new(indexes, present);
let indexes = get_unsigned_rle_reader(column, indexes);
let indexes = Int64ArrayDecoder::new(indexes, present);

Ok(Box::new(DictionaryStringArrayDecoder::new(
indexes,
Expand All @@ -91,15 +90,15 @@ pub type BinaryArrayDecoder = GenericByteArrayDecoder<GenericBinaryType<i32>>;

pub struct GenericByteArrayDecoder<T: ByteArrayType> {
bytes: Box<Decompressor>,
lengths: Box<dyn Iterator<Item = Result<u64>> + Send>,
lengths: Box<dyn Iterator<Item = Result<i64>> + Send>,
present: Option<Box<dyn Iterator<Item = bool> + Send>>,
phantom: PhantomData<T>,
}

impl<T: ByteArrayType> GenericByteArrayDecoder<T> {
fn new(
bytes: Box<Decompressor>,
lengths: Box<dyn Iterator<Item = Result<u64>> + Send>,
lengths: Box<dyn Iterator<Item = Result<i64>> + Send>,
present: Option<Box<dyn Iterator<Item = bool> + Send>>,
) -> Self {
Self {
Expand Down Expand Up @@ -133,12 +132,12 @@ impl<T: ByteArrayType> GenericByteArrayDecoder<T> {
elements_to_fetch,
"less lengths than expected in ByteArray"
);
let total_length: u64 = lengths.iter().sum();
let total_length: i64 = lengths.iter().sum();
// Fetch all data bytes at once
let mut bytes = Vec::with_capacity(total_length as usize);
self.bytes
.by_ref()
.take(total_length)
.take(total_length as u64)
.read_to_end(&mut bytes)
.context(IoSnafu)?;
let bytes = Buffer::from(bytes);
Expand All @@ -165,12 +164,12 @@ impl<T: ByteArrayType> ArrayBatchDecoder for GenericByteArrayDecoder<T> {
}

pub struct DictionaryStringArrayDecoder {
indexes: UInt64ArrayDecoder,
indexes: Int64ArrayDecoder,
dictionary: Arc<StringArray>,
}

impl DictionaryStringArrayDecoder {
fn new(indexes: UInt64ArrayDecoder, dictionary: Arc<StringArray>) -> Result<Self> {
fn new(indexes: Int64ArrayDecoder, dictionary: Arc<StringArray>) -> Result<Self> {
Ok(Self {
indexes,
dictionary,
Expand Down
4 changes: 2 additions & 2 deletions src/array_decoder/timestamp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use crate::{
column::{get_present_vec, Column},
error::{MismatchedSchemaSnafu, Result},
proto::stream::Kind,
reader::decode::{get_rle_reader, timestamp::TimestampIterator},
reader::decode::{get_rle_reader, get_unsigned_rle_reader, timestamp::TimestampIterator},
stripe::Stripe,
};
use arrow::array::ArrayRef;
Expand Down Expand Up @@ -54,7 +54,7 @@ macro_rules! decoder_for_time_unit {
let data = get_rle_reader(column, data)?;

let secondary = stripe.stream_map().get(column, Kind::Secondary);
let secondary = get_rle_reader(column, secondary)?;
let secondary = get_unsigned_rle_reader(column, secondary);

let present = get_present_vec(column, stripe)?
.map(|iter| Box::new(iter.into_iter()) as Box<dyn Iterator<Item = bool> + Send>);
Expand Down
4 changes: 2 additions & 2 deletions src/array_decoder/union.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use crate::column::{get_present_vec, Column};
use crate::error::ArrowSnafu;
use crate::error::Result;
use crate::proto::stream::Kind;
use crate::reader::decode::byte_rle::ByteRleIter;
use crate::reader::decode::byte_rle::ByteRleReader;
use crate::stripe::Stripe;

use super::{array_decoder_factory, derive_present_vec, ArrayBatchDecoder};
Expand All @@ -47,7 +47,7 @@ impl UnionArrayDecoder {
.map(|iter| Box::new(iter.into_iter()) as Box<dyn Iterator<Item = bool> + Send>);

let tags = stripe.stream_map().get(column, Kind::Data);
let tags = Box::new(ByteRleIter::new(tags));
let tags = Box::new(ByteRleReader::new(tags));

let variants = column
.children()
Expand Down
Loading
Loading