From ca033e09ba15ff66e38c9821c52e1a6c61c76497 Mon Sep 17 00:00:00 2001
From: kamille <caoruiqiu.crq@antgroup.com>
Date: Tue, 8 Oct 2024 14:35:27 +0800
Subject: [PATCH 01/25] define `ByteGroupValueViewBuilder`.

---
 .../aggregates/group_values/group_column.rs   | 66 +++++++++++++++++++
 1 file changed, 66 insertions(+)

diff --git a/datafusion/physical-plan/src/aggregates/group_values/group_column.rs b/datafusion/physical-plan/src/aggregates/group_values/group_column.rs
index 5d00f300e960..4092809b4fb6 100644
--- a/datafusion/physical-plan/src/aggregates/group_values/group_column.rs
+++ b/datafusion/physical-plan/src/aggregates/group_values/group_column.rs
@@ -20,12 +20,20 @@ use arrow::array::GenericBinaryArray;
 use arrow::array::GenericStringArray;
 use arrow::array::OffsetSizeTrait;
 use arrow::array::PrimitiveArray;
+use arrow::array::PrimitiveBuilder;
+use arrow::array::StringBuilder;
+use arrow::array::StringViewBuilder;
 use arrow::array::{Array, ArrayRef, ArrowPrimitiveType, AsArray};
 use arrow::buffer::OffsetBuffer;
 use arrow::buffer::ScalarBuffer;
 use arrow::datatypes::ByteArrayType;
+use arrow::datatypes::ByteViewType;
 use arrow::datatypes::DataType;
 use arrow::datatypes::GenericBinaryType;
+use arrow_array::BinaryViewArray;
+use arrow_array::GenericByteViewArray;
+use arrow_array::StringViewArray;
+use arrow_buffer::Buffer;
 use datafusion_common::utils::proxy::VecAllocExt;
 
 use crate::aggregates::group_values::null_builder::MaybeNullBufferBuilder;
@@ -376,6 +384,64 @@ where
     }
 }
 
+/// An implementation of [`GroupColumn`] for binary view and utf8 view types.
+///
+/// Stores a collection of binary view or utf8 view group values in a buffer
+/// whose structure is similar to `GenericByteViewArray`, and we can get benefits:
+///
+/// 1. Efficient comparison of incoming rows to existing rows
+/// 2. Efficient construction of the final output array
+/// 3. Efficient to perform `take_n` comparing to use `GenericByteViewBuilder`
+pub struct ByteGroupValueViewBuilder {
+    output_type: OutputType,
+
+    /// The views of string values
+    ///
+    /// If string len <= 12, the view's format will be:
+    ///   string(12B) | len(4B)
+    ///
+    /// If string len > 12, its format will be:
+    ///     offset(4B) | buffer_index(4B) | prefix(4B) | len(4B)
+    views: Vec<u128>,
+
+    /// The progressing block
+    ///
+    /// New values will be inserted into it until its capacity
+    /// is not enough(detail can see `max_block_size`).
+    in_progress: Vec<u8>,
+
+    /// The completed blocks
+    completed: Vec<Buffer>,
+
+    /// The max size of `in_progress`
+    ///
+    /// `in_progress` will be flushed into `completed`, and create new `in_progress`
+    /// when found its remaining capacity(`max_block_size` - `len(in_progress)`),
+    /// is no enough to store the appended value.
+    max_block_size: usize,
+
+    /// Nulls
+    nulls: MaybeNullBufferBuilder,
+}
+
+// impl ByteGroupValueViewBuilder {
+//     fn append_val_inner<B>(&mut self, array: &ArrayRef, row: usize)
+//     where
+//         B: ByteViewType,
+//     {
+//         let arr = array.as_byte_view::<B>();
+//         if arr.is_null(row) {
+//             self.nulls.append(true);
+//             self.views.push(0);
+//         } else {
+//             self.nulls.append(false);
+//             let value: &[u8] = arr.value(row).as_ref();
+//             self.buffer.append_slice(value);
+//             self.offsets.push(O::usize_as(self.buffer.len()));
+//         }
+//     }
+// }
+
 /// Determines if the nullability of the existing and new input array can be used
 /// to short-circuit the comparison of the two values.
 ///

From ffcc1a25a427999b36c72bc6a9d28521aad3643b Mon Sep 17 00:00:00 2001
From: kamille <caoruiqiu.crq@antgroup.com>
Date: Tue, 8 Oct 2024 17:07:06 +0800
Subject: [PATCH 02/25] impl append.

---
 .../aggregates/group_values/group_column.rs   | 71 ++++++++++++++-----
 1 file changed, 54 insertions(+), 17 deletions(-)

diff --git a/datafusion/physical-plan/src/aggregates/group_values/group_column.rs b/datafusion/physical-plan/src/aggregates/group_values/group_column.rs
index 4092809b4fb6..e39afa5ba739 100644
--- a/datafusion/physical-plan/src/aggregates/group_values/group_column.rs
+++ b/datafusion/physical-plan/src/aggregates/group_values/group_column.rs
@@ -15,6 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use arrow::array::make_view;
 use arrow::array::BufferBuilder;
 use arrow::array::GenericBinaryArray;
 use arrow::array::GenericStringArray;
@@ -39,6 +40,7 @@ use datafusion_common::utils::proxy::VecAllocExt;
 use crate::aggregates::group_values::null_builder::MaybeNullBufferBuilder;
 use arrow_array::types::GenericStringType;
 use datafusion_physical_expr_common::binary_map::{OutputType, INITIAL_BUFFER_CAPACITY};
+use std::mem;
 use std::sync::Arc;
 use std::vec;
 
@@ -424,23 +426,58 @@ pub struct ByteGroupValueViewBuilder {
     nulls: MaybeNullBufferBuilder,
 }
 
-// impl ByteGroupValueViewBuilder {
-//     fn append_val_inner<B>(&mut self, array: &ArrayRef, row: usize)
-//     where
-//         B: ByteViewType,
-//     {
-//         let arr = array.as_byte_view::<B>();
-//         if arr.is_null(row) {
-//             self.nulls.append(true);
-//             self.views.push(0);
-//         } else {
-//             self.nulls.append(false);
-//             let value: &[u8] = arr.value(row).as_ref();
-//             self.buffer.append_slice(value);
-//             self.offsets.push(O::usize_as(self.buffer.len()));
-//         }
-//     }
-// }
+impl ByteGroupValueViewBuilder {
+    fn append_val_inner<B>(&mut self, array: &ArrayRef, row: usize)
+    where
+        B: ByteViewType,
+    {
+        let arr = array.as_byte_view::<B>();
+
+        // If a null row, set and return
+        if arr.is_null(row) {
+            self.nulls.append(true);
+            self.views.push(0);
+            return;
+        }
+
+        // Not null case
+        self.nulls.append(false);
+        let value: &[u8] = arr.value(row).as_ref();
+
+        let value_len = value.len();
+        let view = if value_len > 12 {
+            // Ensure big enough block to hold the value firstly
+            self.ensure_in_progress_big_enough(value_len);
+
+            // Append value
+            let block_id = self.completed.len();
+            let offset = self.in_progress.len();
+            self.in_progress.extend_from_slice(value);
+
+            make_view(value, block_id, offset)
+        } else {
+            make_view(value, 0, 0)
+        };
+
+        // Append view
+        self.views.push(view);
+    }
+
+    fn ensure_in_progress_big_enough(&mut self, value_len: usize) {
+        debug_assert!(value_len > 12);
+        let require_cap = self.in_progress.len() + value_len;
+
+        // If current block isn't big enough, flush it and create a new in progress block
+        if require_cap > self.max_block_size {
+            let flushed_block = mem::replace(
+                &mut self.in_progress,
+                Vec::with_capacity(self.max_block_size),
+            );
+            let buffer = Buffer::from_vec(flushed_block);
+            self.completed.push(buffer);
+        }
+    }
+}
 
 /// Determines if the nullability of the existing and new input array can be used
 /// to short-circuit the comparison of the two values.

From 48429654c9d3a2278b680168cd767a68ffe00ce9 Mon Sep 17 00:00:00 2001
From: kamille <caoruiqiu.crq@antgroup.com>
Date: Tue, 8 Oct 2024 19:01:46 +0800
Subject: [PATCH 03/25] impl equal to.

---
 .../aggregates/group_values/group_column.rs   | 84 ++++++++++++++++++-
 1 file changed, 81 insertions(+), 3 deletions(-)

diff --git a/datafusion/physical-plan/src/aggregates/group_values/group_column.rs b/datafusion/physical-plan/src/aggregates/group_values/group_column.rs
index e39afa5ba739..a4361cd1ab20 100644
--- a/datafusion/physical-plan/src/aggregates/group_values/group_column.rs
+++ b/datafusion/physical-plan/src/aggregates/group_values/group_column.rs
@@ -17,6 +17,7 @@
 
 use arrow::array::make_view;
 use arrow::array::BufferBuilder;
+use arrow::array::ByteView;
 use arrow::array::GenericBinaryArray;
 use arrow::array::GenericStringArray;
 use arrow::array::OffsetSizeTrait;
@@ -445,7 +446,9 @@ impl ByteGroupValueViewBuilder {
         let value: &[u8] = arr.value(row).as_ref();
 
         let value_len = value.len();
-        let view = if value_len > 12 {
+        let view = if value_len <= 12 {
+            make_view(value, 0, 0)
+        } else {
             // Ensure big enough block to hold the value firstly
             self.ensure_in_progress_big_enough(value_len);
 
@@ -455,8 +458,6 @@ impl ByteGroupValueViewBuilder {
             self.in_progress.extend_from_slice(value);
 
             make_view(value, block_id, offset)
-        } else {
-            make_view(value, 0, 0)
         };
 
         // Append view
@@ -477,6 +478,83 @@ impl ByteGroupValueViewBuilder {
             self.completed.push(buffer);
         }
     }
+
+    fn equal_to_inner<B>(&self, lhs_row: usize, array: &ArrayRef, rhs_row: usize) -> bool
+    where
+        B: ByteViewType,
+    {
+        let array = array.as_byte_view::<B>();
+
+        // Check if nulls equal firstly
+        let exist_null = self.nulls.is_null(lhs_row);
+        let input_null = array.is_null(rhs_row);
+        if let Some(result) = nulls_equal_to(exist_null, input_null) {
+            return result;
+        }
+
+        // Otherwise, we need to check their values
+        let exist_view = self.views[lhs_row];
+        let exist_view_len = exist_view as u32;
+
+        let input_view = array.views()[rhs_row];
+        let input_view_len = input_view as u32;
+
+        // The check logic
+        //   - Check len equality
+        //   - If non-inlined, check prefix and then check value in buffer
+        //     when needed
+        //   - If inlined, check inlined value
+        if exist_view_len != input_view_len {
+            return false;
+        }
+
+        if exist_view_len <= 12 {
+            let exist_inline = unsafe {
+                GenericByteViewArray::<T>::inline_value(
+                    &exist_view,
+                    exist_view_len as usize,
+                )
+            };
+            let input_inline = unsafe {
+                GenericByteViewArray::<T>::inline_value(
+                    &input_view,
+                    input_view_len as usize,
+                )
+            };
+            exist_inline == input_inline
+        } else {
+            let exist_prefix =
+                unsafe { GenericByteViewArray::<B>::inline_value(&exist_view, 4) };
+            let input_prefix =
+                unsafe { GenericByteViewArray::<B>::inline_value(&input_view, 4) };
+
+            if exist_prefix != input_prefix {
+                return false;
+            }
+
+            let exist_full = {
+                let byte_view = ByteView::from(exist_view);
+                self.value(
+                    byte_view.buffer_index as usize,
+                    byte_view.offset as usize,
+                    byte_view.length as usize,
+                )
+            };
+            let input_full: &[u8] = unsafe { array.value_unchecked(rhs_row).as_ref() };
+            exist_full == input_full
+        }
+    }
+
+    fn value(&self, buffer_index: usize, offset: usize, length: usize) -> &[u8] {
+        debug_assert!(buffer_index <= self.completed.len());
+
+        if buffer_index < self.completed.len() {
+            let block = &self.completed[buffer_index];
+            &block[offset..offset + length]
+        } else {
+            &self.in_progress[offset..offset + length]
+        }
+    }
 }
 
 /// Determines if the nullability of the existing and new input array can be used

From 66bb7be75b1ec8952e6b94535bc10e9af6623f9f Mon Sep 17 00:00:00 2001
From: kamille <caoruiqiu.crq@antgroup.com>
Date: Thu, 10 Oct 2024 01:52:09 +0800
Subject: [PATCH 04/25] fix compile.

---
 .../src/aggregates/group_values/group_column.rs           | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/datafusion/physical-plan/src/aggregates/group_values/group_column.rs b/datafusion/physical-plan/src/aggregates/group_values/group_column.rs
index a4361cd1ab20..61894ffe4a05 100644
--- a/datafusion/physical-plan/src/aggregates/group_values/group_column.rs
+++ b/datafusion/physical-plan/src/aggregates/group_values/group_column.rs
@@ -453,11 +453,11 @@ impl ByteGroupValueViewBuilder {
             self.ensure_in_progress_big_enough(value_len);
 
             // Append value
-            let block_id = self.completed.len();
+            let buffer_index = self.completed.len();
             let offset = self.in_progress.len();
             self.in_progress.extend_from_slice(value);
 
-            make_view(value, block_id, offset)
+            make_view(value, buffer_index as u32, offset as u32)
         };
 
         // Append view
@@ -510,13 +510,13 @@ impl ByteGroupValueViewBuilder {
 
         if exist_view_len <= 12 {
             let exist_inline = unsafe {
-                GenericByteViewArray::<T>::inline_value(
+                GenericByteViewArray::<B>::inline_value(
                     &exist_view,
                     exist_view_len as usize,
                 )
             };
             let input_inline = unsafe {
-                GenericByteViewArray::<T>::inline_value(
+                GenericByteViewArray::<B>::inline_value(
                     &input_view,
                     input_view_len as usize,
                 )

From ef1efcecf682bb4e39b6f9f91555075650572ea9 Mon Sep 17 00:00:00 2001
From: kamille <caoruiqiu.crq@antgroup.com>
Date: Thu, 10 Oct 2024 02:13:06 +0800
Subject: [PATCH 05/25] fix comments.

---
 .../physical-plan/src/aggregates/group_values/group_column.rs   | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/datafusion/physical-plan/src/aggregates/group_values/group_column.rs b/datafusion/physical-plan/src/aggregates/group_values/group_column.rs
index 61894ffe4a05..13fc0865115f 100644
--- a/datafusion/physical-plan/src/aggregates/group_values/group_column.rs
+++ b/datafusion/physical-plan/src/aggregates/group_values/group_column.rs
@@ -501,9 +501,9 @@ impl ByteGroupValueViewBuilder {
 
         // The check logic
         //   - Check len equality
+        //   - If inlined, check inlined value
         //   - If non-inlined, check prefix and then check value in buffer
         //     when needed
-        //   - If inlined, check inlined value
         if exist_view_len != input_view_len {
             return false;
         }

From 152a8b1a847ceec9e943a00ec5e2f18fcd2240e4 Mon Sep 17 00:00:00 2001
From: kamille <caoruiqiu.crq@antgroup.com>
Date: Sat, 12 Oct 2024 03:40:20 +0800
Subject: [PATCH 06/25] impl take_n.

---
 .../aggregates/group_values/group_column.rs   | 125 ++++++++++++++++++
 1 file changed, 125 insertions(+)

diff --git a/datafusion/physical-plan/src/aggregates/group_values/group_column.rs b/datafusion/physical-plan/src/aggregates/group_values/group_column.rs
index 13fc0865115f..8276ee0067e9 100644
--- a/datafusion/physical-plan/src/aggregates/group_values/group_column.rs
+++ b/datafusion/physical-plan/src/aggregates/group_values/group_column.rs
@@ -28,10 +28,12 @@ use arrow::array::StringViewBuilder;
 use arrow::array::{Array, ArrayRef, ArrowPrimitiveType, AsArray};
 use arrow::buffer::OffsetBuffer;
 use arrow::buffer::ScalarBuffer;
+use arrow::datatypes::BinaryViewType;
 use arrow::datatypes::ByteArrayType;
 use arrow::datatypes::ByteViewType;
 use arrow::datatypes::DataType;
 use arrow::datatypes::GenericBinaryType;
+use arrow::datatypes::StringViewType;
 use arrow_array::BinaryViewArray;
 use arrow_array::GenericByteViewArray;
 use arrow_array::StringViewArray;
@@ -557,6 +559,129 @@ impl ByteGroupValueViewBuilder {
     }
 }
 
+impl GroupColumn for ByteGroupValueViewBuilder {
+    fn equal_to(&self, lhs_row: usize, array: &ArrayRef, rhs_row: usize) -> bool {
+        todo!()
+    }
+
+    fn append_val(&mut self, array: &ArrayRef, row: usize) {
+        todo!()
+    }
+
+    fn len(&self) -> usize {
+        todo!()
+    }
+
+    fn size(&self) -> usize {
+        todo!()
+    }
+
+    fn build(self: Box<Self>) -> ArrayRef {
+        todo!()
+    }
+
+    fn take_n(&mut self, n: usize) -> ArrayRef {
+        debug_assert!(self.len() >= n);
+
+        // Take n for nulls
+        let null_buffer = self.nulls.take_n(n);
+
+        // Take n for values:
+        //   - Take first n `view`s from `views`
+        //
+        //   - Find the last non-inlined `view`, if all inlined,
+        //     we can build array and return happily, otherwise we
+        //     we need to continue to process related buffers
+        //
+        //   - Get the last related `buffer index`(let's name it `buffer index n`)
+        //     from last non-inlined `view`
+        //
+        //   - Take `0 ~ buffer index n-1` buffers, clone the `buffer index` buffer
+        //     (data part is wrapped by `Arc`, cheap to clone) if it is in `completed`,
+        //     or split
+        //
+        //   - Shift the `buffer index` of remaining non-inlined `views`
+        //
+        let first_n_views = self.views.drain(0..n).collect::<Vec<_>>();
+
+        let last_non_inlined_view = first_n_views
+            .iter()
+            .rev()
+            .find(|view| ((**view) as u32) > 12);
+
+        if let Some(view) = last_non_inlined_view {
+            let view = ByteView::from(*view);
+            let last_related_buffer_index = view.buffer_index as usize;
+            let mut taken_buffers = Vec::with_capacity(last_related_buffer_index + 1);
+
+            // Take `0 ~ last_related_buffer_index - 1` buffers
+            if !self.completed.is_empty() {
+                taken_buffers.extend(self.completed.drain(0..last_related_buffer_index));
+            }
+
+            // Process the `last_related_buffer_index` buffers
+            let last_buffer = if last_related_buffer_index < self.completed.len() {
+                // If it is in `completed`, simply clone
+                self.completed[last_related_buffer_index].clone()
+            } else {
+                // If it is `in_progress`, copied `0 ~ offset` part
+                let taken_last_buffer =
+                    self.in_progress[0..view.offset as usize].to_vec();
+                Buffer::from_vec(taken_last_buffer)
+            };
+            taken_buffers.push(last_buffer);
+
+            // Shift `buffer index` finally
+            self.views.iter_mut().for_each(|view| {
+                if (*view as u32) > 12 {
+                    let mut byte_view = ByteView::from(*view);
+                    byte_view.buffer_index -= last_related_buffer_index as u32;
+                    *view = byte_view.as_u128();
+                }
+            });
+
+            // Build array and return
+            let views = ScalarBuffer::from(first_n_views);
+            match self.output_type {
+                OutputType::Utf8View => {
+                    Arc::new(GenericByteViewArray::<StringViewType>::new(
+                        views,
+                        taken_buffers,
+                        null_buffer,
+                    ))
+                }
+                OutputType::BinaryView => {
+                    Arc::new(GenericByteViewArray::<BinaryViewType>::new(
+                        views,
+                        taken_buffers,
+                        null_buffer,
+                    ))
+                }
+                _ => unreachable!("View types should use `ArrowBytesViewMap`"),
+            }
+        } else {
+            let views = ScalarBuffer::from(first_n_views);
+            match self.output_type {
+                OutputType::Utf8View => {
+                    Arc::new(GenericByteViewArray::<StringViewType>::new(
+                        views,
+                        Vec::new(),
+                        null_buffer,
+                    ))
+                }
+                OutputType::BinaryView => {
+                    Arc::new(GenericByteViewArray::<BinaryViewType>::new(
+                        views,
+                        Vec::new(),
+                        null_buffer,
+                    ))
+                }
+                _ => unreachable!("View types should use `ArrowBytesViewMap`"),
+            }
+        }
+    }
+}
+
 /// Determines if the nullability of the existing and new input array can be used
 /// to short-circuit the comparison of the two values.
 ///

From d61c3ec6dc8d340d2a71f56c357554b076870f52 Mon Sep 17 00:00:00 2001
From: kamille <caoruiqiu.crq@antgroup.com>
Date: Sat, 12 Oct 2024 03:48:20 +0800
Subject: [PATCH 07/25] impl build.

---
 .../aggregates/group_values/group_column.rs   | 38 ++++++++++++++++++-
 1 file changed, 37 insertions(+), 1 deletion(-)

diff --git a/datafusion/physical-plan/src/aggregates/group_values/group_column.rs b/datafusion/physical-plan/src/aggregates/group_values/group_column.rs
index 8276ee0067e9..d36123897786 100644
--- a/datafusion/physical-plan/src/aggregates/group_values/group_column.rs
+++ b/datafusion/physical-plan/src/aggregates/group_values/group_column.rs
@@ -577,7 +577,43 @@ impl GroupColumn for ByteGroupValueViewBuilder {
     }
 
     fn build(self: Box<Self>) -> ArrayRef {
-        todo!()
+        let Self {
+            output_type,
+            views,
+            in_progress,
+            mut completed,
+            max_block_size,
+            nulls,
+        } = *self;
+
+        // Build nulls
+        let null_buffer = nulls.build();
+
+        // Build values
+        // Flush `in_process` firstly
+        if !in_progress.is_empty() {
+            let buffer = Buffer::from(in_progress);
+            completed.push(buffer);
+        }
+
+        let views = ScalarBuffer::from(views);
+        match output_type {
+            OutputType::Utf8View => {
+                Arc::new(GenericByteViewArray::<StringViewType>::new(
+                    views,
+                    completed,
+                    null_buffer,
+                ))
+            }
+            OutputType::BinaryView => {
+                Arc::new(GenericByteViewArray::<BinaryViewType>::new(
+                    views,
+                    completed,
+                    null_buffer,
+                ))
+            }
+            _ => unreachable!("View types should use `ArrowBytesViewMap`"),
+        }
     }
 
     fn take_n(&mut self, n: usize) -> ArrayRef {

From 151377e46da65f1bda9dfe43ee4ca8d1cbdbf061 Mon Sep 17 00:00:00 2001
From: kamille <caoruiqiu.crq@antgroup.com>
Date: Sat, 12 Oct 2024 04:17:52 +0800
Subject: [PATCH 08/25] impl rest functions in `GroupColumn`.

---
 .../aggregates/group_values/group_column.rs   | 36 ++++++++++++++++---
 1 file changed, 31 insertions(+), 5 deletions(-)

diff --git a/datafusion/physical-plan/src/aggregates/group_values/group_column.rs b/datafusion/physical-plan/src/aggregates/group_values/group_column.rs
index d36123897786..af8e85f4c585 100644
--- a/datafusion/physical-plan/src/aggregates/group_values/group_column.rs
+++ b/datafusion/physical-plan/src/aggregates/group_values/group_column.rs
@@ -561,19 +561,45 @@ impl ByteGroupValueViewBuilder {
 
 impl GroupColumn for ByteGroupValueViewBuilder {
     fn equal_to(&self, lhs_row: usize, array: &ArrayRef, rhs_row: usize) -> bool {
-        todo!()
+        match self.output_type {
+            OutputType::Utf8View => {
+                self.equal_to_inner::<StringViewType>(lhs_row, array, rhs_row)
+            }
+            OutputType::BinaryView => {
+                self.equal_to_inner::<BinaryViewType>(lhs_row, array, rhs_row)
+            }
+            _ => unreachable!("View types should use `ArrowBytesViewMap`"),
+        }
     }
 
     fn append_val(&mut self, array: &ArrayRef, row: usize) {
-        todo!()
+        match self.output_type {
+            OutputType::Utf8View => {
+                self.append_val_inner::<StringViewType>(array, row);
+            }
+            OutputType::BinaryView => {
+                self.append_val_inner::<BinaryViewType>(array, row);
+            }
+            _ => unreachable!("View types should use `ArrowBytesViewMap`"),
+        }
     }
 
     fn len(&self) -> usize {
-        todo!()
+        self.views.len()
     }
 
     fn size(&self) -> usize {
-        todo!()
+        let buffers_size = self
+            .completed
+            .iter()
+            .map(|buf| buf.capacity() * std::mem::size_of::<u8>())
+            .sum::<usize>();
+
+        self.nulls.allocated_size()
+            + self.views.capacity() * std::mem::size_of::<u128>()
+            + self.in_progress.capacity() * std::mem::size_of::<u8>()
+            + buffers_size
+            + std::mem::size_of::<Self>()
     }
 
     fn build(self: Box<Self>) -> ArrayRef {
@@ -582,8 +608,8 @@ impl GroupColumn for ByteGroupValueViewBuilder {
             views,
             in_progress,
             mut completed,
-            max_block_size,
             nulls,
+            ..
         } = *self;
 
         // Build nulls

From 63e11cbeb6da5f52c1edc2f35e6e1b8cfdc18831 Mon Sep 17 00:00:00 2001
From: kamille <caoruiqiu.crq@antgroup.com>
Date: Sat, 12 Oct 2024 04:21:11 +0800
Subject: [PATCH 09/25] fix output when panic.

---
 .../src/aggregates/group_values/group_column.rs        | 10 +++++-----
 1 file changed, 5 insertions(+), 5 deletions(-)

diff --git a/datafusion/physical-plan/src/aggregates/group_values/group_column.rs b/datafusion/physical-plan/src/aggregates/group_values/group_column.rs
index af8e85f4c585..bcd9d10da14f 100644
--- a/datafusion/physical-plan/src/aggregates/group_values/group_column.rs
+++ b/datafusion/physical-plan/src/aggregates/group_values/group_column.rs
@@ -568,7 +568,7 @@ impl GroupColumn for ByteGroupValueViewBuilder {
             OutputType::BinaryView => {
                 self.equal_to_inner::<BinaryViewType>(lhs_row, array, rhs_row)
             }
-            _ => unreachable!("View types should use `ArrowBytesViewMap`"),
+            _ => unreachable!("String/Binary type should use ByteGroupValueBuilder"),
         }
     }
 
@@ -580,7 +580,7 @@ impl GroupColumn for ByteGroupValueViewBuilder {
             OutputType::BinaryView => {
                 self.append_val_inner::<BinaryViewType>(array, row);
             }
-            _ => unreachable!("View types should use `ArrowBytesViewMap`"),
+            _ => unreachable!("String/Binary type should use ByteGroupValueBuilder"),
         }
     }
 
@@ -638,7 +638,7 @@ impl GroupColumn for ByteGroupValueViewBuilder {
                     null_buffer,
                 ))
             }
-            _ => unreachable!("View types should use `ArrowBytesViewMap`"),
+            _ => unreachable!("String/Binary type should use ByteGroupValueBuilder"),
         }
     }
 
@@ -719,7 +719,7 @@ impl GroupColumn for ByteGroupValueViewBuilder {
                         null_buffer,
                     ))
                 }
-                _ => unreachable!("View types should use `ArrowBytesViewMap`"),
+                _ => unreachable!("String/Binary type should use ByteGroupValueBuilder"),
             }
         } else {
             let views = ScalarBuffer::from(first_n_views);
@@ -738,7 +738,7 @@ impl GroupColumn for ByteGroupValueViewBuilder {
                         null_buffer,
                     ))
                 }
-                _ => unreachable!("View types should use `ArrowBytesViewMap`"),
+                _ => unreachable!("String/Binary type should use ByteGroupValueBuilder"),
             }
         }
     }

From 15d8349a4d8ea990c323132bcc90dffddb8ada20 Mon Sep 17 00:00:00 2001
From: kamille <caoruiqiu.crq@antgroup.com>
Date: Sat, 12 Oct 2024 19:14:58 +0800
Subject: [PATCH 10/25] add e2e sql tests.

---
 .../sqllogictest/test_files/group_by.slt      | 62 +++++++++++++++++++
 1 file changed, 62 insertions(+)

diff --git a/datafusion/sqllogictest/test_files/group_by.slt b/datafusion/sqllogictest/test_files/group_by.slt
index f561fa9e9ac8..72692290ab0e 100644
--- a/datafusion/sqllogictest/test_files/group_by.slt
+++ b/datafusion/sqllogictest/test_files/group_by.slt
@@ -5207,3 +5207,65 @@ NULL a 2
 
 statement ok
 drop table t;
+
+# test multi group by int + utf8view
+statement ok
+create table source as values
+-- use some strings that are larger than 12 characters as that goes through a different path
+(1, 'a'),
+(1, 'a'),
+(2, 'thisstringislongerthan12'),
+(2, 'thisstring'),
+(3, 'abc'),
+(3, 'cba'),
+(2, 'thisstring'),
+(null, null),
+(null, 'a'),
+(null, null),
+(null, 'a'),
+(2, 'thisstringisalsolongerthan12'),
+(2, 'thisstringislongerthan12'),
+(1, 'null')
+;
+
+statement ok
+create view t as select column1 as a, arrow_cast(column2, 'Utf8View') as b from source;
+
+query ITI
+select a, b, count(*) from t group by a, b order by a, b;
+----
+1 a 2
+1 null 1
+2 thisstring 2
+2 thisstringisalsolongerthan12 1
+2 thisstringislongerthan12 2
+3 abc 1
+3 cba 1
+NULL a 2
+NULL NULL 2
+
+statement ok
+drop view t
+
+# test with binary view
+statement ok
+create view t as select column1 as a, arrow_cast(column2, 'BinaryView') as b from source;
+
+query I?I
+select a, b, count(*) from t group by a, b order by a, b;
+----
+1 61 2
+1 6e756c6c 1
+2 74686973737472696e67 2
+2 74686973737472696e676973616c736f6c6f6e6765727468616e3132 1
+2 74686973737472696e6769736c6f6e6765727468616e3132 2
+3 616263 1
+3 636261 1
+NULL 61 2
+NULL NULL 2
+
+statement ok
+drop view t
+
+statement ok
+drop table source;

From d9ee724773b19a2a159d4b02d65cd77b61f0a576 Mon Sep 17 00:00:00 2001
From: kamille <caoruiqiu.crq@antgroup.com>
Date: Sat, 12 Oct 2024 19:15:56 +0800
Subject: [PATCH 11/25] add unit tests.

---
 .../aggregates/group_values/group_column.rs   | 148 ++++++++++++++++++
 1 file changed, 148 insertions(+)

diff --git a/datafusion/physical-plan/src/aggregates/group_values/group_column.rs b/datafusion/physical-plan/src/aggregates/group_values/group_column.rs
index bcd9d10da14f..edd8efc66d63 100644
--- a/datafusion/physical-plan/src/aggregates/group_values/group_column.rs
+++ b/datafusion/physical-plan/src/aggregates/group_values/group_column.rs
@@ -947,4 +947,152 @@ mod tests {
         assert!(!builder.equal_to(4, &input_array, 4));
         assert!(builder.equal_to(5, &input_array, 5));
     }
+
+    #[test]
+    fn test_byte_view_append_val() {
+        let mut builder =
+            ByteViewGroupValueBuilder::<StringViewType>::new().with_max_block_size(60);
+        let builder_array = StringViewArray::from(vec![
+            Some("this string is quite long"), // in buffer 0
+            Some("foo"),
+            None,
+            Some("bar"),
+            Some("this string is also quite long"), // buffer 0
+            Some("this string is quite long"),      // buffer 1
+            Some("bar"),
+        ]);
+        let builder_array: ArrayRef = Arc::new(builder_array);
+        for row in 0..builder_array.len() {
+            builder.append_val(&builder_array, row);
+        }
+
+        let output = Box::new(builder).build();
+        // should be 2 output buffers to hold all the data
+        assert_eq!(output.as_string_view().data_buffers().len(), 2,);
+        assert_eq!(&output, &builder_array)
+    }
+
+    #[test]
+    fn test_byte_view_equal_to() {
+        // Will cover such cases:
+        //   - exist null, input not null
+        //   - exist null, input null; values not equal
+        //   - exist null, input null; values equal
+        //   - exist not null, input null
+        //   - exist not null, input not null; values not equal
+        //   - exist not null, input not null; values equal
+
+        let mut builder = ByteViewGroupValueBuilder::<StringViewType>::new();
+        let builder_array = Arc::new(StringViewArray::from(vec![
+            None,
+            None,
+            None,
+            Some("foo"),
+            Some("bar"),
+            Some("this string is quite long"),
+            Some("baz"),
+        ])) as ArrayRef;
+        builder.append_val(&builder_array, 0);
+        builder.append_val(&builder_array, 1);
+        builder.append_val(&builder_array, 2);
+        builder.append_val(&builder_array, 3);
+        builder.append_val(&builder_array, 4);
+        builder.append_val(&builder_array, 5);
+        builder.append_val(&builder_array, 6);
+
+        // Define input array
+        let (views, buffer, _nulls) = StringViewArray::from(vec![
+            Some("foo"),
+            Some("bar"),                       // set to null
+            Some("this string is quite long"), // set to null
+            None,
+            None,
+            Some("foo"),
+            Some("baz"),
+        ])
+        .into_parts();
+
+        // explicitly build a boolean buffer where one of the null values also happens to match
+        let mut boolean_buffer_builder = BooleanBufferBuilder::new(6);
+        boolean_buffer_builder.append(true);
+        boolean_buffer_builder.append(false); // this sets Some("bar") to null above
+        boolean_buffer_builder.append(false); // this sets Some("thisstringisquitelong") to null above
+        boolean_buffer_builder.append(false);
+        boolean_buffer_builder.append(false);
+        boolean_buffer_builder.append(true);
+        boolean_buffer_builder.append(true);
+        let nulls = NullBuffer::new(boolean_buffer_builder.finish());
+        let input_array =
+            Arc::new(StringViewArray::new(views, buffer, Some(nulls))) as ArrayRef;
+
+        // Check
+        assert!(!builder.equal_to(0, &input_array, 0));
+        assert!(builder.equal_to(1, &input_array, 1));
+        assert!(builder.equal_to(2, &input_array, 2));
+        assert!(!builder.equal_to(3, &input_array, 3));
+        assert!(!builder.equal_to(4, &input_array, 4));
+        assert!(!builder.equal_to(5, &input_array, 5));
+        assert!(builder.equal_to(6, &input_array, 6));
+    }
+
+    #[test]
+    fn test_byte_view_take_n() {
+        let mut builder =
+            ByteViewGroupValueBuilder::<StringViewType>::new().with_max_block_size(60);
+        let input_array = StringViewArray::from(vec![
+            Some("this string is quite long"), // in buffer 0
+            Some("foo"),
+            Some("bar"),
+            Some("this string is also quite long"), // buffer 0
+            None,
+            Some("this string is quite long"), // buffer 1
+            None,
+            Some("another string that is is quite long"), // buffer 1
+            Some("bar"),
+        ]);
+        let input_array: ArrayRef = Arc::new(input_array);
+        for row in 0..input_array.len() {
+            builder.append_val(&input_array, row);
+        }
+
+        // should be 2 completed, one in progress buffer to hold all output
+        assert_eq!(builder.completed.len(), 2);
+        assert!(builder.in_progress.len() > 0);
+
+        let first_4 = builder.take_n(4);
+        println!(
+            "{}",
+            arrow::util::pretty::pretty_format_columns("first_4", &[first_4.clone()])
+                .unwrap()
+        );
+        assert_eq!(&first_4, &input_array.slice(0, 4));
+
+        // Add some new data after the first n
+        let input_array = StringViewArray::from(vec![
+            Some("short"),
+            None,
+            Some("Some new data to add that is long"), // in buffer 0
+            Some("short again"),
+        ]);
+        let input_array: ArrayRef = Arc::new(input_array);
+        for row in 0..input_array.len() {
+            builder.append_val(&input_array, row);
+        }
+
+        let result = Box::new(builder).build();
+        let expected: ArrayRef = Arc::new(StringViewArray::from(vec![
+            // last rows of the original input
+            None,
+            Some("this string is quite long"),
+            None,
+            Some("another string that is is quite long"),
+            Some("bar"),
+            // the subsequent input
+            Some("short"),
+            None,
+            Some("Some new data to add that is long"), // in buffer 0
+            Some("short again"),
+        ]));
+        assert_eq!(&result, &expected);
+    }
 }

From beffa35c4d825ffb4a29d4c8d03965cf3a73bd98 Mon Sep 17 00:00:00 2001
From: kamille <caoruiqiu.crq@antgroup.com>
Date: Sat, 12 Oct 2024 22:14:21 +0800
Subject: [PATCH 12/25] switch to a really elegant style codes from alamb.

---
 .../aggregates/group_values/group_column.rs   | 137 +++++++-----------
 1 file changed, 54 insertions(+), 83 deletions(-)

diff --git a/datafusion/physical-plan/src/aggregates/group_values/group_column.rs b/datafusion/physical-plan/src/aggregates/group_values/group_column.rs
index edd8efc66d63..5977c91a200a 100644
--- a/datafusion/physical-plan/src/aggregates/group_values/group_column.rs
+++ b/datafusion/physical-plan/src/aggregates/group_values/group_column.rs
@@ -43,10 +43,13 @@ use datafusion_common::utils::proxy::VecAllocExt;
 use crate::aggregates::group_values::null_builder::MaybeNullBufferBuilder;
 use arrow_array::types::GenericStringType;
 use datafusion_physical_expr_common::binary_map::{OutputType, INITIAL_BUFFER_CAPACITY};
+use std::marker::PhantomData;
 use std::mem;
 use std::sync::Arc;
 use std::vec;
 
+const BYTE_VIEW_MAX_BLOCK_SIZE: usize = 2 * 1024 * 1024;
+
 /// Trait for storing a single column of group values in [`GroupValuesColumn`]
 ///
 /// Implementations of this trait store an in-progress collection of group values
@@ -397,9 +400,7 @@ where
 /// 1. Efficient comparison of incoming rows to existing rows
 /// 2. Efficient construction of the final output array
 /// 3. Efficient to perform `take_n` comparing to use `GenericByteViewBuilder`
-pub struct ByteGroupValueViewBuilder {
-    output_type: OutputType,
-
+pub struct ByteViewGroupValueBuilder<B: ByteViewType> {
     /// The views of string values
     ///
     /// If string len <= 12, the view's format will be:
@@ -423,14 +424,37 @@ pub struct ByteGroupValueViewBuilder {
     /// `in_progress` will be flushed into `completed`, and create new `in_progress`
     /// when found its remaining capacity(`max_block_size` - `len(in_progress)`),
     /// is no enough to store the appended value.
+    ///
+    /// Currently it is fixed at 2MB.
     max_block_size: usize,
 
     /// Nulls
     nulls: MaybeNullBufferBuilder,
+
+    /// phantom data so the type requires <B>
+    _phantom: PhantomData<B>,
 }
 
-impl ByteGroupValueViewBuilder {
-    fn append_val_inner<B>(&mut self, array: &ArrayRef, row: usize)
+impl<B: ByteViewType> ByteViewGroupValueBuilder<B> {
+    fn new() -> Self {
+        Self {
+            views: Vec::new(),
+            in_progress: Vec::new(),
+            completed: Vec::new(),
+            max_block_size: BYTE_VIEW_MAX_BLOCK_SIZE,
+            nulls: MaybeNullBufferBuilder::new(),
+            _phantom: PhantomData {},
+        }
+    }
+
+    /// Set the max block size
+    #[cfg(test)]
+    fn with_max_block_size(mut self, max_block_size: usize) -> Self {
+        self.max_block_size = max_block_size;
+        self
+    }
+
+    fn append_val_inner(&mut self, array: &ArrayRef, row: usize)
     where
         B: ByteViewType,
     {
@@ -481,10 +505,7 @@ impl ByteGroupValueViewBuilder {
         }
     }
 
-    fn equal_to_inner<B>(&self, lhs_row: usize, array: &ArrayRef, rhs_row: usize) -> bool
-    where
-        B: ByteViewType,
-    {
+    fn equal_to_inner(&self, lhs_row: usize, array: &ArrayRef, rhs_row: usize) -> bool {
         let array = array.as_byte_view::<B>();
 
         // Check if nulls equal firstly
@@ -559,29 +580,13 @@ impl ByteGroupValueViewBuilder {
     }
 }
 
-impl GroupColumn for ByteGroupValueViewBuilder {
+impl<B: ByteViewType> GroupColumn for ByteViewGroupValueBuilder<B> {
     fn equal_to(&self, lhs_row: usize, array: &ArrayRef, rhs_row: usize) -> bool {
-        match self.output_type {
-            OutputType::Utf8View => {
-                self.equal_to_inner::<StringViewType>(lhs_row, array, rhs_row)
-            }
-            OutputType::BinaryView => {
-                self.equal_to_inner::<BinaryViewType>(lhs_row, array, rhs_row)
-            }
-            _ => unreachable!("String/Binary type should use ByteGroupValueBuilder"),
-        }
+        self.equal_to_inner(lhs_row, array, rhs_row)
     }
 
     fn append_val(&mut self, array: &ArrayRef, row: usize) {
-        match self.output_type {
-            OutputType::Utf8View => {
-                self.append_val_inner::<StringViewType>(array, row);
-            }
-            OutputType::BinaryView => {
-                self.append_val_inner::<BinaryViewType>(array, row);
-            }
-            _ => unreachable!("String/Binary type should use ByteGroupValueBuilder"),
-        }
+        self.append_val_inner(array, row)
     }
 
     fn len(&self) -> usize {
@@ -604,7 +609,6 @@ impl GroupColumn for ByteGroupValueViewBuilder {
 
     fn build(self: Box<Self>) -> ArrayRef {
         let Self {
-            output_type,
             views,
             in_progress,
             mut completed,
@@ -623,23 +627,12 @@ impl GroupColumn for ByteGroupValueViewBuilder {
         }
 
         let views = ScalarBuffer::from(views);
-        match output_type {
-            OutputType::Utf8View => {
-                Arc::new(GenericByteViewArray::<StringViewType>::new(
-                    views,
-                    completed,
-                    null_buffer,
-                ))
-            }
-            OutputType::BinaryView => {
-                Arc::new(GenericByteViewArray::<BinaryViewType>::new(
-                    views,
-                    completed,
-                    null_buffer,
-                ))
-            }
-            _ => unreachable!("String/Binary type should use ByteGroupValueBuilder"),
-        }
+
+        Arc::new(GenericByteViewArray::<B>::new(
+            views,
+            completed,
+            null_buffer,
+        ))
     }
 
     fn take_n(&mut self, n: usize) -> ArrayRef {
@@ -704,42 +697,18 @@ impl GroupColumn for ByteGroupValueViewBuilder {
 
             // Build array and return
             let views = ScalarBuffer::from(first_n_views);
-            match self.output_type {
-                OutputType::Utf8View => {
-                    Arc::new(GenericByteViewArray::<StringViewType>::new(
-                        views,
-                        taken_buffers,
-                        null_buffer,
-                    ))
-                }
-                OutputType::BinaryView => {
-                    Arc::new(GenericByteViewArray::<BinaryViewType>::new(
-                        views,
-                        taken_buffers,
-                        null_buffer,
-                    ))
-                }
-                _ => unreachable!("String/Binary type should use ByteGroupValueBuilder"),
-            }
+            Arc::new(GenericByteViewArray::<B>::new(
+                views,
+                taken_buffers,
+                null_buffer,
+            ))
         } else {
             let views = ScalarBuffer::from(first_n_views);
-            match self.output_type {
-                OutputType::Utf8View => {
-                    Arc::new(GenericByteViewArray::<StringViewType>::new(
-                        views,
-                        Vec::new(),
-                        null_buffer,
-                    ))
-                }
-                OutputType::BinaryView => {
-                    Arc::new(GenericByteViewArray::<BinaryViewType>::new(
-                        views,
-                        Vec::new(),
-                        null_buffer,
-                    ))
-                }
-                _ => unreachable!("String/Binary type should use ByteGroupValueBuilder"),
-            }
+            Arc::new(GenericByteViewArray::<B>::new(
+                views,
+                Vec::new(),
+                null_buffer,
+            ))
         }
     }
 }
@@ -762,12 +731,14 @@ fn nulls_equal_to(lhs_null: bool, rhs_null: bool) -> Option<bool> {
 mod tests {
     use std::sync::Arc;
 
-    use arrow::datatypes::Int64Type;
-    use arrow_array::{ArrayRef, Int64Array, StringArray};
+    use arrow::{array::AsArray, datatypes::{Int64Type, StringViewType}};
+    use arrow_array::{ArrayRef, Int64Array, StringArray, StringViewArray};
     use arrow_buffer::{BooleanBufferBuilder, NullBuffer};
     use datafusion_physical_expr::binary_map::OutputType;
 
-    use crate::aggregates::group_values::group_column::PrimitiveGroupValueBuilder;
+    use crate::aggregates::group_values::group_column::{
+        ByteViewGroupValueBuilder, PrimitiveGroupValueBuilder,
+    };
 
     use super::{ByteGroupValueBuilder, GroupColumn};
 

From 46822f961f5f81b8ecbfcd1ed135a4be56c684e6 Mon Sep 17 00:00:00 2001
From: kamille <caoruiqiu.crq@antgroup.com>
Date: Sun, 13 Oct 2024 01:25:19 +0800
Subject: [PATCH 13/25] fix take_n.

---
 .../aggregates/group_values/group_column.rs   | 181 +++++++++++++-----
 1 file changed, 129 insertions(+), 52 deletions(-)

diff --git a/datafusion/physical-plan/src/aggregates/group_values/group_column.rs b/datafusion/physical-plan/src/aggregates/group_values/group_column.rs
index 5977c91a200a..78ceba49edd4 100644
--- a/datafusion/physical-plan/src/aggregates/group_values/group_column.rs
+++ b/datafusion/physical-plan/src/aggregates/group_values/group_column.rs
@@ -460,14 +460,14 @@ impl<B: ByteViewType> ByteViewGroupValueBuilder<B> {
     {
         let arr = array.as_byte_view::<B>();
 
-        // If a null row, set and return
+        // Null row case, set and return
         if arr.is_null(row) {
             self.nulls.append(true);
             self.views.push(0);
             return;
         }
 
-        // Not null case
+        // Not null row case
         self.nulls.append(false);
         let value: &[u8] = arr.value(row).as_ref();
 
@@ -578,43 +578,15 @@ impl<B: ByteViewType> ByteViewGroupValueBuilder<B> {
             &self.in_progress[offset..offset + length]
         }
     }
-}
-
-impl<B: ByteViewType> GroupColumn for ByteViewGroupValueBuilder<B> {
-    fn equal_to(&self, lhs_row: usize, array: &ArrayRef, rhs_row: usize) -> bool {
-        self.equal_to_inner(lhs_row, array, rhs_row)
-    }
-
-    fn append_val(&mut self, array: &ArrayRef, row: usize) {
-        self.append_val_inner(array, row)
-    }
-
-    fn len(&self) -> usize {
-        self.views.len()
-    }
-
-    fn size(&self) -> usize {
-        let buffers_size = self
-            .completed
-            .iter()
-            .map(|buf| buf.capacity() * std::mem::size_of::<u8>())
-            .sum::<usize>();
 
-        self.nulls.allocated_size()
-            + self.views.capacity() * std::mem::size_of::<u128>()
-            + self.in_progress.capacity() * std::mem::size_of::<u8>()
-            + buffers_size
-            + std::mem::size_of::<Self>()
-    }
-
-    fn build(self: Box<Self>) -> ArrayRef {
+    fn build_inner(self: Self) -> ArrayRef {
         let Self {
             views,
             in_progress,
             mut completed,
             nulls,
             ..
-        } = *self;
+        } = self;
 
         // Build nulls
         let null_buffer = nulls.build();
@@ -635,9 +607,15 @@ impl<B: ByteViewType> GroupColumn for ByteViewGroupValueBuilder<B> {
         ))
     }
 
-    fn take_n(&mut self, n: usize) -> ArrayRef {
+    fn take_n_inner(&mut self, n: usize) -> ArrayRef {
         debug_assert!(self.len() >= n);
 
+        // The `n == len` case, we need to take all
+        if self.len() == n {
+            return self.build_inner();
+        }
+
+        // The `n < len` case
         // Take n for nulls
         let null_buffer = self.nulls.take_n(n);
 
@@ -652,8 +630,8 @@ impl<B: ByteViewType> GroupColumn for ByteViewGroupValueBuilder<B> {
         //     from last non-inlined `view`
         //
         //   - Take `0 ~ buffer index n-1` buffers, clone the `buffer index` buffer
-        //     (data part is wrapped by `Arc`, cheap to clone) if it is in `completed`,
-        //     or split
+        //     (data part is wrapped by `Arc`, cheap to clone) if it is one of `completed`,
+        //     or copy to generate a new buffer for return if it is `in_progress`
         //
         //   - Shift the `buffer index` of remaining non-inlined `views`
         //
@@ -667,30 +645,34 @@ impl<B: ByteViewType> GroupColumn for ByteViewGroupValueBuilder<B> {
         if let Some(view) = last_non_inlined_view {
             let view = ByteView::from(*view);
             let last_related_buffer_index = view.buffer_index as usize;
-            let mut taken_buffers = Vec::with_capacity(last_related_buffer_index + 1);
 
-            // Take `0 ~ last_related_buffer_index - 1` buffers
-            if !self.completed.is_empty() {
-                taken_buffers.extend(self.completed.drain(0..last_related_buffer_index));
-            }
+            // Check should we take the whole `last_related_buffer_index` buffer
+            let take_whole_last_buffer = self.should_take_whole_buffer(
+                last_related_buffer_index,
+                view.offset + view.length,
+            );
 
-            // Process the `last_related_buffer_index` buffers
-            let last_buffer = if last_related_buffer_index < self.completed.len() {
-                // If it is in `completed`, simply clone
-                self.completed[last_related_buffer_index].clone()
+            // Take related buffers
+            let buffers = if take_whole_last_buffer {
+                self.take_buffers_with_whole_last(last_related_buffer_index)
             } else {
-                // If it is `in_progress`, copied `0 ~ offset` part
-                let taken_last_buffer =
-                    self.in_progress[0..view.offset as usize].to_vec();
-                Buffer::from_vec(taken_last_buffer)
+                self.take_buffers_with_partial_last(
+                    last_related_buffer_index,
+                    view.offset + view.length,
+                )
+            };
+
+            // Shift `buffer index`s finally
+            let shifts = if take_whole_last_buffer {
+                last_related_buffer_index + 1
+            } else {
+                last_non_inlined_view
             };
-            taken_buffers.push(last_buffer);
 
-            // Shift `buffer index` finally
             self.views.iter_mut().for_each(|view| {
                 if (*view as u32) > 12 {
                     let mut byte_view = ByteView::from(*view);
-                    byte_view.buffer_index -= last_related_buffer_index as u32;
+                    byte_view.buffer_index -= shifts as u32;
                     *view = byte_view.as_u128();
                 }
             });
@@ -711,6 +693,98 @@ impl<B: ByteViewType> GroupColumn for ByteViewGroupValueBuilder<B> {
             ))
         }
     }
+
+    fn take_buffers_with_whole_last(
+        &mut self,
+        last_related_buffer_index: usize,
+    ) -> Vec<Buffer> {
+        if last_related_buffer_index == self.completed.len() {
+            self.flush_in_progress();
+        }
+        self.completed
+            .drain(0..last_related_buffer_index + 1)
+            .collect()
+    }
+
+    fn take_buffers_with_partial_last(
+        &mut self,
+        last_related_buffer_index: usize,
+        take_len: usize,
+    ) -> Vec<Buffer> {
+        let mut take_buffers = Vec::with_capacity(last_related_buffer_index + 1);
+
+        // Take `0 ~ last_related_buffer_index - 1` buffers
+        if !self.completed.is_empty() || last_related_buffer_index == 0 {
+            take_buffers.extend(self.completed.drain(0..last_related_buffer_index));
+        }
+
+        // Process the `last_related_buffer_index` buffers
+        let last_buffer = if last_related_buffer_index < self.completed.len() {
+            // If it is in `completed`, simply clone
+            self.completed[last_related_buffer_index].clone()
+        } else {
+            // If it is `in_progress`, copied `0 ~ offset` part
+            let taken_last_buffer = self.in_progress[0..take_len].to_vec();
+            Buffer::from_vec(taken_last_buffer)
+        };
+        take_buffers.push(last_buffer);
+
+        take_buffers
+    }
+
+    #[inline]
+    fn should_take_whole_buffer(&self, buffer_index: usize, take_len: usize) -> bool {
+        if buffer_index < self.completed.len() {
+            take_len == self.completed[buffer_index].len()
+        } else {
+            take_len == self.in_progress.len()
+        }
+    }
+
+    fn flush_in_progress(&mut self) {
+        let flushed_block = mem::replace(
+            &mut self.in_progress,
+            Vec::with_capacity(self.max_block_size),
+        );
+        let buffer = Buffer::from_vec(flushed_block);
+        self.completed.push(buffer);
+    }
+}
+
+impl<B: ByteViewType> GroupColumn for ByteViewGroupValueBuilder<B> {
+    fn equal_to(&self, lhs_row: usize, array: &ArrayRef, rhs_row: usize) -> bool {
+        self.equal_to_inner(lhs_row, array, rhs_row)
+    }
+
+    fn append_val(&mut self, array: &ArrayRef, row: usize) {
+        self.append_val_inner(array, row)
+    }
+
+    fn len(&self) -> usize {
+        self.views.len()
+    }
+
+    fn size(&self) -> usize {
+        let buffers_size = self
+            .completed
+            .iter()
+            .map(|buf| buf.capacity() * std::mem::size_of::<u8>())
+            .sum::<usize>();
+
+        self.nulls.allocated_size()
+            + self.views.capacity() * std::mem::size_of::<u128>()
+            + self.in_progress.capacity() * std::mem::size_of::<u8>()
+            + buffers_size
+            + std::mem::size_of::<Self>()
+    }
+
+    fn build(self: Box<Self>) -> ArrayRef {
+        Self::build_inner(*self)
+    }
+
+    fn take_n(&mut self, n: usize) -> ArrayRef {
+        self.take_n_inner(n)
+    }
 }
 
 /// Determines if the nullability of the existing and new input array can be used
@@ -731,7 +805,10 @@ fn nulls_equal_to(lhs_null: bool, rhs_null: bool) -> Option<bool> {
 mod tests {
     use std::sync::Arc;
 
-    use arrow::{array::AsArray, datatypes::{Int64Type, StringViewType}};
+    use arrow::{
+        array::AsArray,
+        datatypes::{Int64Type, StringViewType},
+    };
     use arrow_array::{ArrayRef, Int64Array, StringArray, StringViewArray};
     use arrow_buffer::{BooleanBufferBuilder, NullBuffer};
     use datafusion_physical_expr::binary_map::OutputType;

From 3a93584ce231c36cb8eb50e40e02111a77ab8f39 Mon Sep 17 00:00:00 2001
From: kamille <caoruiqiu.crq@antgroup.com>
Date: Sun, 13 Oct 2024 01:29:34 +0800
Subject: [PATCH 14/25] improve comments.

---
 .../src/aggregates/group_values/group_column.rs            | 7 ++++---
 1 file changed, 4 insertions(+), 3 deletions(-)

diff --git a/datafusion/physical-plan/src/aggregates/group_values/group_column.rs b/datafusion/physical-plan/src/aggregates/group_values/group_column.rs
index 78ceba49edd4..c5be822243f9 100644
--- a/datafusion/physical-plan/src/aggregates/group_values/group_column.rs
+++ b/datafusion/physical-plan/src/aggregates/group_values/group_column.rs
@@ -629,9 +629,10 @@ impl<B: ByteViewType> ByteViewGroupValueBuilder<B> {
         //   - Get the last related `buffer index`(let's name it `buffer index n`)
         //     from last non-inlined `view`
         //
-        //   - Take `0 ~ buffer index n-1` buffers, clone the `buffer index` buffer
-        //     (data part is wrapped by `Arc`, cheap to clone) if it is one of `completed`,
-        //     or copy to generate a new buffer for return if it is `in_progress`
+        //   - Take buffers, the key is that we need to know if we need to take
+        //     the whole last related buffer. The logic is a bit complex, you can
+        //     detail in `take_buffers_with_whole_last`, `take_buffers_with_partial_last`
+        //     and other related steps in following
         //
         //   - Shift the `buffer index` of remaining non-inlined `views`
         //

From f99f55c6d075223fa5b363217890ff69e136225d Mon Sep 17 00:00:00 2001
From: kamille <caoruiqiu.crq@antgroup.com>
Date: Sun, 13 Oct 2024 01:40:01 +0800
Subject: [PATCH 15/25] fix compile.

---
 .../src/aggregates/group_values/group_column.rs   | 15 ++++++---------
 1 file changed, 6 insertions(+), 9 deletions(-)

diff --git a/datafusion/physical-plan/src/aggregates/group_values/group_column.rs b/datafusion/physical-plan/src/aggregates/group_values/group_column.rs
index c5be822243f9..9e7d01c6591b 100644
--- a/datafusion/physical-plan/src/aggregates/group_values/group_column.rs
+++ b/datafusion/physical-plan/src/aggregates/group_values/group_column.rs
@@ -612,7 +612,8 @@ impl<B: ByteViewType> ByteViewGroupValueBuilder<B> {
 
         // The `n == len` case, we need to take all
         if self.len() == n {
-            return self.build_inner();
+            let cur_builder = std::mem::replace(self, Self::new());
+            return cur_builder.build_inner();
         }
 
         // The `n < len` case
@@ -650,7 +651,7 @@ impl<B: ByteViewType> ByteViewGroupValueBuilder<B> {
             // Check should we take the whole `last_related_buffer_index` buffer
             let take_whole_last_buffer = self.should_take_whole_buffer(
                 last_related_buffer_index,
-                view.offset + view.length,
+                (view.offset + view.length) as usize,
             );
 
             // Take related buffers
@@ -659,7 +660,7 @@ impl<B: ByteViewType> ByteViewGroupValueBuilder<B> {
             } else {
                 self.take_buffers_with_partial_last(
                     last_related_buffer_index,
-                    view.offset + view.length,
+                    (view.offset + view.length) as usize,
                 )
             };
 
@@ -667,7 +668,7 @@ impl<B: ByteViewType> ByteViewGroupValueBuilder<B> {
             let shifts = if take_whole_last_buffer {
                 last_related_buffer_index + 1
             } else {
-                last_non_inlined_view
+                last_related_buffer_index
             };
 
             self.views.iter_mut().for_each(|view| {
@@ -680,11 +681,7 @@ impl<B: ByteViewType> ByteViewGroupValueBuilder<B> {
 
             // Build array and return
             let views = ScalarBuffer::from(first_n_views);
-            Arc::new(GenericByteViewArray::<B>::new(
-                views,
-                taken_buffers,
-                null_buffer,
-            ))
+            Arc::new(GenericByteViewArray::<B>::new(views, buffers, null_buffer))
         } else {
             let views = ScalarBuffer::from(first_n_views);
             Arc::new(GenericByteViewArray::<B>::new(

From 37b48166c290c0bde1a59faa5a9113f4ae56a25c Mon Sep 17 00:00:00 2001
From: kamille <caoruiqiu.crq@antgroup.com>
Date: Sun, 13 Oct 2024 12:09:54 +0800
Subject: [PATCH 16/25] fix clippy.

---
 .../src/aggregates/group_values/group_column.rs | 17 +++++++----------
 1 file changed, 7 insertions(+), 10 deletions(-)

diff --git a/datafusion/physical-plan/src/aggregates/group_values/group_column.rs b/datafusion/physical-plan/src/aggregates/group_values/group_column.rs
index 9e7d01c6591b..7bd083d36c51 100644
--- a/datafusion/physical-plan/src/aggregates/group_values/group_column.rs
+++ b/datafusion/physical-plan/src/aggregates/group_values/group_column.rs
@@ -22,9 +22,6 @@ use arrow::array::GenericBinaryArray;
 use arrow::array::GenericStringArray;
 use arrow::array::OffsetSizeTrait;
 use arrow::array::PrimitiveArray;
-use arrow::array::PrimitiveBuilder;
-use arrow::array::StringBuilder;
-use arrow::array::StringViewBuilder;
 use arrow::array::{Array, ArrayRef, ArrowPrimitiveType, AsArray};
 use arrow::buffer::OffsetBuffer;
 use arrow::buffer::ScalarBuffer;
@@ -33,10 +30,7 @@ use arrow::datatypes::ByteArrayType;
 use arrow::datatypes::ByteViewType;
 use arrow::datatypes::DataType;
 use arrow::datatypes::GenericBinaryType;
-use arrow::datatypes::StringViewType;
-use arrow_array::BinaryViewArray;
 use arrow_array::GenericByteViewArray;
-use arrow_array::StringViewArray;
 use arrow_buffer::Buffer;
 use datafusion_common::utils::proxy::VecAllocExt;
 
@@ -579,7 +573,7 @@ impl<B: ByteViewType> ByteViewGroupValueBuilder<B> {
         }
     }
 
-    fn build_inner(self: Self) -> ArrayRef {
+    fn build_inner(self) -> ArrayRef {
         let Self {
             views,
             in_progress,
@@ -1103,13 +1097,16 @@ mod tests {
 
         // should be 2 completed, one in progress buffer to hold all output
         assert_eq!(builder.completed.len(), 2);
-        assert!(builder.in_progress.len() > 0);
+        assert!(!builder.in_progress.is_empty());
 
         let first_4 = builder.take_n(4);
         println!(
             "{}",
-            arrow::util::pretty::pretty_format_columns("first_4", &[first_4.clone()])
-                .unwrap()
+            arrow::util::pretty::pretty_format_columns(
+                "first_4",
+                &[Arc::clone(&first_4)]
+            )
+            .unwrap()
         );
         assert_eq!(&first_4, &input_array.slice(0, 4));
 

From d78c68da4bf17218e8ff3f520357de22159e20fe Mon Sep 17 00:00:00 2001
From: kamille <caoruiqiu.crq@antgroup.com>
Date: Sun, 13 Oct 2024 17:07:22 +0800
Subject: [PATCH 17/25] define more testcases in `test_byte_view_take_n`.

---
 .../aggregates/group_values/group_column.rs   | 120 +++++++++++++-----
 1 file changed, 86 insertions(+), 34 deletions(-)

diff --git a/datafusion/physical-plan/src/aggregates/group_values/group_column.rs b/datafusion/physical-plan/src/aggregates/group_values/group_column.rs
index 7bd083d36c51..d88c294b4135 100644
--- a/datafusion/physical-plan/src/aggregates/group_values/group_column.rs
+++ b/datafusion/physical-plan/src/aggregates/group_values/group_column.rs
@@ -1077,18 +1077,44 @@ mod tests {
 
     #[test]
     fn test_byte_view_take_n() {
+        // `take_n` is really complex, we should consider and test following situations:
+        //   1. Take nulls
+        //   2. Take all `inlined`s
+        //   3. Take non-inlined + partial last buffer in `completed`
+        //   4. Take non-inlined + whole last buffer in `completed`
+        //   5. Take non-inlined + partial last `in_progress`
+        //   6. Take non-inlined + while last buffer in ``in_progress`
+        //   7. Take all views at once
+
         let mut builder =
             ByteViewGroupValueBuilder::<StringViewType>::new().with_max_block_size(60);
         let input_array = StringViewArray::from(vec![
-            Some("this string is quite long"), // in buffer 0
+            //  Test situation 1
+            None,
+            None,
+            // Test situation 2 (also test take null together)
+            None,
             Some("foo"),
             Some("bar"),
-            Some("this string is also quite long"), // buffer 0
+            // Test situation 3 (also test take null + inlined)
+            None,
+            Some("foo"),
+            Some("this string is quite long"),
+            Some("this string is also quite long"),
+            // Test situation 4 (also test take null + inlined)
+            None,
+            Some("bar"),
+            Some("this string is quite long"),
+            // Test situation 5 (also test take null + inlined)
             None,
-            Some("this string is quite long"), // buffer 1
+            Some("foo"),
+            Some("another string that is is quite long"),
+            Some("this string not so long"),
+            // Test situation 6 (also test take null + inlined + insert again after taking)
             None,
-            Some("another string that is is quite long"), // buffer 1
             Some("bar"),
+            Some("this string is quite long"),
+            // Finally, we insert the whole array again for testing situation 7
         ]);
         let input_array: ArrayRef = Arc::new(input_array);
         for row in 0..input_array.len() {
@@ -1097,45 +1123,71 @@ mod tests {
 
         // should be 2 completed, one in progress buffer to hold all output
         assert_eq!(builder.completed.len(), 2);
+        println!("{}", builder.in_progress.len());
         assert!(!builder.in_progress.is_empty());
 
-        let first_4 = builder.take_n(4);
+        // Take all `inlined`s
+        let taken_array = builder.take_n(2);
         println!(
             "{}",
             arrow::util::pretty::pretty_format_columns(
-                "first_4",
-                &[Arc::clone(&first_4)]
+                "taken_array",
+                &[Arc::clone(&taken_array)]
             )
             .unwrap()
         );
-        assert_eq!(&first_4, &input_array.slice(0, 4));
+        assert_eq!(&taken_array, &input_array.slice(0, 2));
 
-        // Add some new data after the first n
-        let input_array = StringViewArray::from(vec![
-            Some("short"),
-            None,
-            Some("Some new data to add that is long"), // in buffer 0
-            Some("short again"),
-        ]);
-        let input_array: ArrayRef = Arc::new(input_array);
-        for row in 0..input_array.len() {
-            builder.append_val(&input_array, row);
-        }
+        // Take non-inlined + partial buffer 0
+        let taken_array = builder.take_n(1);
+        println!(
+            "{}",
+            arrow::util::pretty::pretty_format_columns(
+                "taken_array",
+                &[Arc::clone(&taken_array)]
+            )
+            .unwrap()
+        );
+        assert_eq!(&taken_array, &input_array.slice(2, 1));
 
-        let result = Box::new(builder).build();
-        let expected: ArrayRef = Arc::new(StringViewArray::from(vec![
-            // last rows of the original input
-            None,
-            Some("this string is quite long"),
-            None,
-            Some("another string that is is quite long"),
-            Some("bar"),
-            // the subsequent input
-            Some("short"),
-            None,
-            Some("Some new data to add that is long"), // in buffer 0
-            Some("short again"),
-        ]));
-        assert_eq!(&result, &expected);
+        // Take non-inlined + remaining partial buffer 0
+        let taken_array = builder.take_n(1);
+        println!(
+            "{}",
+            arrow::util::pretty::pretty_format_columns(
+                "taken_array",
+                &[Arc::clone(&taken_array)]
+            )
+            .unwrap()
+        );
+        assert_eq!(&taken_array, &input_array.slice(3, 1));
+
+        // Add some new data after the first n
+        // let input_array = StringViewArray::from(vec![
+        //     Some("short"),
+        //     None,
+        //     Some("Some new data to add that is long"), // in buffer 0
+        //     Some("short again"),
+        // ]);
+        // let input_array: ArrayRef = Arc::new(input_array);
+        // for row in 0..input_array.len() {
+        //     builder.append_val(&input_array, row);
+        // }
+
+        // let result = Box::new(builder).build();
+        // let expected: ArrayRef = Arc::new(StringViewArray::from(vec![
+        //     // last rows of the original input
+        //     None,
+        //     Some("this string is quite long"),
+        //     None,
+        //     Some("another string that is is quite long"),
+        //     Some("bar"),
+        //     // the subsequent input
+        //     Some("short"),
+        //     None,
+        //     Some("Some new data to add that is long"), // in buffer 0
+        //     Some("short again"),
+        // ]));
+        // assert_eq!(&result, &expected);
     }
 }

From 7cb7dfca3b50b5df3350809a17ea46e780b90f96 Mon Sep 17 00:00:00 2001
From: kamille <caoruiqiu.crq@antgroup.com>
Date: Sun, 13 Oct 2024 18:43:30 +0800
Subject: [PATCH 18/25] connect up.

---
 .../src/aggregates/group_values/column.rs     |  18 ++-
 .../aggregates/group_values/group_column.rs   | 132 +++++++++---------
 2 files changed, 81 insertions(+), 69 deletions(-)

diff --git a/datafusion/physical-plan/src/aggregates/group_values/column.rs b/datafusion/physical-plan/src/aggregates/group_values/column.rs
index 28f35b2bded2..4ad75844f7b7 100644
--- a/datafusion/physical-plan/src/aggregates/group_values/column.rs
+++ b/datafusion/physical-plan/src/aggregates/group_values/column.rs
@@ -16,14 +16,16 @@
 // under the License.
 
 use crate::aggregates::group_values::group_column::{
-    ByteGroupValueBuilder, GroupColumn, PrimitiveGroupValueBuilder,
+    ByteGroupValueBuilder, ByteViewGroupValueBuilder, GroupColumn,
+    PrimitiveGroupValueBuilder,
 };
 use crate::aggregates::group_values::GroupValues;
 use ahash::RandomState;
 use arrow::compute::cast;
 use arrow::datatypes::{
-    Date32Type, Date64Type, Float32Type, Float64Type, Int16Type, Int32Type, Int64Type,
-    Int8Type, UInt16Type, UInt32Type, UInt64Type, UInt8Type,
+    BinaryViewType, Date32Type, Date64Type, Float32Type, Float64Type, Int16Type,
+    Int32Type, Int64Type, Int8Type, StringViewType, UInt16Type, UInt32Type, UInt64Type,
+    UInt8Type,
 };
 use arrow::record_batch::RecordBatch;
 use arrow_array::{Array, ArrayRef};
@@ -119,6 +121,8 @@ impl GroupValuesColumn {
                 | DataType::LargeBinary
                 | DataType::Date32
                 | DataType::Date64
+                | DataType::Utf8View
+                | DataType::BinaryView
         )
     }
 }
@@ -184,6 +188,14 @@ impl GroupValues for GroupValuesColumn {
                         let b = ByteGroupValueBuilder::<i64>::new(OutputType::Binary);
                         v.push(Box::new(b) as _)
                     }
+                    &DataType::Utf8View => {
+                        let b = ByteViewGroupValueBuilder::<StringViewType>::new();
+                        v.push(Box::new(b) as _)
+                    }
+                    &DataType::BinaryView => {
+                        let b = ByteViewGroupValueBuilder::<BinaryViewType>::new();
+                        v.push(Box::new(b) as _)
+                    }
                     dt => {
                         return not_impl_err!("{dt} not supported in GroupValuesColumn")
                     }
diff --git a/datafusion/physical-plan/src/aggregates/group_values/group_column.rs b/datafusion/physical-plan/src/aggregates/group_values/group_column.rs
index d88c294b4135..2bb47c873047 100644
--- a/datafusion/physical-plan/src/aggregates/group_values/group_column.rs
+++ b/datafusion/physical-plan/src/aggregates/group_values/group_column.rs
@@ -25,7 +25,6 @@ use arrow::array::PrimitiveArray;
 use arrow::array::{Array, ArrayRef, ArrowPrimitiveType, AsArray};
 use arrow::buffer::OffsetBuffer;
 use arrow::buffer::ScalarBuffer;
-use arrow::datatypes::BinaryViewType;
 use arrow::datatypes::ByteArrayType;
 use arrow::datatypes::ByteViewType;
 use arrow::datatypes::DataType;
@@ -430,7 +429,7 @@ pub struct ByteViewGroupValueBuilder<B: ByteViewType> {
 }
 
 impl<B: ByteViewType> ByteViewGroupValueBuilder<B> {
-    fn new() -> Self {
+    pub fn new() -> Self {
         Self {
             views: Vec::new(),
             in_progress: Vec::new(),
@@ -442,7 +441,6 @@ impl<B: ByteViewType> ByteViewGroupValueBuilder<B> {
     }
 
     /// Set the max block size
-    #[cfg(test)]
     fn with_max_block_size(mut self, max_block_size: usize) -> Self {
         self.max_block_size = max_block_size;
         self
@@ -606,7 +604,8 @@ impl<B: ByteViewType> ByteViewGroupValueBuilder<B> {
 
         // The `n == len` case, we need to take all
         if self.len() == n {
-            let cur_builder = std::mem::replace(self, Self::new());
+            let new_builder = Self::new().with_max_block_size(self.max_block_size);
+            let cur_builder = std::mem::replace(self, new_builder);
             return cur_builder.build_inner();
         }
 
@@ -1077,6 +1076,8 @@ mod tests {
 
     #[test]
     fn test_byte_view_take_n() {
+        // ####### Define cases and init #######
+
         // `take_n` is really complex, we should consider and test following situations:
         //   1. Take nulls
         //   2. Take all `inlined`s
@@ -1114,80 +1115,79 @@ mod tests {
             None,
             Some("bar"),
             Some("this string is quite long"),
-            // Finally, we insert the whole array again for testing situation 7
+            // Insert 4 and just take 3 to ensure it will go the path of situation 6
+            None,
+            // Finally, we create a new builder,  insert the whole array and then
+            // take whole at once for testing situation 7
         ]);
+
         let input_array: ArrayRef = Arc::new(input_array);
-        for row in 0..input_array.len() {
+        let first_ones_to_append = 16; // For testing situation 1~5
+        let second_ones_to_append = 3; // For testing situation 6
+        let final_ones_to_append = input_array.len(); // For testing situation 7
+
+        // ####### Test situation 1~5 #######
+        for row in 0..first_ones_to_append {
             builder.append_val(&input_array, row);
         }
 
-        // should be 2 completed, one in progress buffer to hold all output
         assert_eq!(builder.completed.len(), 2);
-        println!("{}", builder.in_progress.len());
-        assert!(!builder.in_progress.is_empty());
+        assert_eq!(builder.in_progress.len(), 59);
 
-        // Take all `inlined`s
+        // Situation 1
         let taken_array = builder.take_n(2);
-        println!(
-            "{}",
-            arrow::util::pretty::pretty_format_columns(
-                "taken_array",
-                &[Arc::clone(&taken_array)]
-            )
-            .unwrap()
-        );
         assert_eq!(&taken_array, &input_array.slice(0, 2));
 
-        // Take non-inlined + partial buffer 0
+        // Situation 2
+        let taken_array = builder.take_n(3);
+        assert_eq!(&taken_array, &input_array.slice(2, 3));
+
+        // Situation 3
+        let taken_array = builder.take_n(3);
+        assert_eq!(&taken_array, &input_array.slice(5, 3));
+
         let taken_array = builder.take_n(1);
-        println!(
-            "{}",
-            arrow::util::pretty::pretty_format_columns(
-                "taken_array",
-                &[Arc::clone(&taken_array)]
-            )
-            .unwrap()
-        );
-        assert_eq!(&taken_array, &input_array.slice(2, 1));
+        assert_eq!(&taken_array, &input_array.slice(8, 1));
+
+        // Situation 4
+        let taken_array = builder.take_n(3);
+        assert_eq!(&taken_array, &input_array.slice(9, 3));
+
+        // Situation 5
+        let taken_array = builder.take_n(3);
+        assert_eq!(&taken_array, &input_array.slice(12, 3));
 
-        // Take non-inlined + remaining partial buffer 0
         let taken_array = builder.take_n(1);
-        println!(
-            "{}",
-            arrow::util::pretty::pretty_format_columns(
-                "taken_array",
-                &[Arc::clone(&taken_array)]
-            )
-            .unwrap()
-        );
-        assert_eq!(&taken_array, &input_array.slice(3, 1));
-
-        // Add some new data after the first n
-        // let input_array = StringViewArray::from(vec![
-        //     Some("short"),
-        //     None,
-        //     Some("Some new data to add that is long"), // in buffer 0
-        //     Some("short again"),
-        // ]);
-        // let input_array: ArrayRef = Arc::new(input_array);
-        // for row in 0..input_array.len() {
-        //     builder.append_val(&input_array, row);
-        // }
-
-        // let result = Box::new(builder).build();
-        // let expected: ArrayRef = Arc::new(StringViewArray::from(vec![
-        //     // last rows of the original input
-        //     None,
-        //     Some("this string is quite long"),
-        //     None,
-        //     Some("another string that is is quite long"),
-        //     Some("bar"),
-        //     // the subsequent input
-        //     Some("short"),
-        //     None,
-        //     Some("Some new data to add that is long"), // in buffer 0
-        //     Some("short again"),
-        // ]));
-        // assert_eq!(&result, &expected);
+        assert_eq!(&taken_array, &input_array.slice(15, 1));
+
+        // ####### Test situation 6 #######
+        assert!(builder.completed.is_empty());
+        assert!(builder.in_progress.is_empty());
+        assert!(builder.views.is_empty());
+
+        for row in first_ones_to_append..first_ones_to_append + second_ones_to_append {
+            builder.append_val(&input_array, row);
+        }
+
+        assert!(builder.completed.is_empty());
+        assert_eq!(builder.in_progress.len(), 25);
+
+        let taken_array = builder.take_n(3);
+        assert_eq!(&taken_array, &input_array.slice(16, 3));
+
+        // ####### Test situation 7 #######
+        // Create a new builder
+        let mut builder =
+            ByteViewGroupValueBuilder::<StringViewType>::new().with_max_block_size(60);
+
+        for row in 0..final_ones_to_append {
+            builder.append_val(&input_array, row);
+        }
+
+        assert_eq!(builder.completed.len(), 3);
+        assert_eq!(builder.in_progress.len(), 25);
+
+        let taken_array = builder.take_n(final_ones_to_append);
+        assert_eq!(&taken_array, &input_array);
     }
 }

From e6c7e7e6f517a6cb262079f4ec5524201b3ab23b Mon Sep 17 00:00:00 2001
From: kamille <caoruiqiu.crq@antgroup.com>
Date: Sun, 13 Oct 2024 18:55:57 +0800
Subject: [PATCH 19/25] fix doc.

---
 .../physical-plan/src/aggregates/group_values/group_column.rs   | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/datafusion/physical-plan/src/aggregates/group_values/group_column.rs b/datafusion/physical-plan/src/aggregates/group_values/group_column.rs
index 2bb47c873047..ae2e509d36b5 100644
--- a/datafusion/physical-plan/src/aggregates/group_values/group_column.rs
+++ b/datafusion/physical-plan/src/aggregates/group_values/group_column.rs
@@ -424,7 +424,7 @@ pub struct ByteViewGroupValueBuilder<B: ByteViewType> {
     /// Nulls
     nulls: MaybeNullBufferBuilder,
 
-    /// phantom data so the type requires <B>
+    /// phantom data so the type requires `<B>`
     _phantom: PhantomData<B>,
 }
 

From 36d556e3fcdd47832cd408ede6a1798d6d180442 Mon Sep 17 00:00:00 2001
From: Andrew Lamb <andrew@nerdnetworks.org>
Date: Sun, 13 Oct 2024 07:55:26 -0400
Subject: [PATCH 20/25] Do not re-validate output is utf8

---
 .../src/aggregates/group_values/group_column.rs  | 16 +++++++++++-----
 1 file changed, 11 insertions(+), 5 deletions(-)

diff --git a/datafusion/physical-plan/src/aggregates/group_values/group_column.rs b/datafusion/physical-plan/src/aggregates/group_values/group_column.rs
index ae2e509d36b5..3206af3aa230 100644
--- a/datafusion/physical-plan/src/aggregates/group_values/group_column.rs
+++ b/datafusion/physical-plan/src/aggregates/group_values/group_column.rs
@@ -592,11 +592,17 @@ impl<B: ByteViewType> ByteViewGroupValueBuilder<B> {
 
         let views = ScalarBuffer::from(views);
 
-        Arc::new(GenericByteViewArray::<B>::new(
-            views,
-            completed,
-            null_buffer,
-        ))
+        // Safety:
+        // * all views were correctly made
+        // * (if utf8): Input was valid Utf8 so buffer contents are
+        // valid utf8 as well
+        unsafe {
+            Arc::new(GenericByteViewArray::<B>::new_unchecked(
+                views,
+                completed,
+                null_buffer,
+            ))
+        }
     }
 
     fn take_n_inner(&mut self, n: usize) -> ArrayRef {

From 1fd926f1efbde6e8b543fac48af7b9989ae2345f Mon Sep 17 00:00:00 2001
From: kamille <caoruiqiu.crq@antgroup.com>
Date: Sun, 13 Oct 2024 21:28:13 +0800
Subject: [PATCH 21/25] switch to unchecked when building array.

---
 .../aggregates/group_values/group_column.rs   | 32 +++++++++++++++----
 1 file changed, 25 insertions(+), 7 deletions(-)

diff --git a/datafusion/physical-plan/src/aggregates/group_values/group_column.rs b/datafusion/physical-plan/src/aggregates/group_values/group_column.rs
index 3206af3aa230..1b17f06269f5 100644
--- a/datafusion/physical-plan/src/aggregates/group_values/group_column.rs
+++ b/datafusion/physical-plan/src/aggregates/group_values/group_column.rs
@@ -680,14 +680,32 @@ impl<B: ByteViewType> ByteViewGroupValueBuilder<B> {
 
             // Build array and return
             let views = ScalarBuffer::from(first_n_views);
-            Arc::new(GenericByteViewArray::<B>::new(views, buffers, null_buffer))
+
+            // Safety:
+            // * all views were correctly made
+            // * (if utf8): Input was valid Utf8 so buffer contents are
+            // valid utf8 as well
+            unsafe {
+                Arc::new(GenericByteViewArray::<B>::new_unchecked(
+                    views,
+                    buffers,
+                    null_buffer,
+                ))
+            }
         } else {
             let views = ScalarBuffer::from(first_n_views);
-            Arc::new(GenericByteViewArray::<B>::new(
-                views,
-                Vec::new(),
-                null_buffer,
-            ))
+
+            // Safety:
+            // * all views were correctly made
+            // * (if utf8): Input was valid Utf8 so buffer contents are
+            // valid utf8 as well
+            unsafe {
+                Arc::new(GenericByteViewArray::<B>::new_unchecked(
+                    views,
+                    Vec::new(),
+                    null_buffer,
+                ))
+            }
         }
     }
 
@@ -1090,7 +1108,7 @@ mod tests {
         //   3. Take non-inlined + partial last buffer in `completed`
         //   4. Take non-inlined + whole last buffer in `completed`
         //   5. Take non-inlined + partial last `in_progress`
-        //   6. Take non-inlined + while last buffer in ``in_progress`
+        //   6. Take non-inlined + while last buffer in `in_progress`
         //   7. Take all views at once
 
         let mut builder =

From 34918cbfdea4671b953e3ea9c194fc172828349a Mon Sep 17 00:00:00 2001
From: kamille <caoruiqiu.crq@antgroup.com>
Date: Sun, 13 Oct 2024 21:41:56 +0800
Subject: [PATCH 22/25] improve naming.

---
 .../aggregates/group_values/group_column.rs   | 40 +++++++++----------
 1 file changed, 20 insertions(+), 20 deletions(-)

diff --git a/datafusion/physical-plan/src/aggregates/group_values/group_column.rs b/datafusion/physical-plan/src/aggregates/group_values/group_column.rs
index 1b17f06269f5..d47997d1e617 100644
--- a/datafusion/physical-plan/src/aggregates/group_values/group_column.rs
+++ b/datafusion/physical-plan/src/aggregates/group_values/group_column.rs
@@ -645,29 +645,29 @@ impl<B: ByteViewType> ByteViewGroupValueBuilder<B> {
 
         if let Some(view) = last_non_inlined_view {
             let view = ByteView::from(*view);
-            let last_related_buffer_index = view.buffer_index as usize;
+            let last_remaining_buffer_index = view.buffer_index as usize;
 
-            // Check should we take the whole `last_related_buffer_index` buffer
+            // Check should we take the whole `last_remaining_buffer_index` buffer
             let take_whole_last_buffer = self.should_take_whole_buffer(
-                last_related_buffer_index,
+                last_remaining_buffer_index,
                 (view.offset + view.length) as usize,
             );
 
             // Take related buffers
             let buffers = if take_whole_last_buffer {
-                self.take_buffers_with_whole_last(last_related_buffer_index)
+                self.take_buffers_with_whole_last(last_remaining_buffer_index)
             } else {
                 self.take_buffers_with_partial_last(
-                    last_related_buffer_index,
+                    last_remaining_buffer_index,
                     (view.offset + view.length) as usize,
                 )
             };
 
             // Shift `buffer index`s finally
             let shifts = if take_whole_last_buffer {
-                last_related_buffer_index + 1
+                last_remaining_buffer_index + 1
             } else {
-                last_related_buffer_index
+                last_remaining_buffer_index
             };
 
             self.views.iter_mut().for_each(|view| {
@@ -711,35 +711,35 @@ impl<B: ByteViewType> ByteViewGroupValueBuilder<B> {
 
     fn take_buffers_with_whole_last(
         &mut self,
-        last_related_buffer_index: usize,
+        last_remaining_buffer_index: usize,
     ) -> Vec<Buffer> {
-        if last_related_buffer_index == self.completed.len() {
+        if last_remaining_buffer_index == self.completed.len() {
             self.flush_in_progress();
         }
         self.completed
-            .drain(0..last_related_buffer_index + 1)
+            .drain(0..last_remaining_buffer_index + 1)
             .collect()
     }
 
     fn take_buffers_with_partial_last(
         &mut self,
-        last_related_buffer_index: usize,
-        take_len: usize,
+        last_remaining_buffer_index: usize,
+        last_take_len: usize,
     ) -> Vec<Buffer> {
-        let mut take_buffers = Vec::with_capacity(last_related_buffer_index + 1);
+        let mut take_buffers = Vec::with_capacity(last_remaining_buffer_index + 1);
 
-        // Take `0 ~ last_related_buffer_index - 1` buffers
-        if !self.completed.is_empty() || last_related_buffer_index == 0 {
-            take_buffers.extend(self.completed.drain(0..last_related_buffer_index));
+        // Take `0 ~ last_remaining_buffer_index - 1` buffers
+        if !self.completed.is_empty() || last_remaining_buffer_index == 0 {
+            take_buffers.extend(self.completed.drain(0..last_remaining_buffer_index));
         }
 
-        // Process the `last_related_buffer_index` buffers
-        let last_buffer = if last_related_buffer_index < self.completed.len() {
+        // Process the `last_remaining_buffer_index` buffers
+        let last_buffer = if last_remaining_buffer_index < self.completed.len() {
             // If it is in `completed`, simply clone
-            self.completed[last_related_buffer_index].clone()
+            self.completed[last_remaining_buffer_index].clone()
         } else {
             // If it is `in_progress`, copied `0 ~ offset` part
-            let taken_last_buffer = self.in_progress[0..take_len].to_vec();
+            let taken_last_buffer = self.in_progress[0..last_take_len].to_vec();
             Buffer::from_vec(taken_last_buffer)
         };
         take_buffers.push(last_buffer);

From 8348024d096377d6ae78b7b9584a0103bd8b344f Mon Sep 17 00:00:00 2001
From: kamille <caoruiqiu.crq@antgroup.com>
Date: Sun, 13 Oct 2024 21:47:26 +0800
Subject: [PATCH 23/25] use let else to make the codes clearer.

---
 .../aggregates/group_values/group_column.rs   | 102 +++++++++---------
 1 file changed, 52 insertions(+), 50 deletions(-)

diff --git a/datafusion/physical-plan/src/aggregates/group_values/group_column.rs b/datafusion/physical-plan/src/aggregates/group_values/group_column.rs
index d47997d1e617..21076e019398 100644
--- a/datafusion/physical-plan/src/aggregates/group_values/group_column.rs
+++ b/datafusion/physical-plan/src/aggregates/group_values/group_column.rs
@@ -643,42 +643,8 @@ impl<B: ByteViewType> ByteViewGroupValueBuilder<B> {
             .rev()
             .find(|view| ((**view) as u32) > 12);
 
-        if let Some(view) = last_non_inlined_view {
-            let view = ByteView::from(*view);
-            let last_remaining_buffer_index = view.buffer_index as usize;
-
-            // Check should we take the whole `last_remaining_buffer_index` buffer
-            let take_whole_last_buffer = self.should_take_whole_buffer(
-                last_remaining_buffer_index,
-                (view.offset + view.length) as usize,
-            );
-
-            // Take related buffers
-            let buffers = if take_whole_last_buffer {
-                self.take_buffers_with_whole_last(last_remaining_buffer_index)
-            } else {
-                self.take_buffers_with_partial_last(
-                    last_remaining_buffer_index,
-                    (view.offset + view.length) as usize,
-                )
-            };
-
-            // Shift `buffer index`s finally
-            let shifts = if take_whole_last_buffer {
-                last_remaining_buffer_index + 1
-            } else {
-                last_remaining_buffer_index
-            };
-
-            self.views.iter_mut().for_each(|view| {
-                if (*view as u32) > 12 {
-                    let mut byte_view = ByteView::from(*view);
-                    byte_view.buffer_index -= shifts as u32;
-                    *view = byte_view.as_u128();
-                }
-            });
-
-            // Build array and return
+        // All taken views inlined
+        let Some(view) = last_non_inlined_view else {
             let views = ScalarBuffer::from(first_n_views);
 
             // Safety:
@@ -686,26 +652,62 @@ impl<B: ByteViewType> ByteViewGroupValueBuilder<B> {
             // * (if utf8): Input was valid Utf8 so buffer contents are
             // valid utf8 as well
             unsafe {
-                Arc::new(GenericByteViewArray::<B>::new_unchecked(
+                return Arc::new(GenericByteViewArray::<B>::new_unchecked(
                     views,
-                    buffers,
+                    Vec::new(),
                     null_buffer,
-                ))
+                ));
             }
+        };
+
+        // Unfortunately, some taken views non-inlined
+        let view = ByteView::from(*view);
+        let last_remaining_buffer_index = view.buffer_index as usize;
+
+        // Check should we take the whole `last_remaining_buffer_index` buffer
+        let take_whole_last_buffer = self.should_take_whole_buffer(
+            last_remaining_buffer_index,
+            (view.offset + view.length) as usize,
+        );
+
+        // Take related buffers
+        let buffers = if take_whole_last_buffer {
+            self.take_buffers_with_whole_last(last_remaining_buffer_index)
         } else {
-            let views = ScalarBuffer::from(first_n_views);
+            self.take_buffers_with_partial_last(
+                last_remaining_buffer_index,
+                (view.offset + view.length) as usize,
+            )
+        };
 
-            // Safety:
-            // * all views were correctly made
-            // * (if utf8): Input was valid Utf8 so buffer contents are
-            // valid utf8 as well
-            unsafe {
-                Arc::new(GenericByteViewArray::<B>::new_unchecked(
-                    views,
-                    Vec::new(),
-                    null_buffer,
-                ))
+        // Shift `buffer index`s finally
+        let shifts = if take_whole_last_buffer {
+            last_remaining_buffer_index + 1
+        } else {
+            last_remaining_buffer_index
+        };
+
+        self.views.iter_mut().for_each(|view| {
+            if (*view as u32) > 12 {
+                let mut byte_view = ByteView::from(*view);
+                byte_view.buffer_index -= shifts as u32;
+                *view = byte_view.as_u128();
             }
+        });
+
+        // Build array and return
+        let views = ScalarBuffer::from(first_n_views);
+
+        // Safety:
+        // * all views were correctly made
+        // * (if utf8): Input was valid Utf8 so buffer contents are
+        // valid utf8 as well
+        unsafe {
+            Arc::new(GenericByteViewArray::<B>::new_unchecked(
+                views,
+                buffers,
+                null_buffer,
+            ))
         }
     }
 

From 023ed64431f2799e0e645933a233caa2fd593866 Mon Sep 17 00:00:00 2001
From: kamille <caoruiqiu.crq@antgroup.com>
Date: Sun, 13 Oct 2024 21:59:14 +0800
Subject: [PATCH 24/25] fix typo.

---
 .../physical-plan/src/aggregates/group_values/group_column.rs   | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/datafusion/physical-plan/src/aggregates/group_values/group_column.rs b/datafusion/physical-plan/src/aggregates/group_values/group_column.rs
index 21076e019398..87effb4aebb2 100644
--- a/datafusion/physical-plan/src/aggregates/group_values/group_column.rs
+++ b/datafusion/physical-plan/src/aggregates/group_values/group_column.rs
@@ -1110,7 +1110,7 @@ mod tests {
         //   3. Take non-inlined + partial last buffer in `completed`
         //   4. Take non-inlined + whole last buffer in `completed`
         //   5. Take non-inlined + partial last `in_progress`
-        //   6. Take non-inlined + while last buffer in `in_progress`
+        //   6. Take non-inlined + whole last buffer in `in_progress`
         //   7. Take all views at once
 
         let mut builder =

From c4d45c7bb41287c7a5c8b44e11aed4484c30be11 Mon Sep 17 00:00:00 2001
From: kamille <caoruiqiu.crq@antgroup.com>
Date: Mon, 14 Oct 2024 01:39:00 +0800
Subject: [PATCH 25/25] improve unit test coverage for
 `ByteViewGroupValueBuilder`.

---
 .../aggregates/group_values/group_column.rs   | 60 +++++++++++++++----
 1 file changed, 49 insertions(+), 11 deletions(-)

diff --git a/datafusion/physical-plan/src/aggregates/group_values/group_column.rs b/datafusion/physical-plan/src/aggregates/group_values/group_column.rs
index 87effb4aebb2..41534958602e 100644
--- a/datafusion/physical-plan/src/aggregates/group_values/group_column.rs
+++ b/datafusion/physical-plan/src/aggregates/group_values/group_column.rs
@@ -1044,18 +1044,39 @@ mod tests {
         //   - exist null, input null; values not equal
         //   - exist null, input null; values equal
         //   - exist not null, input null
-        //   - exist not null, input not null; values not equal
-        //   - exist not null, input not null; values equal
+        //   - exist not null, input not null; value lens not equal
+        //   - exist not null, input not null; value not equal(inlined case)
+        //   - exist not null, input not null; value equal(inlined case)
+        //
+        //   - exist not null, input not null; value not equal
+        //     (non-inlined case + prefix not equal)
+        //
+        //   - exist not null, input not null; value not equal
+        //     (non-inlined case + value in `completed`)
+        //
+        //   - exist not null, input not null; value equal
+        //     (non-inlined case + value in `completed`)
+        //
+        //   - exist not null, input not null; value not equal
+        //     (non-inlined case + value in `in_progress`)
+        //
+        //   - exist not null, input not null; value equal
+        //     (non-inlined case + value in `in_progress`)
 
-        let mut builder = ByteViewGroupValueBuilder::<StringViewType>::new();
+        // Set the block size to 40 for ensuring some unlined values are in `in_progress`,
+        // and some are in `completed`, so both two branches in `value` function can be covered.
+        let mut builder =
+            ByteViewGroupValueBuilder::<StringViewType>::new().with_max_block_size(60);
         let builder_array = Arc::new(StringViewArray::from(vec![
             None,
             None,
             None,
             Some("foo"),
+            Some("bazz"),
+            Some("foo"),
             Some("bar"),
-            Some("this string is quite long"),
-            Some("baz"),
+            Some("I am a long string for test eq in completed"),
+            Some("I am a long string for test eq in progress"),
         ])) as ArrayRef;
         builder.append_val(&builder_array, 0);
         builder.append_val(&builder_array, 1);
@@ -1064,28 +1085,40 @@ mod tests {
         builder.append_val(&builder_array, 4);
         builder.append_val(&builder_array, 5);
         builder.append_val(&builder_array, 6);
+        builder.append_val(&builder_array, 7);
+        builder.append_val(&builder_array, 8);
 
         // Define input array
         let (views, buffer, _nulls) = StringViewArray::from(vec![
             Some("foo"),
-            Some("bar"),                       // set to null
-            Some("this string is quite long"), // set to null
+            Some("bar"), // set to null
             None,
             None,
-            Some("foo"),
             Some("baz"),
+            Some("oof"),
+            Some("bar"),
+            Some("i am a long string for test eq in completed"),
+            Some("I am a long string for test eq in COMPLETED"),
+            Some("I am a long string for test eq in completed"),
+            Some("I am a long string for test eq in PROGRESS"),
+            Some("I am a long string for test eq in progress"),
         ])
         .into_parts();
 
         // explicitly build a boolean buffer where one of the null values also happens to match
-        let mut boolean_buffer_builder = BooleanBufferBuilder::new(6);
+        let mut boolean_buffer_builder = BooleanBufferBuilder::new(9);
         boolean_buffer_builder.append(true);
         boolean_buffer_builder.append(false); // this sets Some("bar") to null above
-        boolean_buffer_builder.append(false); // this sets Some("thisstringisquitelong") to null above
         boolean_buffer_builder.append(false);
         boolean_buffer_builder.append(false);
         boolean_buffer_builder.append(true);
         boolean_buffer_builder.append(true);
+        boolean_buffer_builder.append(true);
+        boolean_buffer_builder.append(true);
+        boolean_buffer_builder.append(true);
+        boolean_buffer_builder.append(true);
+        boolean_buffer_builder.append(true);
+        boolean_buffer_builder.append(true);
         let nulls = NullBuffer::new(boolean_buffer_builder.finish());
         let input_array =
             Arc::new(StringViewArray::new(views, buffer, Some(nulls))) as ArrayRef;
@@ -1098,6 +1131,11 @@ mod tests {
         assert!(!builder.equal_to(4, &input_array, 4));
         assert!(!builder.equal_to(5, &input_array, 5));
         assert!(builder.equal_to(6, &input_array, 6));
+        assert!(!builder.equal_to(7, &input_array, 7));
+        assert!(!builder.equal_to(7, &input_array, 8));
+        assert!(builder.equal_to(7, &input_array, 9));
+        assert!(!builder.equal_to(8, &input_array, 10));
+        assert!(builder.equal_to(8, &input_array, 11));
     }
 
     #[test]
@@ -1149,7 +1187,7 @@ mod tests {
 
         let input_array: ArrayRef = Arc::new(input_array);
         let first_ones_to_append = 16; // For testing situation 1~5
-        let second_ones_to_append = 3; // For testing situation 6
+        let second_ones_to_append = 4; // For testing situation 6
         let final_ones_to_append = input_array.len(); // For testing situation 7
 
         // ####### Test situation 1~5 #######