diff --git a/query/src/common/hashtable/hash_table.rs b/query/src/common/hashtable/hash_table.rs
index c26485276f9d7..62bf27cdf1f5a 100644
--- a/query/src/common/hashtable/hash_table.rs
+++ b/query/src/common/hashtable/hash_table.rs
@@ -18,12 +18,12 @@ use std::alloc::Layout;
 use std::marker::PhantomData;
 use std::mem;
 
-use crate::common::hashtable::hash_table_grower::Grower;
+use crate::common::hashtable::hash_table_grower::HashTableGrower;
 use crate::common::HashTableEntity;
-use crate::common::HashTableIter;
+use crate::common::HashTableIteratorKind;
 use crate::common::HashTableKeyable;
 
-pub struct HashTable<Key: HashTableKeyable, Entity: HashTableEntity<Key>> {
+pub struct HashTable<Key: HashTableKeyable, Entity: HashTableEntity<Key>, Grower: HashTableGrower> {
     size: usize,
     grower: Grower,
     entities: *mut Entity,
@@ -35,7 +35,9 @@ pub struct HashTable<Key: HashTableKeyable, Entity: HashTableEntity<Key>> {
     generics_hold: PhantomData<Key>,
 }
 
-impl<Key: HashTableKeyable, Entity: HashTableEntity<Key>> Drop for HashTable<Key, Entity> {
+impl<Key: HashTableKeyable, Entity: HashTableEntity<Key>, Grower: HashTableGrower> Drop
+    for HashTable<Key, Entity, Grower>
+{
     fn drop(&mut self) {
         unsafe {
             let size = (self.grower.max_size() as usize) * mem::size_of::<Entity>();
@@ -53,8 +55,10 @@ impl<Key: HashTableKeyable, Entity: HashTableEntity<Key>> Drop for HashTable<Ke
     }
 }
 
-impl<Key: HashTableKeyable, Entity: HashTableEntity<Key>> HashTable<Key, Entity> {
-    pub fn create() -> HashTable<Key, Entity> {
+impl<Key: HashTableKeyable, Entity: HashTableEntity<Key>, Grower: HashTableGrower>
+    HashTable<Key, Entity, Grower>
+{
+    pub fn create() -> HashTable<Key, Entity, Grower> {
         let size = (1 << 8) * mem::size_of::<Entity>();
         unsafe {
             let layout = Layout::from_size_align_unchecked(size, mem::align_of::<Entity>());
@@ -83,8 +87,12 @@ impl<Key: HashTableKeyable, Entity: HashTableEntity<Key>> HashTable<Key, Entity
     }
 
     #[inline(always)]
-    pub fn iter(&self) -> HashTableIter<Key, Entity> {
-        HashTableIter::create(self.grower.max_size(), self.entities, self.zero_entity)
+    pub fn iter(&self) -> HashTableIteratorKind<Key, Entity> {
+        HashTableIteratorKind::create_hash_table_iter(
+            self.grower.max_size(),
+            self.entities,
+            self.zero_entity,
+        )
     }
 
     #[inline(always)]
@@ -96,6 +104,14 @@ impl<Key: HashTableKeyable, Entity: HashTableEntity<Key>> HashTable<Key, Entity
         }
     }
 
+    #[inline(always)]
+    pub fn insert_hash_key(&mut self, key: &Key, hash: u64, inserted: &mut bool) -> *mut Entity {
+        match self.insert_if_zero_key(key, hash, inserted) {
+            None => self.insert_non_zero_key(key, hash, inserted),
+            Some(zero_hash_table_entity) => zero_hash_table_entity,
+        }
+    }
+
     #[inline(always)]
     pub fn find_key(&self, key: &Key) -> Option<*mut Entity> {
         if !key.is_zero() {
@@ -119,6 +135,7 @@ impl<Key: HashTableKeyable, Entity: HashTableEntity<Key>> HashTable<Key, Entity
         let grower = &self.grower;
 
         let mut place_value = grower.place(hash_value);
+
         while !self.entities.offset(place_value).is_zero()
             && !self
                 .entities
diff --git a/query/src/common/hashtable/hash_table_entity.rs b/query/src/common/hashtable/hash_table_entity.rs
index 7109804abd703..479731c716bf1 100644
--- a/query/src/common/hashtable/hash_table_entity.rs
+++ b/query/src/common/hashtable/hash_table_entity.rs
@@ -58,6 +58,7 @@ where
     Key: HashTableKeyable,
     Value: Sized + Copy,
 {
+    #[inline(always)]
     unsafe fn is_zero(self: *mut Self) -> bool {
         (*self).key.is_zero()
     }
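Note on the hunk above: `insert_hash_key` splits into `insert_if_zero_key` / `insert_non_zero_key` because the table uses an all-zero key as its empty-slot marker, so a real zero key has to live in the dedicated `zero_entity` side slot. A minimal self-contained sketch of that invariant and of the grower-driven probe loop; `MiniTable`, its fields, and the identity hash are illustrative stand-ins, not Databend's API:

```rust
// Why a zero key is special-cased: a slot holding key == 0 would be
// indistinguishable from an empty slot, so key 0 gets a side slot.
const CAPACITY: usize = 8; // a power of two, as the grower guarantees

struct MiniTable {
    slots: [u64; CAPACITY], // 0 means "empty"
    zero_key_present: bool, // the side slot, like `zero_entity` above
}

impl MiniTable {
    fn new() -> Self {
        Self { slots: [0; CAPACITY], zero_key_present: false }
    }

    /// Returns true if `key` was newly inserted (no growth, for brevity).
    fn insert(&mut self, key: u64) -> bool {
        if key == 0 {
            // insert_if_zero_key: only the side slot can hold key 0.
            let inserted = !self.zero_key_present;
            self.zero_key_present = true;
            return inserted;
        }
        // insert_non_zero_key: identity hash plus linear probing.
        let mut place = (key as usize) & (CAPACITY - 1); // grower.place()
        loop {
            if self.slots[place] == 0 {
                self.slots[place] = key;
                return true;
            }
            if self.slots[place] == key {
                return false; // already present
            }
            place = (place + 1) & (CAPACITY - 1); // grower.next_place()
        }
    }
}

fn main() {
    let mut t = MiniTable::new();
    assert!(t.insert(0)); // lands in the side slot
    assert!(!t.insert(0)); // second time is a hit, not an insert
    assert!(t.insert(5));
}
```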
diff --git a/query/src/common/hashtable/hash_table_grower.rs b/query/src/common/hashtable/hash_table_grower.rs
index c4e68f97989da..7283e4a5cbd4e 100644
--- a/query/src/common/hashtable/hash_table_grower.rs
+++ b/query/src/common/hashtable/hash_table_grower.rs
@@ -12,45 +12,96 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+pub trait HashTableGrower: Default + Clone {
+    fn max_size(&self) -> isize;
+    fn overflow(&self, size: usize) -> bool;
+    fn place(&self, hash_value: u64) -> isize;
+    fn next_place(&self, old_place: isize) -> isize;
+    fn increase_size(&mut self);
+}
+
 #[derive(Clone)]
-pub struct Grower {
+pub struct SingleLevelGrower {
     size_degree: u8,
     max_size: isize,
 }
 
-impl Default for Grower {
+impl Default for SingleLevelGrower {
     fn default() -> Self {
-        Grower {
+        SingleLevelGrower {
             size_degree: 8,
             max_size: 1_isize << 8,
         }
     }
 }
 
-impl Grower {
+impl HashTableGrower for SingleLevelGrower {
     #[inline(always)]
-    pub fn max_size(&self) -> isize {
+    fn max_size(&self) -> isize {
         self.max_size
     }
 
     #[inline(always)]
-    pub fn overflow(&self, size: usize) -> bool {
+    fn overflow(&self, size: usize) -> bool {
         size > ((1_usize) << (self.size_degree - 1))
     }
 
     #[inline(always)]
-    pub fn place(&self, hash_value: u64) -> isize {
+    fn place(&self, hash_value: u64) -> isize {
         hash_value as isize & (self.max_size() - 1)
     }
 
     #[inline(always)]
-    pub fn next_place(&self, old_place: isize) -> isize {
+    fn next_place(&self, old_place: isize) -> isize {
         (old_place + 1) & (self.max_size() - 1)
     }
 
     #[inline(always)]
-    pub fn increase_size(&mut self) {
+    fn increase_size(&mut self) {
         self.size_degree += if self.size_degree >= 23 { 1 } else { 2 };
         self.max_size = 1_isize << self.size_degree;
     }
 }
+
+#[derive(Clone)]
+pub struct TwoLevelGrower {
+    size_degree: u8,
+    max_size: isize,
+}
+
+impl Default for TwoLevelGrower {
+    fn default() -> Self {
+        TwoLevelGrower {
+            size_degree: 8,
+            max_size: 1_isize << 8,
+        }
+    }
+}
+
+impl HashTableGrower for TwoLevelGrower {
+    #[inline(always)]
+    fn max_size(&self) -> isize {
+        self.max_size
+    }
+
+    #[inline(always)]
+    fn overflow(&self, size: usize) -> bool {
+        size > ((1_usize) << (self.size_degree - 1))
+    }
+
+    #[inline(always)]
+    fn place(&self, hash_value: u64) -> isize {
+        hash_value as isize & (self.max_size() - 1)
+    }
+
+    #[inline(always)]
+    fn next_place(&self, old_place: isize) -> isize {
+        (old_place + 1) & (self.max_size() - 1)
+    }
+
+    #[inline(always)]
+    fn increase_size(&mut self) {
+        self.size_degree += if self.size_degree >= 15 { 1 } else { 2 };
+        self.max_size = 1_isize << self.size_degree;
+    }
+}
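The grower arithmetic above only works because `max_size` is always a power of two: masking with `max_size - 1` is then an exact modulo, and the probe wraps for free. `SingleLevelGrower` grows 4x (`size_degree += 2`) until degree 23 and 2x afterwards; `TwoLevelGrower` switches to 2x already at degree 15, since each of the 256 buckets only holds a slice of the data and should not quadruple late in its life. A small standalone copy of the same arithmetic (not the trait itself):

```rust
// Sketch of the grower above: power-of-two sizing, linear probing,
// and the 4x-then-2x growth schedule.
struct Grower {
    size_degree: u8,
    max_size: isize,
}

impl Grower {
    fn place(&self, hash: u64) -> isize {
        hash as isize & (self.max_size - 1) // exact modulo for powers of two
    }
    fn next_place(&self, old: isize) -> isize {
        (old + 1) & (self.max_size - 1) // probe wraps at max_size
    }
    fn increase_size(&mut self) {
        self.size_degree += if self.size_degree >= 23 { 1 } else { 2 };
        self.max_size = 1_isize << self.size_degree;
    }
}

fn main() {
    let mut g = Grower { size_degree: 8, max_size: 1 << 8 };
    assert_eq!(g.place(0x1234), 0x34); // low bits select the slot
    assert_eq!(g.next_place(255), 0);  // wrap-around at the end
    g.increase_size();
    assert_eq!(g.max_size, 1024);      // 256 -> 1024: a 4x step
}
```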
diff --git a/query/src/common/hashtable/hash_table_iter.rs b/query/src/common/hashtable/hash_table_iter.rs
index 65027a4303be0..bc7289bb9c17b 100644
--- a/query/src/common/hashtable/hash_table_iter.rs
+++ b/query/src/common/hashtable/hash_table_iter.rs
@@ -16,6 +16,41 @@ use std::marker::PhantomData;
 
 use crate::common::HashTableEntity;
 
+pub enum HashTableIteratorKind<Key, Entity: HashTableEntity<Key>> {
+    HashMapIterator(HashTableIter<Key, Entity>),
+    TwoLevelHashMapIter(TwoLevelHashTableIter<Key, Entity>),
+}
+
+impl<Key, Entity: HashTableEntity<Key>> HashTableIteratorKind<Key, Entity> {
+    pub fn create_hash_table_iter(
+        capacity: isize,
+        entities: *mut Entity,
+        zero_entity: Option<*mut Entity>,
+    ) -> Self {
+        Self::HashMapIterator(HashTableIter::<Key, Entity>::create(
+            capacity,
+            entities,
+            zero_entity,
+        ))
+    }
+
+    pub fn create_two_level_hash_table_iter(
+        iters: Vec<HashTableIteratorKind<Key, Entity>>,
+    ) -> Self {
+        Self::TwoLevelHashMapIter(TwoLevelHashTableIter::<Key, Entity>::create(iters))
+    }
+}
+
+impl<Key, Entity: HashTableEntity<Key>> Iterator for HashTableIteratorKind<Key, Entity> {
+    type Item = *mut Entity;
+    fn next(&mut self) -> Option<Self::Item> {
+        match self {
+            HashTableIteratorKind::HashMapIterator(it) => it.next(),
+            HashTableIteratorKind::TwoLevelHashMapIter(it) => it.next(),
+        }
+    }
+}
+
 pub struct HashTableIter<Key, Entity: HashTableEntity<Key>> {
     idx: isize,
     capacity: isize,
@@ -65,3 +100,31 @@ impl<Key, Entity: HashTableEntity<Key>> Iterator for HashTableIter<Key, Entity>
         }
     }
 }
+
+pub struct TwoLevelHashTableIter<Key, Entity: HashTableEntity<Key>> {
+    iters: Vec<HashTableIteratorKind<Key, Entity>>,
+    index: usize,
+}
+
+impl<Key, Entity: HashTableEntity<Key>> TwoLevelHashTableIter<Key, Entity> {
+    pub fn create(iters: Vec<HashTableIteratorKind<Key, Entity>>) -> Self {
+        Self { iters, index: 0 }
+    }
+}
+
+impl<Key, Entity: HashTableEntity<Key>> Iterator for TwoLevelHashTableIter<Key, Entity> {
+    type Item = *mut Entity;
+    fn next(&mut self) -> Option<Self::Item> {
+        match self.iters[self.index].next() {
+            Some(x) => Some(x),
+            None => {
+                if self.index < self.iters.len() - 1 {
+                    self.index += 1;
+                    self.next()
+                } else {
+                    None
+                }
+            }
+        }
+    }
+}
diff --git a/query/src/common/hashtable/mod.rs b/query/src/common/hashtable/mod.rs
index 80d68e7430432..b5fb6a7b96cdd 100644
--- a/query/src/common/hashtable/mod.rs
+++ b/query/src/common/hashtable/mod.rs
@@ -15,9 +15,15 @@
 pub use hash_table::HashTable;
 pub use hash_table_entity::HashTableEntity;
 pub use hash_table_entity::KeyValueEntity;
-pub use hash_table_grower::Grower;
+pub use hash_table_grower::HashTableGrower;
+pub use hash_table_grower::SingleLevelGrower;
+pub use hash_table_grower::TwoLevelGrower;
 pub use hash_table_iter::HashTableIter;
+pub use hash_table_iter::HashTableIteratorKind;
+pub use hash_table_iter::TwoLevelHashTableIter;
 pub use hash_table_key::HashTableKeyable;
+pub use two_level_hash_table::HashTableKind;
+pub use two_level_hash_table::TwoLevelHashTable;
 
 mod hash_table;
 #[allow(clippy::missing_safety_doc, clippy::not_unsafe_ptr_arg_deref)]
@@ -25,6 +31,11 @@ mod hash_table_entity;
 mod hash_table_grower;
 mod hash_table_iter;
 mod hash_table_key;
+mod two_level_hash_table;
 
-pub type HashMap<Key, Value> = HashTable<Key, KeyValueEntity<Key, Value>>;
-pub type HashMapIterator<Key, Value> = HashTableIter<Key, KeyValueEntity<Key, Value>>;
+pub type HashMap<Key, Value> = HashTable<Key, KeyValueEntity<Key, Value>, SingleLevelGrower>;
+pub type TwoLevelHashMap<Key, Value> =
+    TwoLevelHashTable<Key, KeyValueEntity<Key, Value>, TwoLevelGrower>;
+pub type HashMapIteratorKind<Key, Value> = HashTableIteratorKind<Key, KeyValueEntity<Key, Value>>;
+pub type HashMapKind<Key, Value> =
+    HashTableKind<Key, KeyValueEntity<Key, Value>, SingleLevelGrower, TwoLevelGrower>;
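`TwoLevelHashTableIter` above advances an index over a `Vec` of per-bucket iterators, recursing into `next()` whenever a bucket runs dry (it assumes at least one iterator, which `TwoLevelHashTable::iter` guarantees by always pushing 256). With owned iterators the same traversal is what std's `Iterator::flatten` does, shown here on plain `Vec`s as a stand-in:

```rust
// The hand-rolled two-level iterator walks buckets in order and skips
// empty ones; flatten gives the same behaviour for owned iterators.
fn main() {
    let buckets: Vec<Vec<u64>> = vec![vec![1, 2], vec![], vec![3]];
    let flat: Vec<u64> = buckets.into_iter().flatten().collect();
    assert_eq!(flat, vec![1, 2, 3]); // empty buckets vanish transparently
}
```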
diff --git a/query/src/common/hashtable/two_level_hash_table.rs b/query/src/common/hashtable/two_level_hash_table.rs
new file mode 100644
index 0000000000000..f376674c84b86
--- /dev/null
+++ b/query/src/common/hashtable/two_level_hash_table.rs
@@ -0,0 +1,174 @@
+// Copyright 2022 Datafuse Labs.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+// References the ClickHouse HashTable design for the Databend HashTable.
+
+use crate::common::HashTable;
+use crate::common::HashTableEntity;
+use crate::common::HashTableGrower;
+use crate::common::HashTableIteratorKind;
+use crate::common::HashTableKeyable;
+
+static BITS_FOR_BUCKET: u8 = 8;
+static NUM_BUCKETS: usize = 1 << BITS_FOR_BUCKET;
+static MAX_BUCKET: usize = NUM_BUCKETS - 1;
+
+pub enum HashTableKind<
+    Key: HashTableKeyable,
+    Entity: HashTableEntity<Key>,
+    SingleLevelGrower: HashTableGrower,
+    TwoLevelGrower: HashTableGrower,
+> {
+    HashTable(HashTable<Key, Entity, SingleLevelGrower>),
+    TwoLevelHashTable(TwoLevelHashTable<Key, Entity, TwoLevelGrower>),
+}
+
+impl<
+        Key: HashTableKeyable,
+        Entity: HashTableEntity<Key>,
+        SingleLevelGrower: HashTableGrower,
+        TwoLevelGrower: HashTableGrower,
+    > HashTableKind<Key, Entity, SingleLevelGrower, TwoLevelGrower>
+{
+    pub fn create_hash_table() -> Self {
+        Self::HashTable(HashTable::create())
+    }
+
+    pub fn create_two_level_hash_table() -> Self {
+        Self::TwoLevelHashTable(TwoLevelHashTable::create())
+    }
+
+    #[inline(always)]
+    pub fn len(&self) -> usize {
+        match self {
+            HashTableKind::HashTable(data) => data.len(),
+            HashTableKind::TwoLevelHashTable(data) => data.len(),
+        }
+    }
+
+    #[inline(always)]
+    pub fn is_empty(&self) -> bool {
+        self.len() == 0
+    }
+
+    #[inline(always)]
+    pub fn iter(&self) -> HashTableIteratorKind<Key, Entity> {
+        match self {
+            HashTableKind::HashTable(data) => data.iter(),
+            HashTableKind::TwoLevelHashTable(data) => data.iter(),
+        }
+    }
+
+    #[inline(always)]
+    pub fn insert_key(&mut self, key: &Key, inserted: &mut bool) -> *mut Entity {
+        match self {
+            HashTableKind::HashTable(data) => data.insert_key(key, inserted),
+            HashTableKind::TwoLevelHashTable(data) => data.insert_key(key, inserted),
+        }
+    }
+
+    #[inline(always)]
+    pub fn insert_hash_key(&mut self, key: &Key, hash: u64, inserted: &mut bool) -> *mut Entity {
+        match self {
+            HashTableKind::HashTable(data) => data.insert_hash_key(key, hash, inserted),
+            HashTableKind::TwoLevelHashTable(data) => data.insert_hash_key(key, hash, inserted),
+        }
+    }
+
+    #[allow(clippy::missing_safety_doc)]
+    pub unsafe fn convert_to_two_level(&mut self) {
+        let mut two_level_hash_table = Self::create_two_level_hash_table();
+
+        if !self.is_empty() {
+            let mut inserted = true;
+            for old_entity in self.iter() {
+                let new_entity =
+                    two_level_hash_table.insert_key(old_entity.get_key(), &mut inserted);
+                if inserted {
+                    new_entity.swap(old_entity);
+                }
+            }
+        }
+        std::mem::swap(self, &mut two_level_hash_table);
+    }
+}
+
+pub struct TwoLevelHashTable<
+    Key: HashTableKeyable,
+    Entity: HashTableEntity<Key>,
+    Grower: HashTableGrower,
+> {
+    hash_tables: Vec<HashTable<Key, Entity, Grower>>,
+}
+
+impl<Key: HashTableKeyable, Entity: HashTableEntity<Key>, Grower: HashTableGrower>
+    TwoLevelHashTable<Key, Entity, Grower>
+{
+    pub fn create() -> TwoLevelHashTable<Key, Entity, Grower> {
+        let mut hash_tables: Vec<HashTable<Key, Entity, Grower>> = Vec::with_capacity(NUM_BUCKETS);
+
+        for _ in 0..NUM_BUCKETS {
+            hash_tables.push(HashTable::<Key, Entity, Grower>::create());
+        }
+
+        TwoLevelHashTable { hash_tables }
+    }
+
+    #[inline(always)]
+    pub fn len(&self) -> usize {
+        self.hash_tables
+            .iter()
+            .map(|hash_table| hash_table.len())
+            .sum()
+    }
+
+    #[inline(always)]
+    pub fn is_empty(&self) -> bool {
+        self.len() == 0
+    }
+
+    #[inline(always)]
+    pub fn iter(&self) -> HashTableIteratorKind<Key, Entity> {
+        let mut iters = Vec::with_capacity(NUM_BUCKETS);
+        for i in 0..NUM_BUCKETS {
+            iters.push(self.hash_tables[i].iter())
+        }
+        HashTableIteratorKind::<Key, Entity>::create_two_level_hash_table_iter(iters)
+    }
+
+    #[inline(always)]
+    pub fn insert_key(&mut self, key: &Key, inserted: &mut bool) -> *mut Entity {
+        let hash = key.fast_hash();
+        let bucket = self.get_bucket_from_hash(&hash);
+        self.hash_tables[bucket].insert_hash_key(key, hash, inserted)
+    }
+
+    #[inline(always)]
+    pub fn insert_hash_key(&mut self, key: &Key, hash: u64, inserted: &mut bool) -> *mut Entity {
+        let bucket = self.get_bucket_from_hash(&hash);
+        self.hash_tables[bucket].insert_hash_key(key, hash, inserted)
+    }
+
+    #[inline(always)]
+    pub fn find_key(&self, key: &Key) -> Option<*mut Entity> {
+        let hash = key.fast_hash();
+        let bucket = self.get_bucket_from_hash(&hash);
+        self.hash_tables[bucket].find_key(key)
+    }
+
+    #[inline(always)]
+    fn get_bucket_from_hash(&self, hash_value: &u64) -> usize {
+        ((hash_value >> (64 - BITS_FOR_BUCKET)) & (MAX_BUCKET as u64)) as usize
+    }
+}
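The routing trick in the new file: the bucket index comes from the top `BITS_FOR_BUCKET` bits of the 64-bit hash, while `HashTable::place` consumes the low bits, so the same hash value serves both levels without the two decisions correlating. A standalone copy of `get_bucket_from_hash` with a few spot checks:

```rust
// Bucket selection from the high byte of the hash.
const BITS_FOR_BUCKET: u8 = 8;
const NUM_BUCKETS: usize = 1 << BITS_FOR_BUCKET;
const MAX_BUCKET: usize = NUM_BUCKETS - 1;

fn get_bucket_from_hash(hash_value: u64) -> usize {
    ((hash_value >> (64 - BITS_FOR_BUCKET)) & (MAX_BUCKET as u64)) as usize
}

fn main() {
    assert_eq!(get_bucket_from_hash(0), 0);
    assert_eq!(get_bucket_from_hash(u64::MAX), 255);
    // Only the high byte matters: the low 56 bits never change the bucket.
    assert_eq!(get_bucket_from_hash(0xAB00_0000_0000_0000), 0xAB);
    assert_eq!(get_bucket_from_hash(0xAB00_0000_0000_FFFF), 0xAB);
}
```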
diff --git a/query/src/pipelines/new/pipeline_builder.rs b/query/src/pipelines/new/pipeline_builder.rs
index 5eb12b938d7ee..7079051ac3d77 100644
--- a/query/src/pipelines/new/pipeline_builder.rs
+++ b/query/src/pipelines/new/pipeline_builder.rs
@@ -112,6 +112,7 @@ impl PlanVisitor for QueryPipelineBuilder {
                     transform_output_port,
                     &aggregator_params,
                 )?,
+                self.ctx.clone(),
             )
         })
     }
@@ -131,6 +132,7 @@ impl PlanVisitor for QueryPipelineBuilder {
                     transform_output_port,
                     &aggregator_params,
                 )?,
+                self.ctx.clone(),
             )
         })
     }
diff --git a/query/src/pipelines/new/processors/transforms/aggregator/aggregator_final.rs b/query/src/pipelines/new/processors/transforms/aggregator/aggregator_final.rs
index de4e7171eb3fc..91f5d9d06af6f 100644
--- a/query/src/pipelines/new/processors/transforms/aggregator/aggregator_final.rs
+++ b/query/src/pipelines/new/processors/transforms/aggregator/aggregator_final.rs
@@ -39,6 +39,7 @@ use crate::pipelines::transforms::group_by::GroupColumnsBuilder;
 use crate::pipelines::transforms::group_by::KeysColumnIter;
 use crate::pipelines::transforms::group_by::PolymorphicKeysHelper;
 use crate::pipelines::transforms::group_by::StateEntity;
+use crate::sessions::QueryContext;
 
 pub type KeysU8FinalAggregator<const HAS_AGG: bool> = FinalAggregator<HAS_AGG, HashMethodKeysU8>;
 pub type KeysU16FinalAggregator<const HAS_AGG: bool> = FinalAggregator<HAS_AGG, HashMethodKeysU16>;
@@ -60,15 +61,15 @@ pub struct FinalAggregator<
     method: Method,
     state: Method::State,
     params: Arc<AggregatorParams>,
 
-    // used for deserialization only, so we can reuse it during the loop
     temp_place: StateAddr,
+    ctx: Arc<QueryContext>,
 }
 
 impl<const HAS_AGG: bool, Method: HashMethod + PolymorphicKeysHelper<Method> + Send>
     FinalAggregator<HAS_AGG, Method>
 {
-    pub fn create(method: Method, params: Arc<AggregatorParams>) -> Self {
+    pub fn create(method: Method, params: Arc<AggregatorParams>, ctx: Arc<QueryContext>) -> Self {
         let state = method.aggregate_state();
         let temp_place = if params.aggregate_functions.is_empty() {
             0.into()
@@ -83,6 +84,7 @@ impl<const HAS_AGG: bool, Method: HashMethod + PolymorphicKeysHelper<Method> + S
             method,
             params,
             temp_place,
+            ctx,
         }
     }
 }
@@ -128,6 +130,12 @@ impl<Method: HashMethod + PolymorphicKeysHelper<Method> + Send> Aggregator
         let keys_column = block.column(aggregate_function_len);
         let keys_iter = self.method.keys_iter_from_column(keys_column)?;
 
+        let group_by_two_level_threshold =
+            self.ctx.get_settings().get_group_by_two_level_threshold()? as usize;
+        if !self.state.is_two_level() && self.state.len() >= group_by_two_level_threshold {
+            self.state.convert_to_two_level();
+        }
+
         // first state places of current block
         let places = Self::lookup_state(&self.params, &mut self.state, keys_iter.get_slice());
 
@@ -204,6 +212,7 @@ impl<Method: HashMethod + PolymorphicKeysHelper<Method> + Send> Aggregator
             }
 
             columns.extend_from_slice(&group_columns_builder.finish()?);
+
             Ok(Some(DataBlock::create(self.params.schema.clone(), columns)))
         }
     }
@@ -219,6 +228,12 @@ impl<Method: HashMethod + PolymorphicKeysHelper<Method> + Send> Aggregator
         let key_array = block.column(0);
         let keys_iter = self.method.keys_iter_from_column(key_array)?;
 
+        let group_by_two_level_threshold =
+            self.ctx.get_settings().get_group_by_two_level_threshold()? as usize;
+        if !self.state.is_two_level() && self.state.len() >= group_by_two_level_threshold {
+            self.state.convert_to_two_level();
+        }
+
         let mut inserted = true;
         for keys_ref in keys_iter.get_slice() {
             self.state.entity_by_key(keys_ref, &mut inserted);
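The block added to the final aggregator above is the conversion trigger, and the same block appears in the partial aggregator below: once per input batch it compares the number of distinct keys seen so far against the session setting and converts at most once. Reduced to its shape; the `State` type here is a stand-in for `Method::State`, not the real trait:

```rust
// The conversion trigger, isolated from the aggregator plumbing.
struct State {
    len: usize,      // distinct group keys seen so far
    two_level: bool, // already converted?
}

impl State {
    fn is_two_level(&self) -> bool {
        self.two_level
    }
    fn convert_to_two_level(&mut self) {
        // In the real code this rehashes every entry into 256 buckets.
        self.two_level = true;
    }
}

fn maybe_convert(state: &mut State, threshold: usize) {
    // Checked per block: cheap once the flag is set.
    if !state.is_two_level() && state.len >= threshold {
        state.convert_to_two_level();
    }
}

fn main() {
    let mut s = State { len: 9_999, two_level: false };
    maybe_convert(&mut s, 10_000);
    assert!(!s.is_two_level()); // still below the default threshold
    s.len = 10_000;
    maybe_convert(&mut s, 10_000);
    assert!(s.is_two_level());
}
```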
diff --git a/query/src/pipelines/new/processors/transforms/aggregator/aggregator_partial.rs b/query/src/pipelines/new/processors/transforms/aggregator/aggregator_partial.rs
index 158ce8e0586c7..2e1d7e35b7aa1 100644
--- a/query/src/pipelines/new/processors/transforms/aggregator/aggregator_partial.rs
+++ b/query/src/pipelines/new/processors/transforms/aggregator/aggregator_partial.rs
@@ -37,6 +37,7 @@ use crate::pipelines::transforms::group_by::AggregatorState;
 use crate::pipelines::transforms::group_by::KeysColumnBuilder;
 use crate::pipelines::transforms::group_by::PolymorphicKeysHelper;
 use crate::pipelines::transforms::group_by::StateEntity;
+use crate::sessions::QueryContext;
 
 pub type KeysU8PartialAggregator<const HAS_AGG: bool> =
     PartialAggregator<HAS_AGG, HashMethodKeysU8>;
@@ -61,12 +62,13 @@ pub struct PartialAggregator<
     method: Method,
     state: Method::State,
     params: Arc<AggregatorParams>,
+    ctx: Arc<QueryContext>,
 }
 
 impl<const HAS_AGG: bool, Method: HashMethod + PolymorphicKeysHelper<Method> + Send>
     PartialAggregator<HAS_AGG, Method>
 {
-    pub fn create(method: Method, params: Arc<AggregatorParams>) -> Self {
+    pub fn create(method: Method, params: Arc<AggregatorParams>, ctx: Arc<QueryContext>) -> Self {
         let state = method.aggregate_state();
         Self {
             is_generated: false,
@@ -74,6 +76,7 @@ impl<const HAS_AGG: bool, Method: HashMethod + PolymorphicKeysHelper<Method> + S
             state,
             method,
             params,
+            ctx,
         }
     }
 
@@ -224,6 +227,12 @@ impl<Method: HashMethod + PolymorphicKeysHelper<Method> + Send> Aggregator
         let group_columns = Self::group_columns(&self.params.group_columns_name, &block)?;
         let group_keys = self.method.build_keys(&group_columns, block.num_rows())?;
 
+        let group_by_two_level_threshold =
+            self.ctx.get_settings().get_group_by_two_level_threshold()? as usize;
+        if !self.state.is_two_level() && self.state.len() >= group_by_two_level_threshold {
+            self.state.convert_to_two_level();
+        }
+
         let places = Self::lookup_state(&self.params, group_keys, &mut self.state);
         Self::execute(&self.params, &block, &places)
     }
@@ -242,6 +251,13 @@ impl<Method: HashMethod + PolymorphicKeysHelper<Method> + Send> Aggregator
         // 1.1 and 1.2.
         let group_columns = Self::group_columns(&self.params.group_columns_name, &block)?;
         let group_keys = self.method.build_keys(&group_columns, block.num_rows())?;
+
+        let group_by_two_level_threshold =
+            self.ctx.get_settings().get_group_by_two_level_threshold()? as usize;
+        if !self.state.is_two_level() && self.state.len() >= group_by_two_level_threshold {
+            self.state.convert_to_two_level();
+        }
+
         Self::lookup_key(group_keys, &mut self.state);
         Ok(())
     }
diff --git a/query/src/pipelines/new/processors/transforms/transform_aggregator.rs b/query/src/pipelines/new/processors/transforms/transform_aggregator.rs
index 802ebee2f0c6d..b13539c11612f 100644
--- a/query/src/pipelines/new/processors/transforms/transform_aggregator.rs
+++ b/query/src/pipelines/new/processors/transforms/transform_aggregator.rs
@@ -26,6 +26,7 @@ use crate::pipelines::new::processors::processor::ProcessorPtr;
 use crate::pipelines::new::processors::transforms::aggregator::*;
 use crate::pipelines::new::processors::AggregatorTransformParams;
 use crate::pipelines::new::processors::Processor;
+use crate::sessions::QueryContext;
 
 pub struct TransformAggregator;
 
@@ -34,6 +35,7 @@ impl TransformAggregator {
         input_port: Arc<InputPort>,
         output_port: Arc<OutputPort>,
         transform_params: AggregatorTransformParams,
+        ctx: Arc<QueryContext>,
     ) -> Result<ProcessorPtr> {
         let aggregator_params = transform_params.aggregator_params;
 
@@ -50,64 +52,64 @@
             HashMethodKind::KeysU8(method) => AggregatorTransform::create(
                 transform_params.transform_input_port,
                 transform_params.transform_output_port,
-                KeysU8FinalAggregator::<false>::create(method, aggregator_params),
+                KeysU8FinalAggregator::<false>::create(method, aggregator_params, ctx),
             ),
             HashMethodKind::KeysU16(method) => AggregatorTransform::create(
                 transform_params.transform_input_port,
                 transform_params.transform_output_port,
-                KeysU16FinalAggregator::<false>::create(method, aggregator_params),
+                KeysU16FinalAggregator::<false>::create(method, aggregator_params, ctx),
             ),
             HashMethodKind::KeysU32(method) => AggregatorTransform::create(
                 transform_params.transform_input_port,
                 transform_params.transform_output_port,
-                KeysU32FinalAggregator::<false>::create(method, aggregator_params),
+                KeysU32FinalAggregator::<false>::create(method, aggregator_params, ctx),
            ),
             HashMethodKind::KeysU64(method) => AggregatorTransform::create(
                 transform_params.transform_input_port,
                 transform_params.transform_output_port,
-                KeysU64FinalAggregator::<false>::create(method, aggregator_params),
+                KeysU64FinalAggregator::<false>::create(method, aggregator_params, ctx),
             ),
             HashMethodKind::SingleString(method) => AggregatorTransform::create(
                 transform_params.transform_input_port,
                 transform_params.transform_output_port,
-                SingleStringFinalAggregator::<false>::create(method, aggregator_params),
+                SingleStringFinalAggregator::<false>::create(method, aggregator_params, ctx),
             ),
             HashMethodKind::Serializer(method) => AggregatorTransform::create(
                 transform_params.transform_input_port,
                 transform_params.transform_output_port,
-                SerializerFinalAggregator::<false>::create(method, aggregator_params),
+                SerializerFinalAggregator::<false>::create(method, aggregator_params, ctx),
             ),
         },
         false => match transform_params.method {
             HashMethodKind::KeysU8(method) => AggregatorTransform::create(
                 transform_params.transform_input_port,
                 transform_params.transform_output_port,
-                KeysU8FinalAggregator::<true>::create(method, aggregator_params),
+                KeysU8FinalAggregator::<true>::create(method, aggregator_params, ctx),
             ),
             HashMethodKind::KeysU16(method) => AggregatorTransform::create(
                 transform_params.transform_input_port,
                 transform_params.transform_output_port,
-                KeysU16FinalAggregator::<true>::create(method, aggregator_params),
+                KeysU16FinalAggregator::<true>::create(method, aggregator_params, ctx),
             ),
             HashMethodKind::KeysU32(method) => AggregatorTransform::create(
                 transform_params.transform_input_port,
                 transform_params.transform_output_port,
-                KeysU32FinalAggregator::<true>::create(method, aggregator_params),
+                KeysU32FinalAggregator::<true>::create(method, aggregator_params, ctx),
             ),
             HashMethodKind::KeysU64(method) => AggregatorTransform::create(
                 transform_params.transform_input_port,
                 transform_params.transform_output_port,
-                KeysU64FinalAggregator::<true>::create(method, aggregator_params),
+                KeysU64FinalAggregator::<true>::create(method, aggregator_params, ctx),
            ),
             HashMethodKind::SingleString(method) => AggregatorTransform::create(
                 transform_params.transform_input_port,
                 transform_params.transform_output_port,
-                SingleStringFinalAggregator::<true>::create(method, aggregator_params),
+                SingleStringFinalAggregator::<true>::create(method, aggregator_params, ctx),
             ),
             HashMethodKind::Serializer(method) => AggregatorTransform::create(
                 transform_params.transform_input_port,
                 transform_params.transform_output_port,
-                SerializerFinalAggregator::<true>::create(method, aggregator_params),
+                SerializerFinalAggregator::<true>::create(method, aggregator_params, ctx),
             ),
         },
     }
@@ -117,6 +119,7 @@ impl TransformAggregator {
         input_port: Arc<InputPort>,
         output_port: Arc<OutputPort>,
         transform_params: AggregatorTransformParams,
+        ctx: Arc<QueryContext>,
     ) -> Result<ProcessorPtr> {
         let aggregator_params = transform_params.aggregator_params;
 
@@ -133,64 +136,64 @@
             HashMethodKind::KeysU8(method) => AggregatorTransform::create(
                 transform_params.transform_input_port,
                 transform_params.transform_output_port,
-                KeysU8PartialAggregator::<false>::create(method, aggregator_params),
+                KeysU8PartialAggregator::<false>::create(method, aggregator_params, ctx),
             ),
             HashMethodKind::KeysU16(method) => AggregatorTransform::create(
                 transform_params.transform_input_port,
                 transform_params.transform_output_port,
-                KeysU16PartialAggregator::<false>::create(method, aggregator_params),
+                KeysU16PartialAggregator::<false>::create(method, aggregator_params, ctx),
             ),
             HashMethodKind::KeysU32(method) => AggregatorTransform::create(
                 transform_params.transform_input_port,
                 transform_params.transform_output_port,
-                KeysU32PartialAggregator::<false>::create(method, aggregator_params),
+                KeysU32PartialAggregator::<false>::create(method, aggregator_params, ctx),
             ),
             HashMethodKind::KeysU64(method) => AggregatorTransform::create(
                 transform_params.transform_input_port,
                 transform_params.transform_output_port,
-                KeysU64PartialAggregator::<false>::create(method, aggregator_params),
+                KeysU64PartialAggregator::<false>::create(method, aggregator_params, ctx),
             ),
             HashMethodKind::SingleString(method) => AggregatorTransform::create(
                 transform_params.transform_input_port,
                 transform_params.transform_output_port,
-                SingleStringPartialAggregator::<false>::create(method, aggregator_params),
+                SingleStringPartialAggregator::<false>::create(method, aggregator_params, ctx),
             ),
             HashMethodKind::Serializer(method) => AggregatorTransform::create(
                 transform_params.transform_input_port,
                 transform_params.transform_output_port,
-                SerializerPartialAggregator::<false>::create(method, aggregator_params),
+                SerializerPartialAggregator::<false>::create(method, aggregator_params, ctx),
             ),
         },
         false => match transform_params.method {
             HashMethodKind::KeysU8(method) => AggregatorTransform::create(
                 transform_params.transform_input_port,
                 transform_params.transform_output_port,
-                KeysU8PartialAggregator::<true>::create(method, aggregator_params),
+                KeysU8PartialAggregator::<true>::create(method, aggregator_params, ctx),
             ),
             HashMethodKind::KeysU16(method) => AggregatorTransform::create(
                 transform_params.transform_input_port,
                 transform_params.transform_output_port,
-                KeysU16PartialAggregator::<true>::create(method, aggregator_params),
+                KeysU16PartialAggregator::<true>::create(method, aggregator_params, ctx),
             ),
             HashMethodKind::KeysU32(method) => AggregatorTransform::create(
                 transform_params.transform_input_port,
                 transform_params.transform_output_port,
-                KeysU32PartialAggregator::<true>::create(method, aggregator_params),
+                KeysU32PartialAggregator::<true>::create(method, aggregator_params, ctx),
             ),
             HashMethodKind::KeysU64(method) => AggregatorTransform::create(
                 transform_params.transform_input_port,
                 transform_params.transform_output_port,
-                KeysU64PartialAggregator::<true>::create(method, aggregator_params),
+                KeysU64PartialAggregator::<true>::create(method, aggregator_params, ctx),
             ),
             HashMethodKind::SingleString(method) => AggregatorTransform::create(
                 transform_params.transform_input_port,
                 transform_params.transform_output_port,
-                SingleStringPartialAggregator::<true>::create(method, aggregator_params),
+                SingleStringPartialAggregator::<true>::create(method, aggregator_params, ctx),
             ),
             HashMethodKind::Serializer(method) => AggregatorTransform::create(
                 transform_params.transform_input_port,
                 transform_params.transform_output_port,
-                SerializerPartialAggregator::<true>::create(method, aggregator_params),
+                SerializerPartialAggregator::<true>::create(method, aggregator_params, ctx),
             ),
         },
     }
diff --git a/query/src/pipelines/transforms/group_by/aggregator_polymorphic_keys.rs b/query/src/pipelines/transforms/group_by/aggregator_polymorphic_keys.rs
index 493fad7f6b283..aa62aec57fc94 100644
--- a/query/src/pipelines/transforms/group_by/aggregator_polymorphic_keys.rs
+++ b/query/src/pipelines/transforms/group_by/aggregator_polymorphic_keys.rs
@@ -24,7 +24,7 @@ use common_datavalues::prelude::*;
 use common_exception::Result;
 
 use super::aggregator_groups_builder::SingleStringGroupColumnsBuilder;
-use crate::common::HashTable;
+use crate::common::HashMapKind;
 use crate::pipelines::new::processors::AggregatorParams;
 use crate::pipelines::transforms::group_by::aggregator_groups_builder::FixedKeysGroupColumnsBuilder;
 use crate::pipelines::transforms::group_by::aggregator_groups_builder::GroupColumnsBuilder;
@@ -155,7 +155,8 @@ impl PolymorphicKeysHelper<HashMethodKeysU32> for HashMethodKeysU32 {
     fn aggregate_state(&self) -> Self::State {
         LongerFixedKeysAggregatorState::<u32> {
             area: Bump::new(),
-            data: HashTable::create(),
+            data: HashMapKind::create_hash_table(),
+            two_level_flag: false,
         }
     }
 
@@ -186,7 +187,8 @@ impl PolymorphicKeysHelper<HashMethodKeysU64> for HashMethodKeysU64 {
     fn aggregate_state(&self) -> Self::State {
         LongerFixedKeysAggregatorState::<u64> {
             area: Bump::new(),
-            data: HashTable::create(),
+            data: HashMapKind::create_hash_table(),
+            two_level_flag: false,
         }
     }
 
@@ -218,7 +220,8 @@ impl PolymorphicKeysHelper<HashMethodSingleString> for HashMethodSingleString {
         SerializedKeysAggregatorState {
             keys_area: Bump::new(),
             state_area: Bump::new(),
-            data_state_map: HashTable::create(),
+            data_state_map: HashMapKind::create_hash_table(),
+            two_level_flag: false,
         }
     }
 
@@ -250,7 +253,8 @@ impl PolymorphicKeysHelper<HashMethodSerializer> for HashMethodSerializer {
         SerializedKeysAggregatorState {
             keys_area: Bump::new(),
             state_area: Bump::new(),
-            data_state_map: HashTable::create(),
+            data_state_map: HashMapKind::create_hash_table(),
+            two_level_flag: false,
         }
     }
diff --git a/query/src/pipelines/transforms/group_by/aggregator_state.rs b/query/src/pipelines/transforms/group_by/aggregator_state.rs
index 4e3f41c980a4f..7306007555baa 100644
--- a/query/src/pipelines/transforms/group_by/aggregator_state.rs
+++ b/query/src/pipelines/transforms/group_by/aggregator_state.rs
@@ -23,8 +23,8 @@ use common_datablocks::HashMethodSingleString;
 use common_datavalues::prelude::*;
 use common_functions::aggregates::StateAddr;
 
-use crate::common::HashMap;
-use crate::common::HashMapIterator;
+use crate::common::HashMapIteratorKind;
+use crate::common::HashMapKind;
 use crate::common::HashTableEntity;
 use crate::common::HashTableKeyable;
 use crate::common::KeyValueEntity;
@@ -79,6 +79,12 @@ pub trait AggregatorState<Method: HashMethod>: Sync + Send {
     fn entity(&mut self, key: &Method::HashKey<'_>, inserted: &mut bool) -> *mut Self::Entity;
 
     fn entity_by_key(&mut self, key: &Self::Key, inserted: &mut bool) -> *mut Self::Entity;
+
+    fn is_two_level(&self) -> bool {
+        false
+    }
+
+    fn convert_to_two_level(&mut self) {}
 }
 
 /// The fixed length array is used as the data structure to locate the key by subscript
@@ -87,6 +93,7 @@ pub struct ShortFixedKeysAggregatorState<T: PrimitiveType> {
     size: usize,
     max_size: usize,
     data: *mut ShortFixedKeysStateEntity<T>,
+    two_level_flag: bool,
 }
 
 // TODO:(Winter) Hack:
@@ -113,6 +120,7 @@ impl<T: PrimitiveType> ShortFixedKeysAggregatorState<T> {
                 data: raw_ptr as *mut ShortFixedKeysStateEntity<T>,
                 size: 0,
                 max_size,
+                two_level_flag: false,
             }
         }
     }
}
@@ -177,11 +185,22 @@ where
     fn entity_by_key(&mut self, key: &Self::Key, inserted: &mut bool) -> *mut Self::Entity {
         self.entity(key, inserted)
     }
+
+    #[inline(always)]
+    fn is_two_level(&self) -> bool {
+        self.two_level_flag
+    }
+
+    #[inline(always)]
+    fn convert_to_two_level(&mut self) {
+        self.two_level_flag = true;
+    }
 }
 
 pub struct LongerFixedKeysAggregatorState<T: PrimitiveType> {
     pub area: Bump,
-    pub data: HashMap<T, usize>,
+    pub data: HashMapKind<T, usize>,
+    pub two_level_flag: bool,
 }
 
 // TODO:(Winter) Hack:
@@ -203,7 +222,7 @@ where
 {
     type Key = T;
     type Entity = KeyValueEntity<T, usize>;
-    type Iterator = HashMapIterator<T, usize>;
+    type Iterator = HashMapIteratorKind<T, usize>;
 
     #[inline(always)]
     fn len(&self) -> usize {
@@ -229,12 +248,26 @@ where
     fn entity_by_key(&mut self, key: &Self::Key, inserted: &mut bool) -> *mut Self::Entity {
         self.entity(key, inserted)
     }
+
+    #[inline(always)]
+    fn is_two_level(&self) -> bool {
+        self.two_level_flag
+    }
+
+    #[inline(always)]
+    fn convert_to_two_level(&mut self) {
+        unsafe {
+            self.data.convert_to_two_level();
+        }
+        self.two_level_flag = true;
+    }
 }
 
 pub struct SerializedKeysAggregatorState {
     pub keys_area: Bump,
     pub state_area: Bump,
-    pub data_state_map: HashMap<KeysRef, usize>,
+    pub data_state_map: HashMapKind<KeysRef, usize>,
+    pub two_level_flag: bool,
 }
 
 // TODO:(Winter) Hack:
@@ -251,12 +284,11 @@ unsafe impl Sync for SerializedKeysAggregatorState {}
 impl AggregatorState<HashMethodSerializer> for SerializedKeysAggregatorState {
     type Key = KeysRef;
     type Entity = KeyValueEntity<KeysRef, usize>;
-    type Iterator = HashMapIterator<KeysRef, usize>;
+    type Iterator = HashMapIteratorKind<KeysRef, usize>;
 
     fn len(&self) -> usize {
         self.data_state_map.len()
     }
-
     fn iter(&self) -> Self::Iterator {
         self.data_state_map.iter()
     }
@@ -304,12 +336,25 @@ impl AggregatorState<HashMethodSerializer> for SerializedKeysAggregatorState {
 
         state_entity
     }
+
+    #[inline(always)]
+    fn is_two_level(&self) -> bool {
+        self.two_level_flag
+    }
+
+    #[inline(always)]
+    fn convert_to_two_level(&mut self) {
+        unsafe {
+            self.data_state_map.convert_to_two_level();
+        }
+        self.two_level_flag = true;
+    }
 }
 
 impl AggregatorState<HashMethodSingleString> for SerializedKeysAggregatorState {
     type Key = KeysRef;
     type Entity = KeyValueEntity<KeysRef, usize>;
-    type Iterator = HashMapIterator<KeysRef, usize>;
+    type Iterator = HashMapIteratorKind<KeysRef, usize>;
 
     fn len(&self) -> usize {
         self.data_state_map.len()
@@ -363,4 +408,17 @@ impl AggregatorState<HashMethodSingleString> for SerializedKeysAggregatorState
 
         state_entity
     }
+
+    #[inline(always)]
+    fn is_two_level(&self) -> bool {
+        self.two_level_flag
+    }
+
+    #[inline(always)]
+    fn convert_to_two_level(&mut self) {
+        unsafe {
+            self.data_state_map.convert_to_two_level();
+        }
+        self.two_level_flag = true;
+    }
 }
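`AggregatorState` gains `is_two_level` / `convert_to_two_level` with default bodies, so every existing state compiles unchanged and only the states that care override them: `ShortFixedKeysAggregatorState` is a direct-addressed array with nothing to rebuild, so its override just flips the flag, while the `HashMapKind`-backed states also rebuild the underlying table. A sketch of that trait-default pattern with stand-in names:

```rust
// Trait defaults keep the no-op path free for states that never convert.
trait TwoLevelCapable {
    fn is_two_level(&self) -> bool {
        false // safe default: never converted
    }
    fn convert_to_two_level(&mut self) {} // no-op by default
}

struct ArrayState; // relies entirely on the defaults
impl TwoLevelCapable for ArrayState {}

struct MapState {
    two_level_flag: bool, // like LongerFixedKeysAggregatorState
}
impl TwoLevelCapable for MapState {
    fn is_two_level(&self) -> bool {
        self.two_level_flag
    }
    fn convert_to_two_level(&mut self) {
        // The real override also calls HashMapKind::convert_to_two_level().
        self.two_level_flag = true;
    }
}

fn main() {
    let mut a = ArrayState;
    a.convert_to_two_level(); // default: nothing happens
    assert!(!a.is_two_level());

    let mut m = MapState { two_level_flag: false };
    m.convert_to_two_level();
    assert!(m.is_two_level());
}
```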
diff --git a/query/src/sessions/session_settings.rs b/query/src/sessions/session_settings.rs
index 39c9ad7e16f0b..28782a283410a 100644
--- a/query/src/sessions/session_settings.rs
+++ b/query/src/sessions/session_settings.rs
@@ -148,6 +148,12 @@ impl Settings {
                 level: ScopeLevel::Session,
                 desc: "Timezone, default value: UTC,",
             },
+            SettingValue {
+                default_value: DataValue::UInt64(10000),
+                user_setting: UserSetting::create("group_by_two_level_threshold", DataValue::UInt64(10000)),
+                level: ScopeLevel::Session,
+                desc: "The threshold of keys to open two-level aggregation, default value: 10000",
+            }
         ];
 
         let settings = Arc::new(RwLock::new(HashMap::default()));
@@ -249,6 +255,18 @@ impl Settings {
             .and_then(|v| v.user_setting.value.as_string())
     }
 
+    // Get group by two level threshold
+    pub fn get_group_by_two_level_threshold(&self) -> Result<u64> {
+        let key = "group_by_two_level_threshold";
+        self.try_get_u64(key)
+    }
+
+    // Set group by two level threshold
+    pub fn set_group_by_two_level_threshold(&self, val: u64) -> Result<()> {
+        let key = "group_by_two_level_threshold";
+        self.try_set_u64(key, val, false)
+    }
+
     pub fn has_setting(&self, key: &str) -> bool {
         let settings = self.settings.read();
         settings.get(key).is_some()
diff --git a/query/src/sql/exec/mod.rs b/query/src/sql/exec/mod.rs
index 632bdf44e3d84..9ccb033a4db67 100644
--- a/query/src/sql/exec/mod.rs
+++ b/query/src/sql/exec/mod.rs
@@ -279,6 +279,7 @@ impl PipelineBuilder {
                     transform_output_port,
                     &partial_aggr_params,
                 )?,
+                self.ctx.clone(),
             )
         })?;
 
@@ -299,6 +300,7 @@ impl PipelineBuilder {
                     transform_output_port,
                     &final_aggr_params,
                 )?,
+                self.ctx.clone(),
             )
         })?;
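The getter/setter pair above is a thin wrapper that keeps the `"group_by_two_level_threshold"` key string in one place; a session can lower it (e.g. `SET group_by_two_level_threshold=10;`, as the new stateless test below does) to force early conversion. A simplified sketch of the round trip, with a plain `HashMap` and simplified `try_get_u64` / `try_set_u64` standing in for Databend's `Settings` store:

```rust
// Minimal stand-in for the session-setting round trip.
use std::collections::HashMap;

struct Settings {
    values: HashMap<&'static str, u64>,
}

impl Settings {
    fn try_get_u64(&self, key: &str) -> Option<u64> {
        self.values.get(key).copied()
    }
    fn try_set_u64(&mut self, key: &'static str, val: u64) {
        self.values.insert(key, val);
    }
    // Wrappers keep the key string in one place, as in the diff.
    fn get_group_by_two_level_threshold(&self) -> Option<u64> {
        self.try_get_u64("group_by_two_level_threshold")
    }
    fn set_group_by_two_level_threshold(&mut self, val: u64) {
        self.try_set_u64("group_by_two_level_threshold", val)
    }
}

fn main() {
    let mut s = Settings {
        values: HashMap::from([("group_by_two_level_threshold", 10_000u64)]),
    };
    assert_eq!(s.get_group_by_two_level_threshold(), Some(10_000)); // default
    s.set_group_by_two_level_threshold(10); // SET group_by_two_level_threshold=10
    assert_eq!(s.get_group_by_two_level_threshold(), Some(10));
}
```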
diff --git a/query/tests/it/common/hashtable.rs b/query/tests/it/common/hashtable.rs
index ba90ceeca929c..2a5faa09c865a 100644
--- a/query/tests/it/common/hashtable.rs
+++ b/query/tests/it/common/hashtable.rs
@@ -12,11 +12,13 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-use databend_query::common::Grower;
+use databend_query::common::HashMapKind;
+use databend_query::common::HashTableGrower;
+use databend_query::common::SingleLevelGrower;
 
 #[test]
 fn test_hash_table_grower() {
-    let mut grower = Grower::default();
+    let mut grower = SingleLevelGrower::default();
 
     assert_eq!(grower.max_size(), 256);
 
@@ -36,3 +38,28 @@ fn test_hash_table_grower() {
     grower.increase_size();
     assert_eq!(grower.max_size(), 1024);
 }
+
+#[test]
+fn test_two_level_hash_table() {
+    let mut hashtable = HashMapKind::<u64, u64>::create_hash_table();
+    let mut inserted = true;
+    let entity = hashtable.insert_key(&1u64, &mut inserted);
+    if inserted {
+        entity.set_value(2);
+    }
+
+    unsafe {
+        hashtable.convert_to_two_level();
+    }
+
+    let is_two_level = match hashtable {
+        HashMapKind::HashTable(_) => false,
+        HashMapKind::TwoLevelHashTable(_) => true,
+    };
+    assert!(is_two_level);
+
+    let entity = hashtable.insert_key(&1u64, &mut inserted);
+    assert!(!inserted);
+
+    assert_eq!(entity.get_value(), &2);
+}
diff --git a/query/tests/it/storages/system/settings_table.rs b/query/tests/it/storages/system/settings_table.rs
index 701ca82e155a9..49ab0ddbcd36a 100644
--- a/query/tests/it/storages/system/settings_table.rs
+++ b/query/tests/it/storages/system/settings_table.rs
@@ -39,6 +39,7 @@ async fn test_settings_table() -> Result<()> {
         "| enable_planner_v2 | 0 | 0 | SESSION | Enable planner v2 by setting this variable to 1, default value: 0 | UInt64 |",
         "| field_delimiter | , | , | SESSION | Format field delimiter, default value: , | String |",
         "| flight_client_timeout | 60 | 60 | SESSION | Max duration the flight client request is allowed to take in seconds. By default, it is 60 seconds | UInt64 |",
+        "| group_by_two_level_threshold | 10000 | 10000 | SESSION | The threshold of keys to open two-level aggregation, default value: 10000 | UInt64 |",
         "| max_block_size | 10000 | 10000 | SESSION | Maximum block size for reading | UInt64 |",
         "| max_threads | 2 | 16 | SESSION | The maximum number of threads to execute the request. By default, it is determined automatically. | UInt64 |",
         "| record_delimiter | | | SESSION | Format record_delimiter, default value: | String |",
diff --git a/tests/suites/0_stateless/03_dml/03_0003_select_group_by.result b/tests/suites/0_stateless/03_dml/03_0003_select_group_by.result
index 15b86140c6f8f..d0ee43608d492 100644
--- a/tests/suites/0_stateless/03_dml/03_0003_select_group_by.result
+++ b/tests/suites/0_stateless/03_dml/03_0003_select_group_by.result
@@ -38,3 +38,18 @@ NULL 2 3
 2022-04-02 5
 2022-04-01 00:00:00.000000 5
 2022-04-01 00:00:00.000001 5
+0 1
+1 1
+2 1
+3 1
+4 1
+0 1
+1 1
+2 1
+3 1
+4 1
+0 1
+1 1
+2 1
+3 1
+4 1
diff --git a/tests/suites/0_stateless/03_dml/03_0003_select_group_by.sql b/tests/suites/0_stateless/03_dml/03_0003_select_group_by.sql
index d30902f63082a..cd7d05d9ff8d4 100644
--- a/tests/suites/0_stateless/03_dml/03_0003_select_group_by.sql
+++ b/tests/suites/0_stateless/03_dml/03_0003_select_group_by.sql
@@ -41,3 +41,11 @@ select created_at, sum(count) from t_datetime group by created_at order by creat
 select created_time, sum(count) from t_datetime group by created_time order by created_time;
 
 drop table t_datetime;
+
+--
+SELECT number, count(*) FROM numbers_mt(10000000) group by number order by number limit 5;
+set group_by_two_level_threshold=10;
+SELECT number, count(*) FROM numbers_mt(10000000) group by number order by number limit 5;
+set group_by_two_level_threshold=1000000000;
+SELECT number, count(*) FROM numbers_mt(10000000) group by number order by number limit 5;
+set group_by_two_level_threshold=10000;
diff --git a/tests/suites/0_stateless/06_show/06_0003_show_settings.result b/tests/suites/0_stateless/06_show/06_0003_show_settings.result
index 14fad02ba1da1..172071dcb2989 100644
--- a/tests/suites/0_stateless/06_show/06_0003_show_settings.result
+++ b/tests/suites/0_stateless/06_show/06_0003_show_settings.result
@@ -3,6 +3,7 @@ enable_new_processor_framework 1 1 SESSION Enable new processor framework if val
 enable_planner_v2 0 0 SESSION Enable planner v2 by setting this variable to 1, default value: 0 UInt64
 field_delimiter , , SESSION Format field delimiter, default value: , String
 flight_client_timeout 60 60 SESSION Max duration the flight client request is allowed to take in seconds. By default, it is 60 seconds UInt64
+group_by_two_level_threshold 10000 10000 SESSION The threshold of keys to open two-level aggregation, default value: 10000 UInt64
 max_block_size 10000 10000 SESSION Maximum block size for reading UInt64
 max_threads 11 16 SESSION The maximum number of threads to execute the request. By default, it is determined automatically. UInt64
 record_delimiter \n \n SESSION Format record_delimiter, default value: \n String