Skip to content

Commit

Permalink
Consolidate common RLE decoding logic into new GenericRle trait
Browse files Browse the repository at this point in the history
  • Loading branch information
Jefffrey committed Oct 9, 2024
1 parent 22cb4c5 commit be8a685
Show file tree
Hide file tree
Showing 5 changed files with 151 additions and 114 deletions.
33 changes: 17 additions & 16 deletions src/encoding/byte.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
// specific language governing permissions and limitations
// under the License.

use bytemuck::must_cast_slice;
use bytes::{BufMut, BytesMut};
use snafu::ResultExt;

Expand All @@ -24,7 +25,7 @@ use crate::{
};
use std::io::Read;

use super::{util::read_u8, PrimitiveValueDecoder, PrimitiveValueEncoder};
use super::{rle::GenericRle, util::read_u8, PrimitiveValueEncoder};

const MAX_LITERAL_LENGTH: usize = 128;
const MIN_REPEAT_LENGTH: usize = 3;
Expand Down Expand Up @@ -209,10 +210,22 @@ impl<R: Read> ByteRleDecoder<R> {
index: 0,
}
}
}

impl<R: Read> GenericRle<i8> for ByteRleDecoder<R> {
fn advance(&mut self, n: usize) {
self.index += n
}

fn read_values(&mut self) -> Result<()> {
fn available(&self) -> &[i8] {
let bytes = &self.leftovers[self.index..];
must_cast_slice(bytes)
}

fn decode_batch(&mut self) -> Result<()> {
self.index = 0;
self.leftovers.clear();

let header = read_u8(&mut self.reader)?;
if header < 0x80 {
// Run of repeated value
Expand All @@ -231,24 +244,12 @@ impl<R: Read> ByteRleDecoder<R> {
}
}

// Byte-at-a-time decoder: fills `out` one value per iteration from the
// `leftovers` buffer, refilling that buffer whenever the cursor reaches its end.
impl<R: Read> PrimitiveValueDecoder<i8> for ByteRleDecoder<R> {
// TODO: can probably implement this better
// Writes exactly `out.len()` values into `out`. Any error returned by
// `read_values` is propagated to the caller via `?`.
fn decode(&mut self, out: &mut [i8]) -> Result<()> {
for x in out.iter_mut() {
// Cursor has consumed everything buffered so far: fetch more values
// before indexing into `leftovers`.
if self.index == self.leftovers.len() {
self.read_values()?;
}
// Buffered bytes are stored unsigned; reinterpret as i8 for the output.
*x = self.leftovers[self.index] as i8;
self.index += 1;
}
Ok(())
}
}

#[cfg(test)]
mod tests {
use std::io::Cursor;

use crate::encoding::PrimitiveValueDecoder;

use super::*;

use proptest::prelude::*;
Expand Down
70 changes: 18 additions & 52 deletions src/encoding/integer/rle_v1.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ use snafu::OptionExt;

use crate::{
encoding::{
rle::GenericRle,
util::{read_u8, try_read_u8},
PrimitiveValueDecoder,
},
error::{OutOfSpecSnafu, Result},
};
Expand Down Expand Up @@ -76,21 +76,6 @@ impl<N: NInt, R: Read, S: EncodingSign> RleV1Decoder<N, R, S> {
sign: Default::default(),
}
}

// Decodes the next encoded group from the reader into `decoded_ints`,
// discarding whatever the buffer held before. Errors from reading the header
// or the group body are returned to the caller.
fn decode_batch(&mut self) -> Result<()> {
// Rewind the read cursor and drop values left over from the previous batch.
self.current_head = 0;
self.decoded_ints.clear();

// The header selects between a literal group and a run with a delta.
match EncodingType::from_header(&mut self.reader)? {
Some(EncodingType::Literals { length }) => {
read_literals::<_, _, S>(&mut self.reader, &mut self.decoded_ints, length)
}
Some(EncodingType::Run { length, delta }) => {
read_run::<_, _, S>(&mut self.reader, &mut self.decoded_ints, length, delta)
}
// No header was produced (presumably end of input — confirm against
// `from_header`): succeed while leaving `decoded_ints` empty.
None => Ok(()),
}
}
}

fn read_literals<N: NInt, R: Read, S: EncodingSign>(
Expand Down Expand Up @@ -137,46 +122,27 @@ fn read_run<N: NInt, R: Read, S: EncodingSign>(
Ok(())
}

impl<N: NInt, R: Read, S: EncodingSign> PrimitiveValueDecoder<N> for RleV1Decoder<N, R, S> {
// TODO: this is exact duplicate from RLEv2 version; deduplicate it
fn decode(&mut self, out: &mut [N]) -> Result<()> {
let available = &self.decoded_ints[self.current_head..];
// If we have enough in buffer to copy over
if available.len() >= out.len() {
out.copy_from_slice(&available[..out.len()]);
self.current_head += out.len();
return Ok(());
}

// Otherwise progressively copy over chunks
let len_to_copy = out.len();
let mut copied = 0;
while copied < len_to_copy {
let copying = self.decoded_ints.len() - self.current_head;
// At most, we fill to exact length of output buffer (don't overflow)
let copying = copying.min(len_to_copy - copied);
impl<N: NInt, R: Read, S: EncodingSign> GenericRle<N> for RleV1Decoder<N, R, S> {
fn advance(&mut self, n: usize) {
self.current_head += n;
}

let target_out_slice = &mut out[copied..copied + copying];
target_out_slice.copy_from_slice(
&self.decoded_ints[self.current_head..self.current_head + copying],
);
fn available(&self) -> &[N] {
&self.decoded_ints[self.current_head..]
}

copied += copying;
self.current_head += copying;
fn decode_batch(&mut self) -> Result<()> {
self.current_head = 0;
self.decoded_ints.clear();

if self.current_head == self.decoded_ints.len() {
self.decode_batch()?;
match EncodingType::from_header(&mut self.reader)? {
Some(EncodingType::Literals { length }) => {
read_literals::<_, _, S>(&mut self.reader, &mut self.decoded_ints, length)
}
}

if copied != out.len() {
// TODO: more descriptive error
OutOfSpecSnafu {
msg: "Array length less than expected",
Some(EncodingType::Run { length, delta }) => {
read_run::<_, _, S>(&mut self.reader, &mut self.decoded_ints, length, delta)
}
.fail()
} else {
Ok(())
None => Ok(()),
}
}
}
Expand All @@ -185,7 +151,7 @@ impl<N: NInt, R: Read, S: EncodingSign> PrimitiveValueDecoder<N> for RleV1Decode
mod tests {
use std::io::Cursor;

use crate::encoding::integer::UnsignedEncoding;
use crate::encoding::{integer::UnsignedEncoding, PrimitiveValueDecoder};

use super::*;

Expand Down
62 changes: 16 additions & 46 deletions src/encoding/integer/rle_v2/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ use std::{io::Read, marker::PhantomData};
use bytes::BytesMut;

use crate::{
encoding::{util::try_read_u8, PrimitiveValueDecoder, PrimitiveValueEncoder},
error::{OutOfSpecSnafu, Result},
encoding::{rle::GenericRle, util::try_read_u8, PrimitiveValueEncoder},
error::Result,
memory::EstimateMemory,
};

Expand Down Expand Up @@ -98,6 +98,16 @@ impl<N: NInt, R: Read, S: EncodingSign> RleV2Decoder<N, R, S> {
sign: Default::default(),
}
}
}

impl<N: NInt, R: Read, S: EncodingSign> GenericRle<N> for RleV2Decoder<N, R, S> {
fn advance(&mut self, n: usize) {
self.current_head += n;
}

fn available(&self) -> &[N] {
&self.decoded_ints[self.current_head..]
}

fn decode_batch(&mut self) -> Result<()> {
self.current_head = 0;
Expand Down Expand Up @@ -131,49 +141,6 @@ impl<N: NInt, R: Read, S: EncodingSign> RleV2Decoder<N, R, S> {
}
}

// Copies decoded integers out of the internal `decoded_ints` buffer into the
// caller's slice, decoding further batches whenever the buffer runs dry.
impl<N: NInt, R: Read, S: EncodingSign> PrimitiveValueDecoder<N> for RleV2Decoder<N, R, S> {
// Fills `out` completely with the next `out.len()` values.
// Errors from `decode_batch` are propagated; an out-of-spec error is returned
// if fewer than `out.len()` values ended up being copied.
fn decode(&mut self, out: &mut [N]) -> Result<()> {
// Values already decoded but not yet handed out.
let available = &self.decoded_ints[self.current_head..];
// If we have enough in buffer to copy over
if available.len() >= out.len() {
out.copy_from_slice(&available[..out.len()]);
self.current_head += out.len();
return Ok(());
}

// Otherwise progressively copy over chunks
let len_to_copy = out.len();
let mut copied = 0;
// NOTE(review): if `decode_batch` can return Ok while leaving `decoded_ints`
// empty, `copying` stays 0 and this loop makes no progress — confirm.
while copied < len_to_copy {
let copying = self.decoded_ints.len() - self.current_head;
// At most, we fill to exact length of output buffer (don't overflow)
let copying = copying.min(len_to_copy - copied);

let target_out_slice = &mut out[copied..copied + copying];
target_out_slice.copy_from_slice(
&self.decoded_ints[self.current_head..self.current_head + copying],
);

copied += copying;
self.current_head += copying;

// Buffer fully consumed: decode the next batch before the next pass.
if self.current_head == self.decoded_ints.len() {
self.decode_batch()?;
}
}

// The loop above only exits once `copied` reaches `len_to_copy`, so this
// check guards the invariant rather than a commonly taken path.
if copied != out.len() {
// TODO: more descriptive error
OutOfSpecSnafu {
msg: "Array length less than expected",
}
.fail()
} else {
Ok(())
}
}
}

struct DeltaEncodingCheckResult<N: NInt> {
base_value: N,
min: N,
Expand Down Expand Up @@ -541,7 +508,10 @@ mod tests {

use proptest::prelude::*;

use crate::encoding::integer::{SignedEncoding, UnsignedEncoding};
use crate::encoding::{
integer::{SignedEncoding, UnsignedEncoding},
PrimitiveValueDecoder,
};

use super::*;

Expand Down
1 change: 1 addition & 0 deletions src/encoding/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ pub mod byte;
pub mod decimal;
pub mod float;
pub mod integer;
mod rle;
pub mod timestamp;
mod util;

Expand Down
99 changes: 99 additions & 0 deletions src/encoding/rle.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use crate::error::{OutOfSpecSnafu, Result};

use super::PrimitiveValueDecoder;

mod sealed {
use std::io::Read;

use crate::encoding::{
byte::ByteRleDecoder,
integer::{rle_v1::RleV1Decoder, rle_v2::RleV2Decoder, EncodingSign, NInt},
};

pub trait Rle {}

impl<R: Read> Rle for ByteRleDecoder<R> {}
impl<N: NInt, R: Read, S: EncodingSign> Rle for RleV1Decoder<N, R, S> {}
impl<N: NInt, R: Read, S: EncodingSign> Rle for RleV2Decoder<N, R, S> {}
}

/// Shared decoding contract for run length encoded streams (bytes, and integers
/// in both the v1 and v2 encodings).
///
/// Implementors own an internal buffer that behaves like a single-headed queue:
/// values are decoded into it first and are then copied out from its front into
/// the caller's output buffer (usually an Arrow array).
pub trait GenericRle<V: Copy> {
    /// Returns every value still waiting in the internal buffer, i.e. everything
    /// past the point reached by previous calls to [`GenericRle::advance`].
    fn available(&self) -> &[V];

    /// Marks the first `n` available values as consumed, signalling that they
    /// have been copied out.
    fn advance(&mut self, n: usize);

    /// Discards the current contents of the internal buffer and refills it with
    /// the next round of decoded values.
    // TODO: Have a version that copies directly into the output buffer (e.g. Arrow array).
    //       Currently we always decode to the internal buffer first, even if we can copy
    //       directly to the output and skip the middle man. Ideally the internal buffer
    //       should only be used for leftovers between calls to PrimitiveValueDecoder::decode.
    fn decode_batch(&mut self) -> Result<()>;
}

/// Blanket decoder for every sealed run length decoder: drains the decoder's
/// internal buffer into the caller's slice, refilling it batch by batch.
impl<V: Copy, G: GenericRle<V> + sealed::Rle> PrimitiveValueDecoder<V> for G {
    /// Fills `out` completely with the next `out.len()` decoded values.
    ///
    /// Values left over from a previous call are handed out first; further
    /// batches are decoded only once the internal buffer is empty. An empty
    /// `out` succeeds immediately without touching the underlying reader.
    ///
    /// # Errors
    ///
    /// * Any error returned by [`GenericRle::decode_batch`] is propagated.
    /// * An out-of-spec error is returned when the encoded data runs out
    ///   before `out` has been filled.
    fn decode(&mut self, out: &mut [V]) -> Result<()> {
        let mut copied = 0;
        while copied < out.len() {
            if self.available().is_empty() {
                self.decode_batch()?;
                // `decode_batch` may succeed without producing anything (the
                // RLE v1 decoder returns `Ok(())` when no header can be read).
                // Without this check the loop would never make progress and
                // the length error below could never be reached.
                if self.available().is_empty() {
                    break;
                }
            }

            let available = self.available();
            // At most, we fill to exact length of output buffer (don't overflow).
            let copying = available.len().min(out.len() - copied);
            out[copied..copied + copying].copy_from_slice(&available[..copying]);

            copied += copying;
            self.advance(copying);
        }

        // We always expect to be able to fill the output buffer; it is up to the
        // caller to control that size.
        if copied != out.len() {
            // TODO: more descriptive error
            OutOfSpecSnafu {
                msg: "Array length less than expected",
            }
            .fail()
        } else {
            Ok(())
        }
    }
}

0 comments on commit be8a685

Please sign in to comment.