Initial write support #122

Merged
merged 35 commits into from
Aug 21, 2024

Conversation

Jefffrey
Collaborator

@Jefffrey Jefffrey commented Aug 19, 2024

Support for writing the following Arrow array types to an ORC file:

  • Int8/Int16/Int32/Int64
  • Float32/Float64

Broadly speaking, this writer delegates to internal types that encode the primitive values into an in-memory buffer (RLEv2 for integers, Byte RLE for bytes, a plain copy for floats). Once the buffered stripe exceeds a configured size limit, the stripe is flushed to the underlying writer (e.g. to disk) and the internal buffers are cleared.

  • Not much focus on optimization yet, so the internal buffers aren't being reused; this will be addressed later
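
To make the buffer-then-flush behaviour concrete, here is a minimal std-only sketch. It is not the code from this PR: `BufferingWriter`, `stripes_flushed` and `flush_stripe` are hypothetical names, values are buffered as raw little-endian bytes rather than run-length encoded, and a plain `Vec<u8>` stands in for the real encoders.

```rust
use std::io::{self, Write};

/// Hypothetical, simplified stand-in for the stripe buffering described above.
/// Values are buffered as raw little-endian bytes (no RLE) to keep the focus
/// on the flush logic.
pub struct BufferingWriter<W: Write> {
    writer: W,
    buffer: Vec<u8>,
    stripe_byte_size: usize,
    /// Number of stripes handed to the underlying writer so far.
    pub stripes_flushed: usize,
}

impl<W: Write> BufferingWriter<W> {
    pub fn new(writer: W, stripe_byte_size: usize) -> Self {
        Self { writer, buffer: Vec::new(), stripe_byte_size, stripes_flushed: 0 }
    }

    /// Buffer a batch of values, flushing once the buffered size exceeds the limit.
    pub fn write(&mut self, values: &[i64]) -> io::Result<()> {
        for v in values {
            self.buffer.extend_from_slice(&v.to_le_bytes());
        }
        if self.buffer.len() > self.stripe_byte_size {
            self.flush_stripe()?;
        }
        Ok(())
    }

    /// Write the buffered bytes to the underlying writer and clear the buffer.
    fn flush_stripe(&mut self) -> io::Result<()> {
        if self.buffer.is_empty() {
            return Ok(());
        }
        self.writer.write_all(&self.buffer)?;
        // clear() retains the allocation; this is a choice made for the sketch.
        self.buffer.clear();
        self.stripes_flushed += 1;
        Ok(())
    }

    /// Flush any remaining data and hand back the underlying writer.
    pub fn close(mut self) -> io::Result<W> {
        self.flush_stripe()?;
        Ok(self.writer)
    }
}
```

With a limit of 16 bytes, writing two `i64` values (exactly 16 bytes) does not trigger a flush, while a third value does; `close` flushes whatever is left.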

Overall architecture

src/arrow_writer.rs contains the public interface for this writer. An ArrowWriterBuilder specifies config options and builds an ArrowWriter, which writes one batch at a time. Example usage:

let f = File::create("/tmp/new.orc").unwrap();
let mut writer = ArrowWriterBuilder::new(f, batch.schema())
    .try_build()
    .unwrap();
writer.write(&batch).unwrap();
writer.close().unwrap();

This writer automatically flushes a stripe when its estimated size exceeds the configured stripe size (default 64 MB). Internally, this writer has a StripeWriter to handle encoding the actual stripe in-memory:

/// Encode a stripe. Will encode columns into an in-memory buffer before flushing
/// entire stripe to the underlying writer.
pub struct StripeWriter<W> {
    writer: W,
    /// Flattened columns, in order of their column ID.
    columns: Vec<Box<dyn ColumnStripeEncoder>>,
    pub row_count: usize,
}

This StripeWriter holds a list of ColumnStripeEncoders, each responsible for encoding an individual column of the record batch. Note the flat structure: this will need to be revisited when accounting for nested types (map, struct, etc.).

/// Encodes a specific column for a stripe. Will encode to an internal memory
/// buffer until it is finished, in which case it returns the stream bytes to
/// be serialized to a writer.
pub trait ColumnStripeEncoder: EstimateMemory {
    /// Encode entire provided [`ArrayRef`] to internal buffer.
    fn encode_array(&mut self, array: &ArrayRef) -> Result<()>;
    /// Column encoding used for streams.
    fn column_encoding(&self) -> ColumnEncoding;
    /// Emit buffered streams to be written to the writer, and reset state
    /// in preparation for next stripe.
    fn finish(&mut self) -> Vec<Stream>;
}

Currently there is only one implementation of ColumnStripeEncoder, which is PrimitiveStripeEncoder:

/// Encoder for primitive ORC types (e.g. int, float). Uses a specific [`PrimitiveValueEncoder`] to
/// encode the primitive values into internal memory. When finished, outputs a DATA stream and
/// optionally a PRESENT stream.
pub struct PrimitiveStripeEncoder<T: ArrowPrimitiveType, E: PrimitiveValueEncoder<T::Native>> {
    encoder: E,
    column_encoding: ColumnEncoding,
    /// Lazily initialized once we encounter an [`Array`] with a [`NullBuffer`].
    present: Option<PresentStreamEncoder>,
    encoded_count: usize,
    _phantom: PhantomData<T>,
}

This delegates encoding of the primitive values themselves to the PrimitiveValueEncoder trait, and also optionally handles encoding a present stream.
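
For context on the present stream: ORC stores it as a boolean stream, where validity bits are packed into bytes from most significant to least significant bit and the resulting bytes are then Byte RLE encoded. The sketch below shows only the bit-packing step. `pack_present_bits` is a hypothetical helper for illustration, not a function from this PR.

```rust
/// Pack validity flags into bytes, most significant bit first.
/// Hypothetical helper: a real present stream encoder would additionally
/// run-length encode the resulting bytes with Byte RLE.
pub fn pack_present_bits(valid: &[bool]) -> Vec<u8> {
    // One byte per 8 flags, rounding up; trailing bits stay zero.
    let mut out = vec![0u8; valid.len().div_ceil(8)];
    for (i, &is_valid) in valid.iter().enumerate() {
        if is_valid {
            out[i / 8] |= 0x80u8 >> (i % 8);
        }
    }
    out
}
```

For example, one valid value followed by seven nulls packs to the single byte 0x80.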

/// Encodes primitive values into an internal buffer, usually with a specialized run length
/// encoding for better compression.
pub trait PrimitiveValueEncoder<V>: EstimateMemory
where
    V: Copy,
{
    fn new() -> Self;
    fn write_one(&mut self, value: V);
    fn write_slice(&mut self, values: &[V]) {
        for &value in values {
            self.write_one(value);
        }
    }
    /// Take the encoded bytes, replacing it with an empty buffer.
    // TODO: Figure out how to retain the allocation instead of handing
    //       it off each time.
    fn take_inner(&mut self) -> Bytes;
}

Encoding

There are currently three implementations.

Floats are the simplest case: ORC specifies no special encoding for them, so the encoder simply copies the values. Refer to FloatValueEncoder:

/// No special run encoding for floats/doubles, they are stored as their IEEE 754 floating
/// point bit layout. This encoder simply copies incoming floats/doubles to its internal
/// byte buffer.
pub struct FloatValueEncoder<T: ArrowPrimitiveType>
where
    T::Native: Float,
{
    data: BytesMut,
    _phantom: PhantomData<T>,
}
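
As a rough illustration of the "simply copies" approach together with the `take_inner` hand-off, here is a std-only sketch. `ValueEncoder` and `F32CopyEncoder` are hypothetical, simplified analogues (using `Vec<u8>` instead of `Bytes`/`BytesMut`), and the little-endian byte order is an assumption based on how the Java implementation serializes floats, so check it against the spec before relying on it.

```rust
/// Simplified, std-only analogue of the value encoder trait (hypothetical name).
pub trait ValueEncoder<V: Copy> {
    fn write_one(&mut self, value: V);
    fn write_slice(&mut self, values: &[V]) {
        for &value in values {
            self.write_one(value);
        }
    }
    /// Take the encoded bytes, leaving an empty buffer behind.
    fn take_inner(&mut self) -> Vec<u8>;
}

/// Copies each f32 into the buffer as its IEEE 754 bit pattern.
#[derive(Default)]
pub struct F32CopyEncoder {
    data: Vec<u8>,
}

impl ValueEncoder<f32> for F32CopyEncoder {
    fn write_one(&mut self, value: f32) {
        // Assumption: little-endian byte order.
        self.data.extend_from_slice(&value.to_le_bytes());
    }

    fn take_inner(&mut self) -> Vec<u8> {
        // Hands off the allocation and leaves a fresh, empty Vec, which
        // mirrors the trade-off noted in the TODO on the real trait.
        std::mem::take(&mut self.data)
    }
}
```

Writing `[1.0, -2.5]` yields eight bytes (four per value), and a second `take_inner` call returns an empty buffer.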

Bytes have some simple logic for run length encoding, see ByteRleWriter:

pub struct ByteRleWriter {
    writer: BytesMut,
    /// Literal values to encode.
    literals: [u8; MAX_LITERAL_LENGTH],
    /// Represents the number of elements currently in `literals` if Literals,
    /// otherwise represents the length of the Run.
    num_literals: usize,
    /// Tracks if current Literal sequence will turn into a Run sequence due to
    /// repeated values at the end of the value sequence.
    tail_run_length: usize,
    /// If in Run sequence or not, and keeps the corresponding value.
    run_value: Option<u8>,
}
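
For readers unfamiliar with the format, the sketch below encodes a whole slice with ORC Byte RLE: a run of 3 to 130 identical bytes becomes a control byte (length minus 3) followed by the value, and up to 128 literal bytes are preceded by a control byte holding the negated length. This is a one-shot illustration with a hypothetical `byte_rle_encode` function, not the streaming state machine in ByteRleWriter, and its output may differ from the Java-derived logic in how it splits literals and runs while still being a valid encoding.

```rust
const MIN_REPEAT: usize = 3;
const MAX_RUN: usize = 130; // MIN_REPEAT + 127
const MAX_LITERAL: usize = 128;

/// Length of the run of identical bytes starting at `pos`, capped at MAX_RUN.
fn run_length_at(input: &[u8], pos: usize) -> usize {
    input[pos..]
        .iter()
        .take(MAX_RUN)
        .take_while(|&&b| b == input[pos])
        .count()
}

/// One-shot Byte RLE encoder over a full slice (hypothetical helper).
pub fn byte_rle_encode(input: &[u8]) -> Vec<u8> {
    let mut out = Vec::new();
    let mut i = 0;
    while i < input.len() {
        let run = run_length_at(input, i);
        if run >= MIN_REPEAT {
            // Run: control byte in 0..=127 is (length - 3), then the value.
            out.push((run - MIN_REPEAT) as u8);
            out.push(input[i]);
            i += run;
        } else {
            // Literals: gather bytes until a run starts or the list is full.
            let start = i;
            while i < input.len()
                && i - start < MAX_LITERAL
                && run_length_at(input, i) < MIN_REPEAT
            {
                i += 1;
            }
            let len = i - start;
            // Control byte is the negated length as a two's complement i8.
            out.push((len as u8).wrapping_neg());
            out.extend_from_slice(&input[start..i]);
        }
    }
    out
}
```

This reproduces the two examples from the ORC specification: 100 zero bytes encode to `[0x61, 0x00]`, and the literal pair `[0x44, 0x45]` encodes to `[0xfe, 0x44, 0x45]`.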

Integers have the most complicated version due to the complex sub-encoding types (as well as the actual logic to determine which sub-encoding to use), see RleWriterV2:

pub struct RleWriterV2<N: NInt, S: EncodingSign> {
    /// Stores the run length encoded sequences.
    data: BytesMut,
    /// Used in state machine for determining which sub-encoding
    /// for a sequence to use.
    state: RleV2EncodingState<N>,
    phantom: PhantomData<S>,
}

  • Write support for V1 encoding seems pointless, so not implemented
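
To give a flavour of the simplest RLEv2 sub-encoding, here is a sketch of Short Repeat, which covers 3 to 10 repetitions of a single value. The header byte holds the encoding type (00) in the top two bits, the value width in bytes minus 1 in the next three bits, and the repeat count minus 3 in the low three bits; the value follows in big-endian order, zigzag encoded first when the column is signed. The function names and the `signed` flag are hypothetical and do not reflect how RleWriterV2 or EncodingSign are structured.

```rust
/// Zigzag-encode a signed value so small magnitudes map to small unsigned values.
pub fn zigzag_encode(n: i64) -> u64 {
    ((n << 1) ^ (n >> 63)) as u64
}

/// Encode `count` repetitions of `value` with the RLEv2 Short Repeat
/// sub-encoding. Returns None when `count` is outside 3..=10.
/// Hypothetical helper; for `signed == false` the caller is assumed to
/// pass a non-negative value.
pub fn short_repeat_encode(value: i64, count: usize, signed: bool) -> Option<Vec<u8>> {
    if !(3..=10).contains(&count) {
        return None;
    }
    let raw = if signed { zigzag_encode(value) } else { value as u64 };
    // Minimum number of bytes needed to hold the value, at least one.
    let bits = 64 - raw.leading_zeros() as usize;
    let width = bits.div_ceil(8).max(1);
    // Top two bits are 00 for Short Repeat, so they need no explicit OR.
    let header = ((width - 1) << 3) | (count - 3);
    let mut out = vec![header as u8];
    // Append only the low `width` bytes, most significant first.
    out.extend_from_slice(&raw.to_be_bytes()[8 - width..]);
    Some(out)
}
```

This matches the example in the ORC specification: five repetitions of the unsigned value 10000 encode to `[0x0a, 0x27, 0x10]`.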

For both the Byte RLE and Int RLEv2 encodings, the logic has been translated from the Java implementation of ORC, both to capture details the spec leaves unspecified and to reuse its logic for determining which sub-encodings to use. The code was refactored to make it easier to understand (at least to me).

Other changes

  • Bumped rust version to 1.73 (to be able to use div_ceil)
  • Introduced proptests, very handy for testing this serde behaviour
  • Removed usage of u64 in integer run length encoding code, as this is incorrect since the Java implementation doesn't support unsigned integers (everything is an i64 there)
    • Though the ORC spec states that some types like string use unsigned integer encoding, this is still going to be an i64 so stick with that
  • Renamed ByteRleIter to ByteRleReader for consistency with other reader interfaces (such as RLEv2)

@Jefffrey Jefffrey marked this pull request as ready for review August 19, 2024 10:50
@Jefffrey
Collaborator Author

Writing string/boolean arrays will be done after this since the PR is already quite large

@Jefffrey
Collaborator Author

Sorry for such a large PR 😅

I wanted an end to end example, and felt it wouldn't be proper without at least integer support. I didn't want to waste time implementing RLEv1 writing, so the majority of time was spent on trying to understand the RLEv2 write logic in the Java implementation and translate it to code that I think might be easier to understand (I didn't want to just do a one-to-one port from the C++/Java implementation to here).

Pinging for awareness @WenyXu @Xuanwo @waynexia

Collaborator

@Xuanwo Xuanwo left a comment

Thank you very much for this PR; it's really extensive! This review primarily focuses on the API related to IO operations.

writer: W,
schema: SchemaRef,
batch_size: usize,
stripe_byte_size: usize,
Collaborator

What do you think about using stripe_size or stripe_length for short?

Collaborator Author

I like keeping the unit as part of the name, to be unambiguous

Collaborator

@WenyXu WenyXu left a comment

Rest LGTM

@Jefffrey Jefffrey merged commit 9dd640c into main Aug 21, 2024
9 checks passed
@Jefffrey Jefffrey deleted the initial-write-support branch August 21, 2024 11:19
waynexia pushed a commit that referenced this pull request Oct 24, 2024
* Zigzag and varint encoders

* Short repeat and direct RLEv2 writers

* Minor refactoring

* Signed msb encoder

* Refactor RLE delta+patched base to be more functional

* Minor refactoring

* Byte RLE writer

* Remove unused error types

* Initial version of ORC writer, supporting only float

* Int8 array write support

* Integer RLEv2 Delta writing

* Minor optimization

* Abstract bits_used functionality to common NInt function

* Remove overcomplicated AbsVarint code, replace with i64/u64 in delta encoding

* Minor fixes

* Initial RLEv2 encoder base

* Remove u64 impl of NInt in favour of generic to determine sign

* Simplify getting RLE reader

* Fix percentile bit calculation encoding/decoding

* Patched base writing

* Support writing int arrays

* Multi stripe write test case

* Reduce duplication for primitive ColumnStripeEncoders

* Introduce EstimateMemory trait to standardize

* Comment

* Remove need for seek from writer + minor PR comments

* Rename s_type to kind

* Deduplicate get_closest_fixed_bits

* Fix comments

* Switch arrow writer tests to be in-memory instead of writing to disk

* Fix writing arrays with nulls

* Add license to new files

3 participants