Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Revisit List Row Encoding (#5807) #5811

Merged
merged 3 commits into from
May 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 26 additions & 22 deletions arrow-row/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -295,16 +295,9 @@ mod variable;
///
/// Lists are encoded by first encoding all child elements to the row format.
///
/// A "canonical byte array" is then constructed by concatenating the row
/// encodings of all their elements into a single binary array, followed
/// by the lengths of each encoded row, and the number of elements, encoded
/// as big endian `u32`.
///
/// This canonical byte array is then encoded using the variable length byte
/// encoding described above.
///
/// _The lengths are not strictly necessary but greatly simplify decode; they
/// may be removed in a future iteration_.
/// A list value is then encoded as the concatenation of each of the child elements,
/// separately encoded using the variable length encoding described above, followed
/// by the variable length encoding of an empty byte array.
///
/// For example given:
///
Expand All @@ -323,24 +316,23 @@ mod variable;
/// └──┴──┘ └──┴──┘ └──┴──┘ └──┴──┘ └──┴──┘
///```
///
/// Which would be grouped into the following canonical byte arrays:
/// Which would be encoded as
///
/// ```text
/// ┌──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┐
/// [1_u8, 2_u8, 3_u8] │01│01│01│02│01│03│00│00│00│02│00│00│00│02│00│00│00│02│00│00│00│03│
/// └──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┘
/// └──── rows ────┘ └───────── row lengths ─────────┘ └─ count ─┘
///
/// ┌──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┐
/// [1_u8, null] │01│01│00│00│00│00│00│02│00│00│00│02│00│00│00│02│
/// └──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┘
/// ┌──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┐
/// [1_u8, 2_u8, 3_u8] │02│01│01│00│00│02│02│01│02│00│00│02│02│01│03│00│00│02│01│
/// └──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┘
/// └──── 1_u8 ────┘ └──── 2_u8 ────┘ └──── 3_u8 ────┘
///
/// ┌──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┐
/// [1_u8, null] │02│01│01│00│00│02│02│00│00│00│00│02│01│
/// └──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┘
/// └──── 1_u8 ────┘ └──── null ────┘
///
///```
///
/// With `[]` represented by an empty byte array, and `null` a null byte array.
///
/// These byte arrays will then be encoded using the variable length byte encoding
/// described above.
///
/// # Ordering
///
/// ## Float Ordering
Expand Down Expand Up @@ -2271,4 +2263,16 @@ mod tests {

dictionary_eq(&back[0], &array);
}

#[test]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I verified that this test covers the issue by running it on main, where it failed like this:

thread 'tests::test_list_prefix' panicked at arrow-row/src/lib.rs:2284:9:
assertion `left == right` failed
  left: Greater
 right: Less
stack backtrace:
   0: rust_begin_unwind
             at /rustc/9b00956e56009bab2aa15d7bff10916599e3d6d6/library/std/src/panicking.rs:645:5
   1: core::panicking::panic_fmt
             at /rustc/9b00956e56009bab2aa15d7bff10916599e3d6d6/library/core/src/panicking.rs:72:14
   2: core::panicking::assert_failed_inner
   3: core::panicking::assert_failed
             at /rustc/9b00956e56009bab2aa15d7bff10916599e3d6d6/library/core/src/panicking.rs:298:5
   4: arrow_row::tests::test_list_prefix
             at ./src/lib.rs:2284:9
   5: arrow_row::tests::test_list_prefix::{{closure}}
             at ./src/lib.rs:2276:26
   6: core::ops::function::FnOnce::call_once
             at /rustc/9b00956e56009bab2aa15d7bff10916599e3d6d6/library/core/src/ops/function.rs:250:5
   7: core::ops::function::FnOnce::call_once
             at /rustc/9b00956e56009bab2aa15d7bff10916599e3d6d6/library/core/src/ops/function.rs:250:5
note: Some details are omitted, run with `RUST_BACKTRACE=full` for a verbose backtrace.

error: test failed, to rerun pass `--lib`
error: 1 target failed:
    `--lib`

fn test_list_prefix() {
    // Regression test: a list that is a strict prefix of another list
    // must compare as Less, not Greater (issue #5807).
    let mut builder = ListBuilder::new(Int8Builder::new());
    builder.append_value([None]);
    builder.append_value([None, None]);
    let array = builder.finish();

    let fields = vec![SortField::new(array.data_type().clone())];
    let converter = RowConverter::new(fields).unwrap();
    let rows = converter.convert_columns(&[Arc::new(array) as _]).unwrap();

    // `[None]` is a prefix of `[None, None]`, so it must sort first.
    assert_eq!(rows.row(0).cmp(&rows.row(1)), Ordering::Less);
}
}
135 changes: 73 additions & 62 deletions arrow-row/src/list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@
// specific language governing permissions and limitations
// under the License.

use crate::{RowConverter, Rows, SortField};
use arrow_array::builder::BufferBuilder;
use crate::{null_sentinel, RowConverter, Rows, SortField};
use arrow_array::{Array, GenericListArray, OffsetSizeTrait};
use arrow_buffer::{Buffer, MutableBuffer};
use arrow_data::ArrayDataBuilder;
use arrow_schema::{ArrowError, SortOptions};
use std::ops::Range;
Expand All @@ -43,12 +43,10 @@ pub fn compute_lengths<O: OffsetSizeTrait>(
fn encoded_len(rows: &Rows, range: Option<Range<usize>>) -> usize {
match range {
None => 1,
Some(range) if range.start == range.end => 1,
Some(range) => {
let element_count = range.end - range.start;
let row_bytes = range.map(|i| rows.row(i).as_ref().len()).sum::<usize>();
let total = (1 + element_count) * std::mem::size_of::<u32>() + row_bytes;
super::variable::padded_length(Some(total))
1 + range
.map(|i| super::variable::padded_length(Some(rows.row(i).as_ref().len())))
.sum::<usize>()
}
}
}
Expand All @@ -63,7 +61,6 @@ pub fn encode<O: OffsetSizeTrait>(
opts: SortOptions,
array: &GenericListArray<O>,
) {
let mut temporary = vec![];
offsets
.iter_mut()
.skip(1)
Expand All @@ -74,42 +71,28 @@ pub fn encode<O: OffsetSizeTrait>(
let end = offsets[1].as_usize();
let range = array.is_valid(idx).then_some(start..end);
let out = &mut data[*offset..];
*offset += encode_one(out, &mut temporary, rows, range, opts)
*offset += encode_one(out, rows, range, opts)
});
}

#[inline]
fn encode_one(
out: &mut [u8],
temporary: &mut Vec<u8>,
rows: &Rows,
range: Option<Range<usize>>,
opts: SortOptions,
) -> usize {
temporary.clear();

match range {
None => super::variable::encode_one(out, None, opts),
Some(range) if range.start == range.end => {
super::variable::encode_one(out, Some(&[]), opts)
}
None => super::variable::encode_null(out, opts),
Some(range) if range.start == range.end => super::variable::encode_empty(out, opts),
Some(range) => {
for row in range.clone().map(|i| rows.row(i)) {
temporary.extend_from_slice(row.as_ref());
}
for row in range.clone().map(|i| rows.row(i)) {
let len: u32 = row
.as_ref()
.len()
.try_into()
.expect("ListArray or LargeListArray containing a list of more than u32::MAX items is not supported");
temporary.extend_from_slice(&len.to_be_bytes());
let mut offset = 0;
for i in range {
let row = rows.row(i);
offset += super::variable::encode_one(&mut out[offset..], Some(row.data), opts);
}
let row_count: u32 = (range.end - range.start)
.try_into()
.expect("lists containing more than u32::MAX elements not supported");
temporary.extend_from_slice(&row_count.to_be_bytes());
super::variable::encode_one(out, Some(temporary), opts)
offset += super::variable::encode_empty(&mut out[offset..], opts);
offset
}
}
}
Expand All @@ -125,50 +108,78 @@ pub unsafe fn decode<O: OffsetSizeTrait>(
field: &SortField,
validate_utf8: bool,
) -> Result<GenericListArray<O>, ArrowError> {
let canonical = super::variable::decode_binary::<i64>(rows, field.options);

let mut offsets = BufferBuilder::<O>::new(rows.len() + 1);
offsets.append(O::from_usize(0).unwrap());
let mut current_offset = 0;

let mut child_rows = Vec::with_capacity(rows.len());
canonical.value_offsets().windows(2).for_each(|w| {
let start = w[0] as usize;
let end = w[1] as usize;
if start == end {
// Null or empty list
offsets.append(O::from_usize(current_offset).unwrap());
return;
}
let opts = field.options;

let mut values_bytes = 0;

let row = &canonical.value_data()[start..end];
let element_count_start = row.len() - 4;
let element_count =
u32::from_be_bytes((&row[element_count_start..]).try_into().unwrap()) as usize;
let mut offset = 0;
let mut offsets = Vec::with_capacity(rows.len() + 1);
offsets.push(O::usize_as(0));

let lengths_start = element_count_start - (element_count * 4);
for row in rows.iter_mut() {
let mut row_offset = 0;
row[lengths_start..element_count_start]
.chunks_exact(4)
.for_each(|chunk| {
let len = u32::from_be_bytes(chunk.try_into().unwrap());
let next_row_offset = row_offset + len as usize;
child_rows.push(&row[row_offset..next_row_offset]);
row_offset = next_row_offset;
loop {
let decoded = super::variable::decode_blocks(&row[row_offset..], opts, |x| {
values_bytes += x.len();
});
if decoded <= 1 {
offsets.push(O::usize_as(offset));
break;
}
row_offset += decoded;
offset += 1;
}
}
O::from_usize(offset).expect("overflow");

current_offset += element_count;
offsets.append(O::from_usize(current_offset).unwrap());
let mut null_count = 0;
let nulls = MutableBuffer::collect_bool(rows.len(), |x| {
let valid = rows[x][0] != null_sentinel(opts);
null_count += !valid as usize;
valid
});

let mut values_offsets = Vec::with_capacity(offset);
let mut values_bytes = Vec::with_capacity(values_bytes);
for row in rows.iter_mut() {
let mut row_offset = 0;
loop {
let decoded = super::variable::decode_blocks(&row[row_offset..], opts, |x| {
values_bytes.extend_from_slice(x)
});
row_offset += decoded;
if decoded <= 1 {
break;
}
values_offsets.push(values_bytes.len());
}
*row = &row[row_offset..];
}

if opts.descending {
values_bytes.iter_mut().for_each(|o| *o = !*o);
}

let mut last_value_offset = 0;
let mut child_rows: Vec<_> = values_offsets
.into_iter()
.map(|offset| {
let v = &values_bytes[last_value_offset..offset];
last_value_offset = offset;
v
})
.collect();

let child = converter.convert_raw(&mut child_rows, validate_utf8)?;
assert_eq!(child.len(), 1);

let child_data = child[0].to_data();

let builder = ArrayDataBuilder::new(field.data_type.clone())
.len(rows.len())
.nulls(canonical.nulls().cloned())
.add_buffer(offsets.finish())
.null_count(null_count)
.null_bit_buffer(Some(nulls.into()))
.add_buffer(Buffer::from_vec(offsets))
.add_child_data(child_data);

Ok(GenericListArray::from(unsafe { builder.build_unchecked() }))
Expand Down
28 changes: 16 additions & 12 deletions arrow-row/src/variable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,15 +83,23 @@ pub fn encode<'a, I: Iterator<Item = Option<&'a [u8]>>>(
}
}

/// Encodes a null value into `out`, returning the number of bytes written
/// (always 1).
///
/// A null is represented by a single sentinel byte derived from the sort
/// options (see `null_sentinel`), so nulls order consistently relative to
/// non-null values under the requested sort direction.
pub fn encode_null(out: &mut [u8], opts: SortOptions) -> usize {
out[0] = null_sentinel(opts);
1
}

/// Encodes an empty byte array into `out`, returning the number of bytes
/// written (always 1).
///
/// An empty value is a single sentinel byte; when sorting descending the
/// sentinel is bitwise-inverted so the byte-wise ordering is reversed too.
pub fn encode_empty(out: &mut [u8], opts: SortOptions) -> usize {
    out[0] = if opts.descending {
        !EMPTY_SENTINEL
    } else {
        EMPTY_SENTINEL
    };
    1
}

pub fn encode_one(out: &mut [u8], val: Option<&[u8]>, opts: SortOptions) -> usize {
match val {
Some([]) => {
out[0] = match opts.descending {
true => !EMPTY_SENTINEL,
false => EMPTY_SENTINEL,
};
1
}
None => encode_null(out, opts),
Some([]) => encode_empty(out, opts),
Some(val) => {
// Write `2_u8` to demarcate as non-empty, non-null string
out[0] = NON_EMPTY_SENTINEL;
Expand All @@ -111,10 +119,6 @@ pub fn encode_one(out: &mut [u8], val: Option<&[u8]>, opts: SortOptions) -> usiz
}
len
}
None => {
out[0] = null_sentinel(opts);
1
}
}
}

Expand Down Expand Up @@ -148,7 +152,7 @@ fn encode_blocks<const SIZE: usize>(out: &mut [u8], val: &[u8]) -> usize {
end_offset
}

fn decode_blocks(row: &[u8], options: SortOptions, mut f: impl FnMut(&[u8])) -> usize {
pub fn decode_blocks(row: &[u8], options: SortOptions, mut f: impl FnMut(&[u8])) -> usize {
let (non_empty_sentinel, continuation) = match options.descending {
true => (!NON_EMPTY_SENTINEL, !BLOCK_CONTINUATION),
false => (NON_EMPTY_SENTINEL, BLOCK_CONTINUATION),
Expand Down
Loading