Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement GroupColumn support for StringView / ByteView (faster grouping performance) #12809

Merged
merged 27 commits into from
Oct 16, 2024
Merged
Changes from 5 commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
ca033e0
define `ByteGroupValueViewBuilder`.
Rachelint Oct 8, 2024
ffcc1a2
impl append.
Rachelint Oct 8, 2024
4842965
impl equal to.
Rachelint Oct 8, 2024
66bb7be
fix compile.
Rachelint Oct 9, 2024
ef1efce
fix comments.
Rachelint Oct 9, 2024
152a8b1
impl take_n.
Rachelint Oct 11, 2024
d61c3ec
impl build.
Rachelint Oct 11, 2024
151377e
impl rest functions in `GroupColumn`.
Rachelint Oct 11, 2024
63e11cb
fix output when panic.
Rachelint Oct 11, 2024
15d8349
add e2e sql tests.
Rachelint Oct 12, 2024
d9ee724
add unit tests.
Rachelint Oct 12, 2024
beffa35
switch to a really elegant style codes from alamb.
Rachelint Oct 12, 2024
46822f9
fix take_n.
Rachelint Oct 12, 2024
3a93584
improve comments.
Rachelint Oct 12, 2024
f99f55c
fix compile.
Rachelint Oct 12, 2024
37b4816
fix clippy.
Rachelint Oct 13, 2024
d78c68d
define more testcases in `test_byte_view_take_n`.
Rachelint Oct 13, 2024
7cb7dfc
connect up.
Rachelint Oct 13, 2024
e6c7e7e
fix doc.
Rachelint Oct 13, 2024
36d556e
Do not re-validate output is utf8
alamb Oct 13, 2024
f76c376
Merge pull request #1 from alamb/alamb/tweak-group
Rachelint Oct 13, 2024
1fd926f
switch to unchecked when building array.
Rachelint Oct 13, 2024
34918cb
improve naming.
Rachelint Oct 13, 2024
8348024
use let else to make the codes clearer.
Rachelint Oct 13, 2024
023ed64
fix typo.
Rachelint Oct 13, 2024
c4d45c7
improve unit test coverage for `ByteViewGroupValueBuilder`.
Rachelint Oct 13, 2024
a1f8d0c
Merge remote-tracking branch 'apache/main' into impl-byte-view-column
alamb Oct 15, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
181 changes: 181 additions & 0 deletions datafusion/physical-plan/src/aggregates/group_values/group_column.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,22 +15,33 @@
// specific language governing permissions and limitations
// under the License.

use arrow::array::make_view;
use arrow::array::BufferBuilder;
use arrow::array::ByteView;
use arrow::array::GenericBinaryArray;
use arrow::array::GenericStringArray;
use arrow::array::OffsetSizeTrait;
use arrow::array::PrimitiveArray;
use arrow::array::PrimitiveBuilder;
use arrow::array::StringBuilder;
use arrow::array::StringViewBuilder;
use arrow::array::{Array, ArrayRef, ArrowPrimitiveType, AsArray};
use arrow::buffer::OffsetBuffer;
use arrow::buffer::ScalarBuffer;
use arrow::datatypes::ByteArrayType;
use arrow::datatypes::ByteViewType;
use arrow::datatypes::DataType;
use arrow::datatypes::GenericBinaryType;
use arrow_array::BinaryViewArray;
use arrow_array::GenericByteViewArray;
use arrow_array::StringViewArray;
use arrow_buffer::Buffer;
use datafusion_common::utils::proxy::VecAllocExt;

use crate::aggregates::group_values::null_builder::MaybeNullBufferBuilder;
use arrow_array::types::GenericStringType;
use datafusion_physical_expr_common::binary_map::{OutputType, INITIAL_BUFFER_CAPACITY};
use std::mem;
use std::sync::Arc;
use std::vec;

Expand Down Expand Up @@ -376,6 +387,176 @@ where
}
}

/// An implementation of [`GroupColumn`] for binary view and utf8 view types.
///
/// Stores a collection of binary view or utf8 view group values in a buffer
/// whose structure is similar to `GenericByteViewArray`, and we can get benefits:
///
/// 1. Efficient comparison of incoming rows to existing rows
/// 2. Efficient construction of the final output array
/// 3. Efficient to perform `take_n` comparing to use `GenericByteViewBuilder`
pub struct ByteGroupValueViewBuilder {
output_type: OutputType,

/// The views of string values
///
/// If string len <= 12, the view's format will be:
/// string(12B) | len(4B)
///
/// If string len > 12, its format will be:
/// offset(4B) | buffer_index(4B) | prefix(4B) | len(4B)
views: Vec<u128>,

/// The progressing block
///
/// New values will be inserted into it until its capacity
/// is not enough(detail can see `max_block_size`).
in_progress: Vec<u8>,

/// The completed blocks
completed: Vec<Buffer>,

/// The max size of `in_progress`
///
/// `in_progress` will be flushed into `completed`, and create new `in_progress`
/// when found its remaining capacity(`max_block_size` - `len(in_progress)`),
/// is no enough to store the appended value.
max_block_size: usize,

/// Nulls
nulls: MaybeNullBufferBuilder,
}

impl ByteGroupValueViewBuilder {
fn append_val_inner<B>(&mut self, array: &ArrayRef, row: usize)
where
B: ByteViewType,
{
let arr = array.as_byte_view::<B>();

// If a null row, set and return
if arr.is_null(row) {
Copy link
Contributor

@Dandandan Dandandan Oct 15, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be nice in the future to avoid those null checks in GroupColumn (even if input is nullable field) for batches containing no nulls.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems really make sense.

And I found even for the batches containing some nulls, actually we have checked which rows are really nulls in create_hashes .
Maybe it is possible that, we reuse this check result?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It might make sense to pull the null / not null check into the caller of this function 🤔

Copy link
Contributor Author

@Rachelint Rachelint Oct 15, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤔 I filed an issue about this, and I am trying the straight way about using null_count. #12944
Let's see the performance improvement.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, we need to do the check on the batch (does the batch contain no nulls-> follow the fast path that omits the check), so I think indeed the calling side needs to perform this.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

possibly interesting: one of the reasons special casing nulls/no-nulls can be helpful is that it permits better auto vectorization, as we are documenting here: apache/arrow-rs#6554

self.nulls.append(true);
self.views.push(0);
return;
}

// Not null case
self.nulls.append(false);
let value: &[u8] = arr.value(row).as_ref();

let value_len = value.len();
let view = if value_len <= 12 {
make_view(value, 0, 0)
} else {
// Ensure big enough block to hold the value firstly
self.ensure_in_progress_big_enough(value_len);

// Append value
let buffer_index = self.completed.len();
let offset = self.in_progress.len();
self.in_progress.extend_from_slice(value);

make_view(value, buffer_index as u32, offset as u32)
};

// Append view
self.views.push(view);
}

fn ensure_in_progress_big_enough(&mut self, value_len: usize) {
debug_assert!(value_len > 12);
let require_cap = self.in_progress.len() + value_len;

// If current block isn't big enough, flush it and create a new in progress block
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be If current block is big enough?

Copy link
Contributor Author

@Rachelint Rachelint Oct 16, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be If current block is big enough?

Maybe can improve like that If current in_progress block have no enough room to hold the new value?

if require_cap > self.max_block_size {
let flushed_block = mem::replace(
&mut self.in_progress,
Vec::with_capacity(self.max_block_size),
);
let buffer = Buffer::from_vec(flushed_block);
self.completed.push(buffer);
}
}

fn equal_to_inner<B>(&self, lhs_row: usize, array: &ArrayRef, rhs_row: usize) -> bool
where
B: ByteViewType,
{
let array = array.as_byte_view::<B>();

// Check if nulls equal firstly
let exist_null = self.nulls.is_null(lhs_row);
let input_null = array.is_null(rhs_row);
if let Some(result) = nulls_equal_to(exist_null, input_null) {
return result;
}

// Otherwise, we need to check their values
let exist_view = self.views[lhs_row];
let exist_view_len = exist_view as u32;

let input_view = array.views()[rhs_row];
let input_view_len = input_view as u32;

// The check logic
// - Check len equality
// - If inlined, check inlined value
// - If non-inlined, check prefix and then check value in buffer
// when needed
if exist_view_len != input_view_len {
return false;
}

if exist_view_len <= 12 {
let exist_inline = unsafe {
GenericByteViewArray::<B>::inline_value(
&exist_view,
exist_view_len as usize,
)
};
let input_inline = unsafe {
GenericByteViewArray::<B>::inline_value(
&input_view,
input_view_len as usize,
)
};
exist_inline == input_inline
} else {
let exist_prefix =
unsafe { GenericByteViewArray::<B>::inline_value(&exist_view, 4) };
let input_prefix =
unsafe { GenericByteViewArray::<B>::inline_value(&input_view, 4) };

if exist_prefix != input_prefix {
return false;
}

let exist_full = {
let byte_view = ByteView::from(exist_view);
self.value(
byte_view.buffer_index as usize,
byte_view.offset as usize,
byte_view.length as usize,
)
};
let input_full: &[u8] = unsafe { array.value_unchecked(rhs_row).as_ref() };
exist_full == input_full
}
}

fn value(&self, buffer_index: usize, offset: usize, length: usize) -> &[u8] {
debug_assert!(buffer_index <= self.completed.len());

if buffer_index < self.completed.len() {
let block = &self.completed[buffer_index];
&block[offset..offset + length]
} else {
&self.in_progress[offset..offset + length]
}
}
}

/// Determines if the nullability of the existing and new input array can be used
/// to short-circuit the comparison of the two values.
///
Expand Down
Loading