From 67a4766269c37ad6314e915d8f7f7d95ee4629f9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=94=A1=E7=95=A5?= Date: Thu, 2 May 2024 15:01:14 +0800 Subject: [PATCH 01/10] feat: support gc for view_arrays MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit part 1: implement checker to check if current buffer is compact Signed-off-by: 蔡略 --- arrow-array/src/array/byte_view_array.rs | 160 +++++++++++++++++++++++ 1 file changed, 160 insertions(+) diff --git a/arrow-array/src/array/byte_view_array.rs b/arrow-array/src/array/byte_view_array.rs index 79f2d47587a6..c205c011265d 100644 --- a/arrow-array/src/array/byte_view_array.rs +++ b/arrow-array/src/array/byte_view_array.rs @@ -25,6 +25,7 @@ use arrow_buffer::{Buffer, NullBuffer, ScalarBuffer}; use arrow_data::{ArrayData, ArrayDataBuilder, ByteView}; use arrow_schema::{ArrowError, DataType}; use std::any::Any; +use std::collections::{BTreeMap, BTreeSet}; use std::fmt::Debug; use std::marker::PhantomData; use std::sync::Arc; @@ -265,6 +266,51 @@ impl GenericByteViewArray { phantom: Default::default(), } } + + /// Returns whether this array is a compact view + pub(self) fn is_compact_view(&self) -> bool { + todo!() + } + + /// Returns a compact version of this array + /// + /// # Compaction + /// + /// before compaction: + /// ```text + /// ┌──────┐ + /// │......│ + /// │......│ + /// ┌────────────────────┐ ┌ ─ ─ ─ ▶ │Data1 │ Large buffer + /// │ View 1 │─ ─ ─ ─ │......│ with data that + /// ├────────────────────┤ │......│ is not referred + /// │ View 2 │─ ─ ─ ─ ─ ─ ─ ─▶ │Data2 │ to by View 1 or + /// └────────────────────┘ │......│ View 2 + /// │......│ + /// 2 views, refer to │......│ + /// small portions of a └──────┘ + /// large buffer + /// ``` + /// + /// after compaction: + /// + /// ```text + /// ┌────────────────────┐ ┌─────┐ After gc, only + /// │ View 1 │─ ─ ─ ─ ─ ─ ─ ─▶ │Data1│ data that is + /// ├────────────────────┤ ┌ ─ ─ ─ ▶ │Data2│ pointed to by + /// │ View 2 │─ ─ ─ ─ 
└─────┘ the views is + /// └────────────────────┘ left + /// + /// + /// 2 views + /// ``` + /// this method will compact the data buffers to only include the data + /// that is pointed to by the views + /// and return a new array with the compacted data buffers. + /// the original array will be left as is. + pub fn compact(&self) -> Self { + todo!() + } } impl Debug for GenericByteViewArray { @@ -482,6 +528,60 @@ impl From>> for StringViewArray { } } +/// A helper struct that used to check if the array is compact view +/// +/// # Note +/// +/// The checker is lazy and will not check the array until `finish` is called. +/// +/// This is based on the assumption that the array will most likely to be not compact, +/// so it is likely to scan the entire array. +/// +/// Then it is better to do the check at once, rather than doing it for each accumulate operation. +struct CompactChecker { + length: usize, + coverage: BTreeMap, +} + +impl CompactChecker { + pub fn new(length: usize) -> Self { + Self { + length, + coverage: BTreeMap::new(), + } + } + + /// Accumulate a new covered interval to the checker + pub fn accumulate(&mut self, offset: usize, length: usize) { + if length == 0 { + return; + } + let end = offset + length; + if let Some(val) = self.coverage.get_mut(&offset) { + if *val < end { + *val = end; + } + } else { + self.coverage.insert(offset, end); + } + } + + /// Check if the checker is fully covered + pub fn finish(&self) -> bool { + // check if the coverage is continuous and full + let mut last_end = 0; + // todo: can be optimized + for (start, end) in self.coverage.iter() { + if *start > last_end { + return false; + } + last_end = *end; + } + + last_end == self.length + } +} + #[cfg(test)] mod tests { use crate::builder::{BinaryViewBuilder, StringViewBuilder}; @@ -645,4 +745,64 @@ mod tests { StringViewArray::new(views, buffers, None); } + + #[test] + fn test_compact_checker() { + use super::CompactChecker; + // single coverage, full + let mut checker = 
CompactChecker::new(10); + checker.accumulate(0, 10); + assert!(checker.finish()); + // single coverage, partial + let mut checker = CompactChecker::new(10); + checker.accumulate(0, 5); + assert!(!checker.finish()); + // multiple coverage, no overlapping, partial + let mut checker = CompactChecker::new(10); + checker.accumulate(0, 5); + checker.accumulate(5, 4); + assert!(!checker.finish()); + + //multiple coverage, no overlapping, full + let mut checker = CompactChecker::new(10); + checker.accumulate(0, 5); + checker.accumulate(5, 5); + assert!(checker.finish()); + //multiple coverage, overlapping, partial + let mut checker = CompactChecker::new(10); + checker.accumulate(0, 5); + checker.accumulate(4, 5); + assert!(!checker.finish()); + + //multiple coverage, overlapping, full + let mut checker = CompactChecker::new(10); + checker.accumulate(0, 5); + checker.accumulate(4, 6); + assert!(checker.finish()); + //mutiple coverage, no overlapping, full, out of order + let mut checker = CompactChecker::new(10); + checker.accumulate(4, 6); + checker.accumulate(0, 4); + assert!(checker.finish()); + + // multiple coverage, overlapping, full, out of order + let mut checker = CompactChecker::new(10); + checker.accumulate(4, 6); + checker.accumulate(0, 4); + assert!(checker.finish()); + + // multiple coverage, overlapping, full, containing null + let mut checker = CompactChecker::new(10); + checker.accumulate(0, 5); + checker.accumulate(5, 0); + checker.accumulate(5, 5); + assert!(checker.finish()); + // multiple coverage, overlapping, full, containing null + let mut checker = CompactChecker::new(10); + checker.accumulate(0, 5); + checker.accumulate(5, 0); + checker.accumulate(4, 6); + checker.accumulate(5, 5); + assert!(checker.finish()); + } } From 4c76bc551a84bd3c1e8c1f7537ddd81a6f6fc787 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=94=A1=E7=95=A5?= Date: Thu, 2 May 2024 16:26:12 +0800 Subject: [PATCH 02/10] feat: add gc to view-arrays MIME-Version: 1.0 Content-Type: 
text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit part2: implement actual gc algorithm Signed-off-by: 蔡略 --- arrow-array/src/array/byte_view_array.rs | 146 +++++++++++++++++++++-- 1 file changed, 133 insertions(+), 13 deletions(-) diff --git a/arrow-array/src/array/byte_view_array.rs b/arrow-array/src/array/byte_view_array.rs index c205c011265d..f52bce8ab0b7 100644 --- a/arrow-array/src/array/byte_view_array.rs +++ b/arrow-array/src/array/byte_view_array.rs @@ -25,7 +25,7 @@ use arrow_buffer::{Buffer, NullBuffer, ScalarBuffer}; use arrow_data::{ArrayData, ArrayDataBuilder, ByteView}; use arrow_schema::{ArrowError, DataType}; use std::any::Any; -use std::collections::{BTreeMap, BTreeSet}; +use std::collections::BTreeMap; use std::fmt::Debug; use std::marker::PhantomData; use std::sync::Arc; @@ -267,16 +267,32 @@ impl GenericByteViewArray { } } - /// Returns whether this array is a compact view - pub(self) fn is_compact_view(&self) -> bool { - todo!() + /// Returns whether the buffers are compact + pub(self) fn compact_check(&self) -> Vec { + let mut checkers: Vec<_> = self + .buffers + .iter() + .map(|b| CompactChecker::new(b.len())) + .collect(); + + for (i, view) in self.views.iter().enumerate() { + let view = ByteView::from(*view); + if self.is_null(i) || view.length <= 12 { + continue; + } + checkers[view.buffer_index as usize] + .accumulate(view.offset as usize, view.length as usize); + } + checkers.into_iter().map(|c| c.finish()).collect() } - /// Returns a compact version of this array + /// Returns a buffer compact version of this array + /// + /// The original array will *not* be modified /// - /// # Compaction + /// # Garbage Collection /// - /// before compaction: + /// Before GC: /// ```text /// ┌──────┐ /// │......│ @@ -292,7 +308,7 @@ impl GenericByteViewArray { /// large buffer /// ``` /// - /// after compaction: + /// After GC: /// /// ```text /// ┌────────────────────┐ ┌─────┐ After gc, only @@ -304,12 +320,60 @@ impl GenericByteViewArray { 
/// /// 2 views /// ``` - /// this method will compact the data buffers to only include the data - /// that is pointed to by the views + /// This method will compact the data buffers to only include the data + /// that is pointed to by the views, /// and return a new array with the compacted data buffers. - /// the original array will be left as is. - pub fn compact(&self) -> Self { - todo!() + /// The original array will be left as is. + pub fn gc(&self) -> Self { + let compact_check = self.compact_check(); + + if compact_check.iter().all(|x| *x) { + return self.clone(); + } + + let mut new_views = Vec::with_capacity(self.views.len()); + let mut new_bufs: Vec> = vec![vec![]; self.buffers.len()]; + for view in self.views.iter() { + let mut bv = ByteView::from(*view); + let idx = bv.buffer_index as usize; + if bv.length <= 12 || compact_check[idx] { + new_views.push(*view); + continue; + } + // copy data to new buffer + let data = self.buffers.get(idx).unwrap(); + let offset = new_bufs[idx].len(); + let len = bv.length as usize; + new_bufs[idx].extend_from_slice( + data.get(bv.offset as usize..bv.offset as usize + len) + .unwrap(), + ); + // update view + bv.offset = offset as u32; + + new_views.push(bv.into()); + } + let new_bufs: Vec<_> = new_bufs.into_iter().map(Buffer::from_vec).collect(); + + let new_views = ScalarBuffer::from(new_views); + + let new_buffers = self + .buffers + .iter() + .enumerate() + .map(|(idx, buf)| { + if compact_check[idx] { + buf.clone() + } else { + new_bufs[idx].clone() + } + }) + .collect(); + + let mut compacted = self.clone(); + compacted.buffers = new_buffers; + compacted.views = new_views; + compacted } } @@ -805,4 +869,60 @@ mod tests { checker.accumulate(5, 5); assert!(checker.finish()); } + + #[test] + fn test_gc() { + // test compact on compacted data + let array = { + let mut builder = StringViewBuilder::new(); + builder.append_value("I look at you all"); + builder.append_option(Some("see the love there that's sleeping")); + 
builder.finish() + }; + + let compacted = array.gc(); + // verify it is a shallow copy + assert_eq!(array.buffers[0].as_ptr(), compacted.buffers[0].as_ptr()); + + // test compact on non-compacted data + let mut array = { + let mut builder = StringViewBuilder::new(); + builder.append_value("while my guitar gently weeps"); + builder.finish() + }; + + // shrink the view + let mut view = ByteView::from(array.views[0]); + view.length = 15; + let new_views = ScalarBuffer::from(vec![view.into()]); + array.views = new_views; + + let compacted = array.gc(); + // verify it is a deep copy + assert_ne!(array.buffers[0].as_ptr(), compacted.buffers[0].as_ptr()); + // verify content + assert_eq!(array.value(0), compacted.value(0)); + // verify compacted + assert!(compacted.compact_check().iter().all(|x| *x)); + + // test compact on array containing null + let mut array = { + let mut builder = StringViewBuilder::new(); + builder.append_null(); + builder.append_option(Some("I don't know why nobody told you")); + builder.finish() + }; + + let mut view = ByteView::from(array.views[1]); + view.length = 15; + let new_views = ScalarBuffer::from(vec![array.views[0], view.into()]); + array.views = new_views; + + let compacted = array.gc(); + + assert_ne!(array.buffers[0].as_ptr(), compacted.buffers[0].as_ptr()); + assert_eq!(array.value(0), compacted.value(0)); + assert_eq!(array.value(1), compacted.value(1)); + assert!(compacted.compact_check().iter().all(|x| *x)); + } } From 2cb2e0d71172308e5f7a5c1cd74016b21dc2db44 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=94=A1=E7=95=A5?= Date: Thu, 2 May 2024 16:32:49 +0800 Subject: [PATCH 03/10] patch: add more tests MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: 蔡略 --- arrow-array/src/array/byte_view_array.rs | 48 ++++++++++++++++++------ 1 file changed, 37 insertions(+), 11 deletions(-) diff --git a/arrow-array/src/array/byte_view_array.rs b/arrow-array/src/array/byte_view_array.rs index 
f52bce8ab0b7..a59b052c17e8 100644 --- a/arrow-array/src/array/byte_view_array.rs +++ b/arrow-array/src/array/byte_view_array.rs @@ -325,18 +325,18 @@ impl GenericByteViewArray { /// and return a new array with the compacted data buffers. /// The original array will be left as is. pub fn gc(&self) -> Self { - let compact_check = self.compact_check(); + let check_result = self.compact_check(); - if compact_check.iter().all(|x| *x) { + if check_result.iter().all(|x| *x) { return self.clone(); } let mut new_views = Vec::with_capacity(self.views.len()); let mut new_bufs: Vec> = vec![vec![]; self.buffers.len()]; - for view in self.views.iter() { + for (i, view) in self.views.iter().enumerate() { let mut bv = ByteView::from(*view); let idx = bv.buffer_index as usize; - if bv.length <= 12 || compact_check[idx] { + if self.is_null(i) || bv.length <= 12 || check_result[idx] { new_views.push(*view); continue; } @@ -362,7 +362,7 @@ impl GenericByteViewArray { .iter() .enumerate() .map(|(idx, buf)| { - if compact_check[idx] { + if check_result[idx] { buf.clone() } else { new_bufs[idx].clone() @@ -604,14 +604,15 @@ impl From>> for StringViewArray { /// Then it is better to do the check at once, rather than doing it for each accumulate operation. 
struct CompactChecker { length: usize, - coverage: BTreeMap, + intervals: BTreeMap, } impl CompactChecker { + /// Create a new checker with the expected length of the buffer pub fn new(length: usize) -> Self { Self { length, - coverage: BTreeMap::new(), + intervals: BTreeMap::new(), } } @@ -621,21 +622,21 @@ impl CompactChecker { return; } let end = offset + length; - if let Some(val) = self.coverage.get_mut(&offset) { + if let Some(val) = self.intervals.get_mut(&offset) { if *val < end { *val = end; } } else { - self.coverage.insert(offset, end); + self.intervals.insert(offset, end); } } /// Check if the checker is fully covered - pub fn finish(&self) -> bool { + pub fn finish(self) -> bool { // check if the coverage is continuous and full let mut last_end = 0; // todo: can be optimized - for (start, end) in self.coverage.iter() { + for (start, end) in self.intervals.iter() { if *start > last_end { return false; } @@ -924,5 +925,30 @@ mod tests { assert_eq!(array.value(0), compacted.value(0)); assert_eq!(array.value(1), compacted.value(1)); assert!(compacted.compact_check().iter().all(|x| *x)); + + // test compact on multiple buffers + let mut array = { + let mut builder = StringViewBuilder::new().with_block_size(15); + builder.append_value("how to unfold your love"); + builder.append_option(Some("I don't know how someone controlled you")); + builder.finish() + }; + + // verify it's not same buffer + assert_eq!(array.buffers.len(), 2); + // shrink the view + + let mut view = ByteView::from(array.views[1]); + view.length = 15; + let new_views = ScalarBuffer::from(vec![array.views[0], view.into()]); + array.views = new_views; + + let compacted = array.gc(); + assert_eq!(compacted.buffers.len(), 2); + assert_eq!(array.value(0), compacted.value(0)); + assert_eq!(array.value(1), compacted.value(1)); + assert_eq!(array.buffers[0].as_ptr(), compacted.buffers[0].as_ptr()); + assert_ne!(array.buffers[1].as_ptr(), compacted.buffers[1].as_ptr()); + 
assert!(compacted.compact_check().iter().all(|x| *x)); } } From f5e964d1fcfee5c1c94c492dd345b6fee4ce7c9b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=94=A1=E7=95=A5?= Date: Fri, 3 May 2024 08:54:28 +0800 Subject: [PATCH 04/10] patch: improve robustness MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: 蔡略 --- arrow-array/src/array/byte_view_array.rs | 50 ++++++++++++++++++++---- 1 file changed, 43 insertions(+), 7 deletions(-) diff --git a/arrow-array/src/array/byte_view_array.rs b/arrow-array/src/array/byte_view_array.rs index a59b052c17e8..f625a0694578 100644 --- a/arrow-array/src/array/byte_view_array.rs +++ b/arrow-array/src/array/byte_view_array.rs @@ -594,8 +594,6 @@ impl From>> for StringViewArray { /// A helper struct that used to check if the array is compact view /// -/// # Note -/// /// The checker is lazy and will not check the array until `finish` is called. /// /// This is based on the assumption that the array will most likely to be not compact, @@ -622,6 +620,12 @@ impl CompactChecker { return; } let end = offset + length; + if end > self.length { + panic!( + "Invalid interval: offset {} length {} is out of bound of length {}", + offset, length, self.length + ); + } if let Some(val) = self.intervals.get_mut(&offset) { if *val < end { *val = end; @@ -635,12 +639,14 @@ impl CompactChecker { pub fn finish(self) -> bool { // check if the coverage is continuous and full let mut last_end = 0; - // todo: can be optimized + for (start, end) in self.intervals.iter() { if *start > last_end { return false; } - last_end = *end; + if *end > last_end { + last_end = *end; + } } last_end == self.length @@ -812,16 +818,20 @@ mod tests { } #[test] + #[should_panic(expected = "Invalid interval: offset 0 length 13 is out of bound of length 12")] fn test_compact_checker() { use super::CompactChecker; + // single coverage, full let mut checker = CompactChecker::new(10); checker.accumulate(0, 10); 
assert!(checker.finish()); + // single coverage, partial let mut checker = CompactChecker::new(10); checker.accumulate(0, 5); assert!(!checker.finish()); + // multiple coverage, no overlapping, partial let mut checker = CompactChecker::new(10); checker.accumulate(0, 5); @@ -833,6 +843,7 @@ mod tests { checker.accumulate(0, 5); checker.accumulate(5, 5); assert!(checker.finish()); + //multiple coverage, overlapping, partial let mut checker = CompactChecker::new(10); checker.accumulate(0, 5); @@ -844,6 +855,7 @@ mod tests { checker.accumulate(0, 5); checker.accumulate(4, 6); assert!(checker.finish()); + //mutiple coverage, no overlapping, full, out of order let mut checker = CompactChecker::new(10); checker.accumulate(4, 6); @@ -862,6 +874,7 @@ mod tests { checker.accumulate(5, 0); checker.accumulate(5, 5); assert!(checker.finish()); + // multiple coverage, overlapping, full, containing null let mut checker = CompactChecker::new(10); checker.accumulate(0, 5); @@ -869,35 +882,54 @@ mod tests { checker.accumulate(4, 6); checker.accumulate(5, 5); assert!(checker.finish()); + + // multiple coverage, overlapping, full, containing null + // + // this case is for attacking those implementation that only check + // the lower-bound of the interval + let mut checker = CompactChecker::new(10); + checker.accumulate(0, 5); + checker.accumulate(5, 0); + checker.accumulate(1, 9); + checker.accumulate(2, 3); + checker.accumulate(3, 1); + checker.accumulate(9, 1); + assert!(checker.finish()); + + // panic case, out of bound + let mut checker = CompactChecker::new(12); + checker.accumulate(0, 13); + checker.finish(); } #[test] fn test_gc() { + // --------------------------------------------------------------------- // test compact on compacted data + let array = { let mut builder = StringViewBuilder::new(); builder.append_value("I look at you all"); builder.append_option(Some("see the love there that's sleeping")); builder.finish() }; - let compacted = array.gc(); // verify it is a 
shallow copy assert_eq!(array.buffers[0].as_ptr(), compacted.buffers[0].as_ptr()); + // --------------------------------------------------------------------- // test compact on non-compacted data + let mut array = { let mut builder = StringViewBuilder::new(); builder.append_value("while my guitar gently weeps"); builder.finish() }; - // shrink the view let mut view = ByteView::from(array.views[0]); view.length = 15; let new_views = ScalarBuffer::from(vec![view.into()]); array.views = new_views; - let compacted = array.gc(); // verify it is a deep copy assert_ne!(array.buffers[0].as_ptr(), compacted.buffers[0].as_ptr()); @@ -906,7 +938,9 @@ mod tests { // verify compacted assert!(compacted.compact_check().iter().all(|x| *x)); + // --------------------------------------------------------------------- // test compact on array containing null + let mut array = { let mut builder = StringViewBuilder::new(); builder.append_null(); @@ -926,7 +960,9 @@ mod tests { assert_eq!(array.value(1), compacted.value(1)); assert!(compacted.compact_check().iter().all(|x| *x)); + // --------------------------------------------------------------------- // test compact on multiple buffers + let mut array = { let mut builder = StringViewBuilder::new().with_block_size(15); builder.append_value("how to unfold your love"); From 92d6de546bd291223bcca2825cb35e9f7d4941d8 Mon Sep 17 00:00:00 2001 From: Xiangpeng Hao Date: Thu, 13 Jun 2024 17:15:20 -0400 Subject: [PATCH 05/10] simplify gc --- arrow-array/src/array/byte_view_array.rs | 290 +---------------------- 1 file changed, 13 insertions(+), 277 deletions(-) diff --git a/arrow-array/src/array/byte_view_array.rs b/arrow-array/src/array/byte_view_array.rs index f625a0694578..740e3e7fe28c 100644 --- a/arrow-array/src/array/byte_view_array.rs +++ b/arrow-array/src/array/byte_view_array.rs @@ -25,7 +25,6 @@ use arrow_buffer::{Buffer, NullBuffer, ScalarBuffer}; use arrow_data::{ArrayData, ArrayDataBuilder, ByteView}; use arrow_schema::{ArrowError, 
DataType}; use std::any::Any; -use std::collections::BTreeMap; use std::fmt::Debug; use std::marker::PhantomData; use std::sync::Arc; @@ -267,25 +266,6 @@ impl GenericByteViewArray { } } - /// Returns whether the buffers are compact - pub(self) fn compact_check(&self) -> Vec { - let mut checkers: Vec<_> = self - .buffers - .iter() - .map(|b| CompactChecker::new(b.len())) - .collect(); - - for (i, view) in self.views.iter().enumerate() { - let view = ByteView::from(*view); - if self.is_null(i) || view.length <= 12 { - continue; - } - checkers[view.buffer_index as usize] - .accumulate(view.offset as usize, view.length as usize); - } - checkers.into_iter().map(|c| c.finish()).collect() - } - /// Returns a buffer compact version of this array /// /// The original array will *not* be modified @@ -320,60 +300,20 @@ impl GenericByteViewArray { /// /// 2 views /// ``` - /// This method will compact the data buffers to only include the data - /// that is pointed to by the views, - /// and return a new array with the compacted data buffers. - /// The original array will be left as is. + /// This method will compact the data buffers by recreating the view array and only include the data + /// that is pointed to by the views. + /// + /// Note that it will copy the array regardless of whether the original array is compact. + /// Use with caution as this can be an expensive operation, only use it when you are sure that the view + /// array is significantly smaller than when it is originally created, e.g., after filtering or slicing. 
pub fn gc(&self) -> Self { - let check_result = self.compact_check(); + let mut builder = GenericByteViewBuilder::::with_capacity(self.len()); - if check_result.iter().all(|x| *x) { - return self.clone(); + for v in self.iter() { + builder.append_option(v); } - let mut new_views = Vec::with_capacity(self.views.len()); - let mut new_bufs: Vec> = vec![vec![]; self.buffers.len()]; - for (i, view) in self.views.iter().enumerate() { - let mut bv = ByteView::from(*view); - let idx = bv.buffer_index as usize; - if self.is_null(i) || bv.length <= 12 || check_result[idx] { - new_views.push(*view); - continue; - } - // copy data to new buffer - let data = self.buffers.get(idx).unwrap(); - let offset = new_bufs[idx].len(); - let len = bv.length as usize; - new_bufs[idx].extend_from_slice( - data.get(bv.offset as usize..bv.offset as usize + len) - .unwrap(), - ); - // update view - bv.offset = offset as u32; - - new_views.push(bv.into()); - } - let new_bufs: Vec<_> = new_bufs.into_iter().map(Buffer::from_vec).collect(); - - let new_views = ScalarBuffer::from(new_views); - - let new_buffers = self - .buffers - .iter() - .enumerate() - .map(|(idx, buf)| { - if check_result[idx] { - buf.clone() - } else { - new_bufs[idx].clone() - } - }) - .collect(); - - let mut compacted = self.clone(); - compacted.buffers = new_buffers; - compacted.views = new_views; - compacted + builder.finish() } } @@ -592,67 +532,6 @@ impl From>> for StringViewArray { } } -/// A helper struct that used to check if the array is compact view -/// -/// The checker is lazy and will not check the array until `finish` is called. -/// -/// This is based on the assumption that the array will most likely to be not compact, -/// so it is likely to scan the entire array. -/// -/// Then it is better to do the check at once, rather than doing it for each accumulate operation. 
-struct CompactChecker { - length: usize, - intervals: BTreeMap, -} - -impl CompactChecker { - /// Create a new checker with the expected length of the buffer - pub fn new(length: usize) -> Self { - Self { - length, - intervals: BTreeMap::new(), - } - } - - /// Accumulate a new covered interval to the checker - pub fn accumulate(&mut self, offset: usize, length: usize) { - if length == 0 { - return; - } - let end = offset + length; - if end > self.length { - panic!( - "Invalid interval: offset {} length {} is out of bound of length {}", - offset, length, self.length - ); - } - if let Some(val) = self.intervals.get_mut(&offset) { - if *val < end { - *val = end; - } - } else { - self.intervals.insert(offset, end); - } - } - - /// Check if the checker is fully covered - pub fn finish(self) -> bool { - // check if the coverage is continuous and full - let mut last_end = 0; - - for (start, end) in self.intervals.iter() { - if *start > last_end { - return false; - } - if *end > last_end { - last_end = *end; - } - } - - last_end == self.length - } -} - #[cfg(test)] mod tests { use crate::builder::{BinaryViewBuilder, StringViewBuilder}; @@ -817,136 +696,21 @@ mod tests { StringViewArray::new(views, buffers, None); } - #[test] - #[should_panic(expected = "Invalid interval: offset 0 length 13 is out of bound of length 12")] - fn test_compact_checker() { - use super::CompactChecker; - - // single coverage, full - let mut checker = CompactChecker::new(10); - checker.accumulate(0, 10); - assert!(checker.finish()); - - // single coverage, partial - let mut checker = CompactChecker::new(10); - checker.accumulate(0, 5); - assert!(!checker.finish()); - - // multiple coverage, no overlapping, partial - let mut checker = CompactChecker::new(10); - checker.accumulate(0, 5); - checker.accumulate(5, 4); - assert!(!checker.finish()); - - //multiple coverage, no overlapping, full - let mut checker = CompactChecker::new(10); - checker.accumulate(0, 5); - checker.accumulate(5, 5); - 
assert!(checker.finish()); - - //multiple coverage, overlapping, partial - let mut checker = CompactChecker::new(10); - checker.accumulate(0, 5); - checker.accumulate(4, 5); - assert!(!checker.finish()); - - //multiple coverage, overlapping, full - let mut checker = CompactChecker::new(10); - checker.accumulate(0, 5); - checker.accumulate(4, 6); - assert!(checker.finish()); - - //mutiple coverage, no overlapping, full, out of order - let mut checker = CompactChecker::new(10); - checker.accumulate(4, 6); - checker.accumulate(0, 4); - assert!(checker.finish()); - - // multiple coverage, overlapping, full, out of order - let mut checker = CompactChecker::new(10); - checker.accumulate(4, 6); - checker.accumulate(0, 4); - assert!(checker.finish()); - - // multiple coverage, overlapping, full, containing null - let mut checker = CompactChecker::new(10); - checker.accumulate(0, 5); - checker.accumulate(5, 0); - checker.accumulate(5, 5); - assert!(checker.finish()); - - // multiple coverage, overlapping, full, containing null - let mut checker = CompactChecker::new(10); - checker.accumulate(0, 5); - checker.accumulate(5, 0); - checker.accumulate(4, 6); - checker.accumulate(5, 5); - assert!(checker.finish()); - - // multiple coverage, overlapping, full, containing null - // - // this case is for attacking those implementation that only check - // the lower-bound of the interval - let mut checker = CompactChecker::new(10); - checker.accumulate(0, 5); - checker.accumulate(5, 0); - checker.accumulate(1, 9); - checker.accumulate(2, 3); - checker.accumulate(3, 1); - checker.accumulate(9, 1); - assert!(checker.finish()); - - // panic case, out of bound - let mut checker = CompactChecker::new(12); - checker.accumulate(0, 13); - checker.finish(); - } - #[test] fn test_gc() { - // --------------------------------------------------------------------- - // test compact on compacted data - - let array = { - let mut builder = StringViewBuilder::new(); - builder.append_value("I look at 
you all"); - builder.append_option(Some("see the love there that's sleeping")); - builder.finish() - }; - let compacted = array.gc(); - // verify it is a shallow copy - assert_eq!(array.buffers[0].as_ptr(), compacted.buffers[0].as_ptr()); - - // --------------------------------------------------------------------- - // test compact on non-compacted data - - let mut array = { - let mut builder = StringViewBuilder::new(); - builder.append_value("while my guitar gently weeps"); - builder.finish() - }; + let mut array = StringViewArray::from_iter_values(["while my guitar gently weeps"]); // shrink the view let mut view = ByteView::from(array.views[0]); view.length = 15; let new_views = ScalarBuffer::from(vec![view.into()]); array.views = new_views; let compacted = array.gc(); - // verify it is a deep copy assert_ne!(array.buffers[0].as_ptr(), compacted.buffers[0].as_ptr()); - // verify content assert_eq!(array.value(0), compacted.value(0)); - // verify compacted - assert!(compacted.compact_check().iter().all(|x| *x)); - // --------------------------------------------------------------------- // test compact on array containing null - - let mut array = { - let mut builder = StringViewBuilder::new(); - builder.append_null(); - builder.append_option(Some("I don't know why nobody told you")); - builder.finish() - }; + let mut array = + StringViewArray::from_iter([None, Some("I don't know why nobody told you")]); let mut view = ByteView::from(array.views[1]); view.length = 15; @@ -958,33 +722,5 @@ mod tests { assert_ne!(array.buffers[0].as_ptr(), compacted.buffers[0].as_ptr()); assert_eq!(array.value(0), compacted.value(0)); assert_eq!(array.value(1), compacted.value(1)); - assert!(compacted.compact_check().iter().all(|x| *x)); - - // --------------------------------------------------------------------- - // test compact on multiple buffers - - let mut array = { - let mut builder = StringViewBuilder::new().with_block_size(15); - builder.append_value("how to unfold your 
love"); - builder.append_option(Some("I don't know how someone controlled you")); - builder.finish() - }; - - // verify it's not same buffer - assert_eq!(array.buffers.len(), 2); - // shrink the view - - let mut view = ByteView::from(array.views[1]); - view.length = 15; - let new_views = ScalarBuffer::from(vec![array.views[0], view.into()]); - array.views = new_views; - - let compacted = array.gc(); - assert_eq!(compacted.buffers.len(), 2); - assert_eq!(array.value(0), compacted.value(0)); - assert_eq!(array.value(1), compacted.value(1)); - assert_eq!(array.buffers[0].as_ptr(), compacted.buffers[0].as_ptr()); - assert_ne!(array.buffers[1].as_ptr(), compacted.buffers[1].as_ptr()); - assert!(compacted.compact_check().iter().all(|x| *x)); } } From 2cdfbcdbc8d806f51144700131bd73da9e66a65f Mon Sep 17 00:00:00 2001 From: Xiangpeng Hao Date: Thu, 13 Jun 2024 17:25:35 -0400 Subject: [PATCH 06/10] make fmt happy --- arrow-array/src/array/byte_view_array.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/arrow-array/src/array/byte_view_array.rs b/arrow-array/src/array/byte_view_array.rs index 740e3e7fe28c..0f4c01efb369 100644 --- a/arrow-array/src/array/byte_view_array.rs +++ b/arrow-array/src/array/byte_view_array.rs @@ -302,10 +302,10 @@ impl GenericByteViewArray { /// ``` /// This method will compact the data buffers by recreating the view array and only include the data /// that is pointed to by the views. - /// + /// /// Note that it will copy the array regardless of whether the original array is compact. /// Use with caution as this can be an expensive operation, only use it when you are sure that the view - /// array is significantly smaller than when it is originally created, e.g., after filtering or slicing. + /// array is significantly smaller than when it is originally created, e.g., after filtering or slicing. 
pub fn gc(&self) -> Self { let mut builder = GenericByteViewBuilder::<T>::with_capacity(self.len()); From c4f5fee9bdce826a502c7548d12307ea12a2cec6 Mon Sep 17 00:00:00 2001 From: Xiangpeng Hao Date: Thu, 13 Jun 2024 20:55:48 -0400 Subject: [PATCH 07/10] Update arrow-array/src/array/byte_view_array.rs Co-authored-by: Andrew Lamb --- arrow-array/src/array/byte_view_array.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/arrow-array/src/array/byte_view_array.rs b/arrow-array/src/array/byte_view_array.rs index 0f4c01efb369..9d8e0e3f1e5e 100644 --- a/arrow-array/src/array/byte_view_array.rs +++ b/arrow-array/src/array/byte_view_array.rs @@ -266,7 +266,7 @@ impl GenericByteViewArray { } } - /// Returns a buffer compact version of this array + /// Returns a "compacted" version of this array /// /// The original array will *not* be modified /// From a29182c98bbf1add86e9992c481df59b0c9ce3fd Mon Sep 17 00:00:00 2001 From: Xiangpeng Hao Date: Thu, 13 Jun 2024 21:14:20 -0400 Subject: [PATCH 08/10] improve testing --- arrow-array/src/array/byte_view_array.rs | 52 +++++++++++++----------- 1 file changed, 28 insertions(+), 24 deletions(-) diff --git a/arrow-array/src/array/byte_view_array.rs b/arrow-array/src/array/byte_view_array.rs index 9d8e0e3f1e5e..5bf670e5acea 100644 --- a/arrow-array/src/array/byte_view_array.rs +++ b/arrow-array/src/array/byte_view_array.rs @@ -698,29 +698,33 @@ mod tests { #[test] fn test_gc() { - let mut array = StringViewArray::from_iter_values(["while my guitar gently weeps"]); - // shrink the view - let mut view = ByteView::from(array.views[0]); - view.length = 15; - let new_views = ScalarBuffer::from(vec![view.into()]); - array.views = new_views; - let compacted = array.gc(); - assert_ne!(array.buffers[0].as_ptr(), compacted.buffers[0].as_ptr()); - assert_eq!(array.value(0), compacted.value(0)); - - // test compact on array containing null - let mut array = - StringViewArray::from_iter([None, Some("I don't know why nobody told 
you")]); - - let mut view = ByteView::from(array.views[1]); - view.length = 15; - let new_views = ScalarBuffer::from(vec![array.views[0], view.into()]); - array.views = new_views; - - let compacted = array.gc(); - - assert_ne!(array.buffers[0].as_ptr(), compacted.buffers[0].as_ptr()); - assert_eq!(array.value(0), compacted.value(0)); - assert_eq!(array.value(1), compacted.value(1)); + let test_data = [ + Some("short"), + Some("t"), + Some("longer than 12 bytes"), + None, + Some("short"), + ]; + + let array = { + let mut builder = StringViewBuilder::new().with_block_size(8); // create multiple buffers + test_data.into_iter().for_each(|v| builder.append_option(v)); + builder.finish() + }; + + fn check_gc(to_test: &StringViewArray) { + let gc = to_test.gc(); + + to_test.iter().zip(gc.iter()).for_each(|(a, b)| { + assert_eq!(a, b); + }); + assert_eq!(to_test.len(), gc.len()); + } + + check_gc(&array); + check_gc(&array.slice(1, 3)); + check_gc(&array.slice(2, 1)); + check_gc(&array.slice(2, 2)); + check_gc(&array.slice(3, 1)); } } From 1b3b5899816ff21d931b7090b635818f195bcb84 Mon Sep 17 00:00:00 2001 From: Xiangpeng Hao Date: Thu, 13 Jun 2024 21:25:20 -0400 Subject: [PATCH 09/10] check in a benchmark --- arrow-array/Cargo.toml | 4 +++ arrow-array/benches/gc_view_types.rs | 48 ++++++++++++++++++++++++++++ 2 files changed, 52 insertions(+) create mode 100644 arrow-array/benches/gc_view_types.rs diff --git a/arrow-array/Cargo.toml b/arrow-array/Cargo.toml index b00d2c88e1a7..a8dbeded9ce7 100644 --- a/arrow-array/Cargo.toml +++ b/arrow-array/Cargo.toml @@ -62,3 +62,7 @@ criterion = { version = "0.5", default-features = false } [[bench]] name = "occupancy" harness = false + +[[bench]] +name = "gc_view_types" +harness = false diff --git a/arrow-array/benches/gc_view_types.rs b/arrow-array/benches/gc_view_types.rs new file mode 100644 index 000000000000..4b74a8f60b06 --- /dev/null +++ b/arrow-array/benches/gc_view_types.rs @@ -0,0 +1,48 @@ +// Licensed to the Apache Software 
Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use arrow_array::StringViewArray; +use criterion::*; + +fn gen_view_array(size: usize) -> StringViewArray { + StringViewArray::from_iter((0..size).map(|v| match v % 3 { + 0 => Some("small"), + 1 => Some("larger than 12 bytes array"), + 2 => None, + _ => unreachable!("unreachable"), + })) +} + +fn criterion_benchmark(c: &mut Criterion) { + let array = gen_view_array(100_000); + + c.bench_function("gc view types all", |b| { + b.iter(|| { + black_box(array.gc()); + }); + }); + + let sliced = array.slice(0, 100_000 / 2); + c.bench_function("gc view types slice half", |b| { + b.iter(|| { + black_box(sliced.gc()); + }); + }); +} + +criterion_group!(benches, criterion_benchmark); +criterion_main!(benches); From 98d9a61d009113537efad4e1c8ae863ee4413f6b Mon Sep 17 00:00:00 2001 From: Xiangpeng Hao Date: Fri, 14 Jun 2024 09:07:20 -0400 Subject: [PATCH 10/10] actually have multiple buffers in test --- arrow-array/src/array/byte_view_array.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/arrow-array/src/array/byte_view_array.rs b/arrow-array/src/array/byte_view_array.rs index 5bf670e5acea..88033b83cbec 100644 --- a/arrow-array/src/array/byte_view_array.rs +++ b/arrow-array/src/array/byte_view_array.rs @@ 
-699,6 +699,7 @@ mod tests { #[test] fn test_gc() { let test_data = [ + Some("longer than 12 bytes"), Some("short"), Some("t"), Some("longer than 12 bytes"), @@ -711,9 +712,11 @@ mod tests { test_data.into_iter().for_each(|v| builder.append_option(v)); builder.finish() }; + assert!(array.buffers.len() > 1); fn check_gc(to_test: &StringViewArray) { let gc = to_test.gc(); + assert_ne!(to_test.data_buffers().len(), gc.data_buffers().len()); to_test.iter().zip(gc.iter()).for_each(|(a, b)| { assert_eq!(a, b);