Skip to content

Commit

Permalink
refactor: optimize take_by_slices_limit_from_blocks (#15978)
Browse files Browse the repository at this point in the history
* no copy

Signed-off-by: coldWater <[email protected]>

* quick fix

Signed-off-by: coldWater <[email protected]>

* fix

Signed-off-by: coldWater <[email protected]>

* push_repeat

Signed-off-by: coldWater <[email protected]>

* refine

Signed-off-by: coldWater <[email protected]>

* array nullable

Signed-off-by: coldWater <[email protected]>

* fix

Signed-off-by: coldWater <[email protected]>

* ut

Signed-off-by: coldWater <[email protected]>

* update

Signed-off-by: coldWater <[email protected]>

* fix

Signed-off-by: coldWater <[email protected]>

* refine

Signed-off-by: coldWater <[email protected]>

* update

Signed-off-by: coldWater <[email protected]>

* refine repeat

Signed-off-by: coldWater <[email protected]>

* update

Signed-off-by: coldWater <[email protected]>

* update

Signed-off-by: coldWater <[email protected]>

* fix

Signed-off-by: coldWater <[email protected]>

* repeat

Signed-off-by: coldWater <[email protected]>

---------

Signed-off-by: coldWater <[email protected]>
Co-authored-by: Jk Xu <[email protected]>
  • Loading branch information
forsaken628 and Dousir9 authored Jul 11, 2024
1 parent e3c3796 commit 0419f5c
Show file tree
Hide file tree
Showing 23 changed files with 408 additions and 148 deletions.
96 changes: 63 additions & 33 deletions src/query/expression/src/kernels/take_chunks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,16 +149,19 @@ impl DataBlock {
slice: (usize, usize),
limit: Option<usize>,
) -> Self {
let columns = block
.columns()
.iter()
.map(|entry| {
Self::take_column_by_slices_limit(&[entry.clone()], &[(0, slice.0, slice.1)], limit)
})
.collect::<Vec<_>>();
let (start, len) = slice;
let num_rows = limit.unwrap_or(usize::MAX).min(len);
let result_size = num_rows.min(block.num_rows());

let mut builders = Self::builders(block, num_rows);

let num_rows = block.num_rows().min(slice.1.min(limit.unwrap_or(slice.1)));
DataBlock::new(columns, num_rows)
let len = len.min(num_rows);
let block_columns = block.columns();
for (col_index, builder) in builders.iter_mut().enumerate() {
Self::push_to_builder(builder, start, len, &block_columns[col_index].value);
}

Self::build_block(builders, result_size)
}

pub fn take_by_slices_limit_from_blocks(
Expand All @@ -167,24 +170,44 @@ impl DataBlock {
limit: Option<usize>,
) -> Self {
debug_assert!(!blocks.is_empty());
let total_num_rows: usize = blocks.iter().map(|c| c.num_rows()).sum();
let result_size: usize = slices.iter().map(|(_, _, c)| *c).sum();
let result_size = total_num_rows.min(result_size.min(limit.unwrap_or(result_size)));

let mut result_columns = Vec::with_capacity(blocks[0].num_columns());
let num_rows = limit
.unwrap_or(usize::MAX)
.min(slices.iter().map(|(_, _, c)| *c).sum());
let result_size = num_rows.min(blocks.iter().map(|c| c.num_rows()).sum());

for index in 0..blocks[0].num_columns() {
let cols = blocks
.iter()
.map(|c| c.get_by_offset(index).clone())
.collect::<Vec<_>>();
let mut builders = Self::builders(&blocks[0], num_rows);

let merged_col = Self::take_column_by_slices_limit(&cols, slices, limit);
let mut remain = num_rows;
for (block_index, start, len) in slices {
let block_columns = blocks[*block_index].columns();
let len = (*len).min(remain);
remain -= len;

result_columns.push(merged_col);
for (col_index, builder) in builders.iter_mut().enumerate() {
Self::push_to_builder(builder, *start, len, &block_columns[col_index].value);
}
if remain == 0 {
break;
}
}

DataBlock::new(result_columns, result_size)
Self::build_block(builders, result_size)
}

/// Creates one `ColumnBuilder` per column of `block`, each constructed with
/// the column's data type and a capacity of `num_rows`.
fn builders(block: &DataBlock, num_rows: usize) -> Vec<ColumnBuilder> {
    let entries = block.columns();
    let mut out = Vec::with_capacity(entries.len());
    for entry in entries.iter() {
        out.push(ColumnBuilder::with_capacity(&entry.data_type, num_rows));
    }
    out
}

/// Consumes `builders`, turning each into a column-valued `BlockEntry`, and
/// assembles the entries into a `DataBlock` reporting `num_rows` rows.
fn build_block(builders: Vec<ColumnBuilder>, num_rows: usize) -> DataBlock {
    let mut entries = Vec::with_capacity(builders.len());
    for builder in builders {
        // Read the type before `build()` consumes the builder.
        let ty = builder.data_type();
        let column = builder.build();
        entries.push(BlockEntry::new(ty, Value::Column(column)));
    }
    DataBlock::new(entries, num_rows)
}

pub fn take_column_by_slices_limit(
Expand All @@ -205,17 +228,7 @@ impl DataBlock {
let len = (*len).min(remain);
remain -= len;

let col = &columns[*index];
match &col.value {
Value::Scalar(scalar) => {
let other = ColumnBuilder::repeat(&scalar.as_ref(), len, &col.data_type);
builder.append_column(&other.build());
}
Value::Column(c) => {
let c = c.slice(*start..(*start + len));
builder.append_column(&c);
}
}
Self::push_to_builder(&mut builder, *start, len, &columns[*index].value);
if remain == 0 {
break;
}
Expand All @@ -225,6 +238,23 @@ impl DataBlock {

BlockEntry::new(ty.clone(), Value::Column(col))
}

/// Appends `len` rows taken from `value` to `builder`.
///
/// * For a column value, the rows `start..start + len` are sliced out and
///   appended.
/// * For a scalar value, the scalar is pushed `len` times; `start` is unused.
fn push_to_builder(
    builder: &mut ColumnBuilder,
    start: usize,
    len: usize,
    value: &Value<AnyType>,
) {
    match value {
        Value::Column(column) => {
            let range = start..(start + len);
            let window = column.slice(range);
            builder.append_column(&window);
        }
        Value::Scalar(constant) => {
            builder.push_repeat(&constant.as_ref(), len);
        }
    }
}
}

impl Column {
Expand Down
1 change: 1 addition & 0 deletions src/query/expression/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -367,6 +367,7 @@ pub trait ValueType: Debug + Clone + PartialEq + Sized + 'static {

fn builder_len(builder: &Self::ColumnBuilder) -> usize;
fn push_item(builder: &mut Self::ColumnBuilder, item: Self::ScalarRef<'_>);
/// Appends `item` to `builder` `n` times; the bulk counterpart of `push_item`.
fn push_item_repeat(builder: &mut Self::ColumnBuilder, item: Self::ScalarRef<'_>, n: usize);
fn push_default(builder: &mut Self::ColumnBuilder);
fn append_column(builder: &mut Self::ColumnBuilder, other: &Self::Column);
fn build_column(builder: Self::ColumnBuilder) -> Self::Column;
Expand Down
4 changes: 4 additions & 0 deletions src/query/expression/src/types/any.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,10 @@ impl ValueType for AnyType {
builder.push(item);
}

/// Forwards to the builder's `push_repeat`, passing `item` by reference
/// together with the repeat count `n`.
fn push_item_repeat(builder: &mut Self::ColumnBuilder, item: Self::ScalarRef<'_>, n: usize) {
builder.push_repeat(&item, n);
}

fn push_default(builder: &mut Self::ColumnBuilder) {
builder.push_default();
}
Expand Down
18 changes: 18 additions & 0 deletions src/query/expression/src/types/array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,10 @@ impl<T: ValueType> ValueType for ArrayType<T> {
builder.push(item);
}

/// Forwards to the array builder's `push_repeat`, passing `item` by
/// reference together with the repeat count `n`.
fn push_item_repeat(builder: &mut Self::ColumnBuilder, item: Self::ScalarRef<'_>, n: usize) {
builder.push_repeat(&item, n);
}

fn push_default(builder: &mut Self::ColumnBuilder) {
builder.push_default();
}
Expand Down Expand Up @@ -362,6 +366,20 @@ impl<T: ValueType> ArrayColumnBuilder<T> {
self.offsets.push(len as u64);
}

/// Appends the array `item` to this builder `n` times.
///
/// The inner values are appended once to measure how many elements a single
/// copy contributes, then appended `n - 1` more times. One end offset per
/// copy is recorded afterwards. Does nothing when `n` is zero.
pub fn push_repeat(&mut self, item: &T::Column, n: usize) {
    if n > 0 {
        let base = T::builder_len(&self.builder);
        T::append_column(&mut self.builder, item);
        // Element count added by one copy of `item`.
        let stride = T::builder_len(&self.builder) - base;
        (1..n).for_each(|_| T::append_column(&mut self.builder, item));

        let ends = (1..=n).map(|k| (base + stride * k) as u64);
        self.offsets.extend(ends);
    }
}

pub fn push_default(&mut self) {
let len = T::builder_len(&self.builder);
self.offsets.push(len as u64);
Expand Down
27 changes: 23 additions & 4 deletions src/query/expression/src/types/binary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,10 @@ impl ValueType for BinaryType {
builder.commit_row();
}

/// Forwards `item` and the repeat count `n` to the builder's `push_repeat`.
fn push_item_repeat(builder: &mut Self::ColumnBuilder, item: Self::ScalarRef<'_>, n: usize) {
builder.push_repeat(item, n);
}

fn push_default(builder: &mut Self::ColumnBuilder) {
builder.commit_row();
}
Expand Down Expand Up @@ -332,10 +336,7 @@ impl BinaryColumnBuilder {

pub fn repeat(scalar: &[u8], n: usize) -> Self {
let len = scalar.len();
let mut data = Vec::with_capacity(len * n);
for _ in 0..n {
data.extend_from_slice(scalar);
}
let data = scalar.repeat(n);
let offsets = once(0)
.chain((0..n).map(|i| (len * (i + 1)) as u64))
.collect();
Expand Down Expand Up @@ -456,6 +457,24 @@ impl BinaryColumnBuilder {
self.data.get_unchecked(start..end)
}

/// Appends `n` rows to the builder, each holding the bytes of `item`.
///
/// Space for all the bytes is reserved up front. While `need_estimated` is
/// set and fewer than 64 rows have been committed, rows are committed one at
/// a time through `commit_row`; otherwise the bytes are copied and the end
/// offsets are written in bulk.
pub fn push_repeat(&mut self, item: &[u8], n: usize) {
    self.data.reserve(item.len() * n);

    // Negation of `need_estimated && rows < 64`, with the same short-circuit order.
    let bulk = !self.need_estimated || self.offsets.len() - 1 >= 64;
    if bulk {
        let base = self.data.len();
        let width = item.len();
        (0..n).for_each(|_| self.data.extend_from_slice(item));
        let ends = (1..=n).map(|k| (base + width * k) as u64);
        self.offsets.extend(ends);
    } else {
        for _ in 0..n {
            self.data.extend_from_slice(item);
            self.commit_row();
        }
    }
}

pub fn pop(&mut self) -> Option<Vec<u8>> {
if self.len() > 0 {
let index = self.len() - 1;
Expand Down
4 changes: 4 additions & 0 deletions src/query/expression/src/types/bitmap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,10 @@ impl ValueType for BitmapType {
builder.commit_row();
}

/// Forwards `item` and the repeat count `n` to the builder's `push_repeat`.
fn push_item_repeat(builder: &mut Self::ColumnBuilder, item: Self::ScalarRef<'_>, n: usize) {
builder.push_repeat(item, n);
}

fn push_default(builder: &mut Self::ColumnBuilder) {
builder.commit_row();
}
Expand Down
8 changes: 8 additions & 0 deletions src/query/expression/src/types/boolean.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,14 @@ impl ValueType for BooleanType {
builder.push(item);
}

/// Appends the boolean `item` to `builder` `n` times.
///
/// A single repetition uses `push`; any other count goes through
/// `extend_constant`.
fn push_item_repeat(builder: &mut Self::ColumnBuilder, item: Self::ScalarRef<'_>, n: usize) {
    match n {
        1 => builder.push(item),
        _ => builder.extend_constant(n, item),
    }
}

fn push_default(builder: &mut Self::ColumnBuilder) {
builder.push(false);
}
Expand Down
4 changes: 4 additions & 0 deletions src/query/expression/src/types/date.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,10 @@ impl ValueType for DateType {
builder.push(item);
}

/// Resizes `builder` to its current length plus `n`, using `item` as the
/// fill value for the new slots.
fn push_item_repeat(builder: &mut Self::ColumnBuilder, item: Self::ScalarRef<'_>, n: usize) {
builder.resize(builder.len() + n, item);
}

fn push_default(builder: &mut Self::ColumnBuilder) {
builder.push(Self::Scalar::default());
}
Expand Down
18 changes: 17 additions & 1 deletion src/query/expression/src/types/decimal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,14 @@ impl<Num: Decimal> ValueType for DecimalType<Num> {
builder.push(item)
}

/// Appends the decimal `item` to `builder` `n` times.
///
/// A single repetition uses `push`; any other count resizes the builder to
/// its current length plus `n`, filling the new slots with `item`.
fn push_item_repeat(builder: &mut Self::ColumnBuilder, item: Self::ScalarRef<'_>, n: usize) {
    match n {
        1 => builder.push(item),
        _ => builder.resize(builder.len() + n, item),
    }
}

fn push_default(builder: &mut Self::ColumnBuilder) {
builder.push(Num::default())
}
Expand Down Expand Up @@ -1068,13 +1076,21 @@ impl DecimalColumnBuilder {
}

/// Pushes a single `item`; implemented as `push_repeat` with a count of 1.
pub fn push(&mut self, item: DecimalScalar) {
self.push_repeat(item, 1)
}

pub fn push_repeat(&mut self, item: DecimalScalar, n: usize) {
crate::with_decimal_type!(|DECIMAL_TYPE| match (self, item) {
(
DecimalColumnBuilder::DECIMAL_TYPE(builder, builder_size),
DecimalScalar::DECIMAL_TYPE(value, value_size),
) => {
debug_assert_eq!(*builder_size, value_size);
builder.push(value)
if n == 1 {
builder.push(value)
} else {
builder.resize(builder.len() + n, value)
}
}
(builder, scalar) => unreachable!("unable to push {scalar:?} to {builder:?}"),
})
Expand Down
4 changes: 4 additions & 0 deletions src/query/expression/src/types/empty_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,10 @@ impl ValueType for EmptyArrayType {
*len += 1
}

/// Adds `n` to the builder, which is the counter `len`; the item is ignored.
fn push_item_repeat(len: &mut Self::ColumnBuilder, _: Self::ScalarRef<'_>, n: usize) {
*len += n
}

fn push_default(len: &mut Self::ColumnBuilder) {
*len += 1
}
Expand Down
4 changes: 4 additions & 0 deletions src/query/expression/src/types/empty_map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,10 @@ impl ValueType for EmptyMapType {
*len += 1
}

/// Adds `n` to the builder, which is the counter `len`; the item is ignored.
fn push_item_repeat(len: &mut Self::ColumnBuilder, _: Self::ScalarRef<'_>, n: usize) {
*len += n
}

fn push_default(len: &mut Self::ColumnBuilder) {
*len += 1
}
Expand Down
4 changes: 4 additions & 0 deletions src/query/expression/src/types/generic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,10 @@ impl<const INDEX: usize> ValueType for GenericType<INDEX> {
builder.push(item);
}

/// Forwards to the builder's `push_repeat`, passing `item` by reference
/// together with the repeat count `n`.
fn push_item_repeat(builder: &mut Self::ColumnBuilder, item: Self::ScalarRef<'_>, n: usize) {
builder.push_repeat(&item, n)
}

fn push_default(builder: &mut Self::ColumnBuilder) {
builder.push_default();
}
Expand Down
4 changes: 4 additions & 0 deletions src/query/expression/src/types/geometry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,10 @@ impl ValueType for GeometryType {
builder.commit_row();
}

/// Forwards `item` and the repeat count `n` to the builder's `push_repeat`.
fn push_item_repeat(builder: &mut Self::ColumnBuilder, item: Self::ScalarRef<'_>, n: usize) {
builder.push_repeat(item, n)
}

fn push_default(builder: &mut Self::ColumnBuilder) {
builder.commit_row();
}
Expand Down
13 changes: 13 additions & 0 deletions src/query/expression/src/types/map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,10 @@ impl<K: ValueType, V: ValueType> ValueType for KvPair<K, V> {
builder.push(item);
}

/// Forwards the key/value `item` and the repeat count `n` to the builder's
/// `push_repeat`.
fn push_item_repeat(builder: &mut Self::ColumnBuilder, item: Self::ScalarRef<'_>, n: usize) {
builder.push_repeat(item, n)
}

fn push_default(builder: &mut Self::ColumnBuilder) {
builder.push_default();
}
Expand Down Expand Up @@ -256,6 +260,11 @@ impl<K: ValueType, V: ValueType> KvColumnBuilder<K, V> {
V::push_item(&mut self.values, v);
}

/// Appends a key/value pair `n` times: the key is pushed `n` times into the
/// key builder and the value `n` times into the value builder.
pub fn push_repeat(&mut self, item: (K::ScalarRef<'_>, V::ScalarRef<'_>), n: usize) {
    let (key, value) = item;
    K::push_item_repeat(&mut self.keys, key, n);
    V::push_item_repeat(&mut self.values, value, n);
}

pub fn push_default(&mut self) {
K::push_default(&mut self.keys);
V::push_default(&mut self.values);
Expand Down Expand Up @@ -416,6 +425,10 @@ impl<K: ValueType, V: ValueType> ValueType for MapType<K, V> {
<MapInternal<K, V> as ValueType>::push_item(builder, item)
}

/// Delegates to the `MapInternal<K, V>` implementation of `push_item_repeat`.
fn push_item_repeat(builder: &mut Self::ColumnBuilder, item: Self::ScalarRef<'_>, n: usize) {
<MapInternal<K, V> as ValueType>::push_item_repeat(builder, item, n)
}

fn push_default(builder: &mut Self::ColumnBuilder) {
<MapInternal<K, V> as ValueType>::push_default(builder)
}
Expand Down
4 changes: 4 additions & 0 deletions src/query/expression/src/types/null.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,10 @@ impl ValueType for NullType {
*len += 1
}

/// Adds `n` to the builder, which is the counter `len`; the item is ignored.
fn push_item_repeat(len: &mut Self::ColumnBuilder, _: Self::ScalarRef<'_>, n: usize) {
*len += n
}

fn push_default(len: &mut Self::ColumnBuilder) {
*len += 1
}
Expand Down
Loading

0 comments on commit 0419f5c

Please sign in to comment.