Skip to content

Commit

Permalink
use custom implemented hashmap for aggregation (#617)
Browse files Browse the repository at this point in the history
Co-authored-by: zhangli20 <[email protected]>
  • Loading branch information
richox and zhangli20 authored Oct 17, 2024
1 parent e250a64 commit f222016
Show file tree
Hide file tree
Showing 7 changed files with 430 additions and 254 deletions.
36 changes: 18 additions & 18 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

36 changes: 23 additions & 13 deletions native-engine/datafusion-ext-plans/src/agg/acc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,6 @@ use itertools::Itertools;
use slimmer_box::SlimmerBox;
use smallvec::SmallVec;

use crate::agg::agg_table::agg_hash;

pub type DynVal = Option<Box<dyn AggDynValue>>;

const ACC_STORE_BLOCK_SIZE: usize = 65536;
Expand Down Expand Up @@ -73,6 +71,16 @@ impl AccStore {
self.num_accs * (self.fixed_len() + self.dyns_len() * size_of::<DynVal>() + 32)
}

/// Returns the current value of the store's `num_accs` counter.
pub fn num_accs(&self) -> usize {
self.num_accs
}

/// Grows the store until its `num_accs` counter is at least `num_accs`.
///
/// Calls [`Self::new_acc`] repeatedly and discards the returned index each
/// time. Does nothing when the counter already meets or exceeds the target.
pub fn extend(&mut self, num_accs: usize) {
    loop {
        // Stop as soon as the requested count has been reached.
        if self.num_accs >= num_accs {
            break;
        }
        // NOTE(review): relies on `new_acc` advancing `self.num_accs`;
        // its body is not visible here -- confirm.
        self.new_acc();
    }
}

pub fn new_acc(&mut self) -> u32 {
let initial = unsafe {
// safety: ignore borrow checker
Expand Down Expand Up @@ -513,9 +521,9 @@ pub fn create_dyn_loaders_from_initial_value(values: &[AccumInitialValue]) -> Re
InternalSet::Small(s) => s.push(pos_len),
InternalSet::Huge(s) => {
let raw = list.ref_raw(pos_len);
let hash = agg_hash::<AGG_DYN_SET_HASH_SEED>(raw);
let hash = acc_hash(raw);
s.insert(hash, pos_len, |&pos_len| {
agg_hash::<AGG_DYN_SET_HASH_SEED>(list.ref_raw(pos_len))
acc_hash(list.ref_raw(pos_len))
});
}
}
Expand Down Expand Up @@ -913,17 +921,19 @@ impl InternalSet {

for &mut pos_len in s {
let raw = list.ref_raw(pos_len);
let hash = agg_hash::<AGG_DYN_SET_HASH_SEED>(raw);
huge.insert(hash, pos_len, |&pos_len| {
agg_hash::<AGG_DYN_SET_HASH_SEED>(list.ref_raw(pos_len))
});
let hash = acc_hash(raw);
huge.insert(hash, pos_len, |&pos_len| acc_hash(list.ref_raw(pos_len)));
}
*self = Self::Huge(huge);
}
}
}

const AGG_DYN_SET_HASH_SEED: u32 = 0x7BCB48DA;
/// Hashes the bytes of `value` into a `u64` with `gxhash::gxhash64`.
///
/// Every call passes the same fixed seed, so all callers of this helper
/// hash with identical parameters.
#[inline]
pub fn acc_hash(value: impl AsRef<[u8]>) -> u64 {
    const ACC_HASH_SEED: u32 = 0x7BCB48DA;

    let bytes: &[u8] = value.as_ref();
    // Widening u32 -> i64 is lossless and yields the same value as `as i64`.
    let seed = i64::from(ACC_HASH_SEED);
    gxhash::gxhash64(bytes, seed)
}

impl AggDynSet {
pub fn append(&mut self, value: &ScalarValue, nullable: bool) {
Expand Down Expand Up @@ -966,11 +976,11 @@ impl AggDynSet {
}
}
InternalSet::Huge(s) => {
let hash = agg_hash::<AGG_DYN_SET_HASH_SEED>(raw);
let hash = acc_hash(raw);
match s.find_or_find_insert_slot(
hash,
|&pos_len| new_len == pos_len.1 as usize && raw == self.list.ref_raw(pos_len),
|&pos_len| agg_hash::<AGG_DYN_SET_HASH_SEED>(self.list.ref_raw(pos_len)),
|&pos_len| acc_hash(self.list.ref_raw(pos_len)),
) {
Ok(_found) => {}
Err(slot) => {
Expand Down Expand Up @@ -1005,13 +1015,13 @@ impl AggDynSet {
}
InternalSet::Huge(s) => {
let new_value = self.list.ref_raw(new_pos_len);
let hash = agg_hash::<AGG_DYN_SET_HASH_SEED>(new_value);
let hash = acc_hash(new_value);
match s.find_or_find_insert_slot(
hash,
|&pos_len| {
new_len == pos_len.1 as usize && new_value == self.list.ref_raw(pos_len)
},
|&pos_len| agg_hash::<AGG_DYN_SET_HASH_SEED>(self.list.ref_raw(pos_len)),
|&pos_len| acc_hash(self.list.ref_raw(pos_len)),
) {
Ok(_found) => {
inserted = false;
Expand Down
Loading

0 comments on commit f222016

Please sign in to comment.