Skip to content

Commit

Permalink
optimize AggDynList/Set by storing raw scalar values (#418)
Browse files Browse the repository at this point in the history
Co-authored-by: zhangli20 <[email protected]>
  • Loading branch information
richox and zhangli20 authored Mar 27, 2024
1 parent 7a3f63c commit 81d9bfc
Show file tree
Hide file tree
Showing 6 changed files with 501 additions and 486 deletions.
55 changes: 34 additions & 21 deletions native-engine/datafusion-ext-commons/src/bytes_arena.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ impl Default for BytesArena {
}

impl BytesArena {
pub fn add(&mut self, bytes: &[u8]) -> u64 {
pub fn add(&mut self, bytes: &[u8]) -> BytesArenaAddr {
// assume bytes_len < 2^32
let cur_buf_len = self.cur_buf().len();
let len = bytes.len();
Expand All @@ -45,16 +45,16 @@ impl BytesArena {
let id = self.bufs.len() - 1;
let offset = self.cur_buf().len();
self.cur_buf_mut().extend_from_slice(bytes);
make_arena_addr(id, offset, len)
BytesArenaAddr::new(id, offset, len)
}

pub fn get(&self, addr: u64) -> &[u8] {
let (id, offset, len) = unapply_arena_addr(addr);
pub fn get(&self, addr: BytesArenaAddr) -> &[u8] {
let unpacked = addr.unpack();
unsafe {
// safety - performance critical, assume addr is valid
self.bufs
.get_unchecked(id)
.get_unchecked(offset..offset + len)
.get_unchecked(unpacked.id)
.get_unchecked(unpacked.offset..unpacked.offset + unpacked.len)
}
}

Expand All @@ -64,17 +64,17 @@ impl BytesArena {

/// specialized for merging two parts in sort-exec
/// works like an IntoIterator, free memory of visited items
pub fn specialized_get_and_drop_last(&mut self, addr: u64) -> &[u8] {
let (id, offset, len) = unapply_arena_addr(addr);
if id > 0 && !self.bufs[id - 1].is_empty() {
self.bufs[id - 1].truncate(0); // drop last buf
self.bufs[id - 1].shrink_to_fit();
pub fn specialized_get_and_drop_last(&mut self, addr: BytesArenaAddr) -> &[u8] {
let unpacked = addr.unpack();
if unpacked.id > 0 && !self.bufs[unpacked.id - 1].is_empty() {
self.bufs[unpacked.id - 1].truncate(0); // drop last buf
self.bufs[unpacked.id - 1].shrink_to_fit();
}
unsafe {
// safety - performance critical, assume addr is valid
self.bufs
.get_unchecked(id)
.get_unchecked(offset..offset + len)
.get_unchecked(unpacked.id)
.get_unchecked(unpacked.offset..unpacked.offset + unpacked.len)
}
}

Expand All @@ -97,14 +97,27 @@ impl BytesArena {
}
}

fn make_arena_addr(id: usize, offset: usize, len: usize) -> u64 {
(id as u64 * BUF_CAPACITY_TARGET as u64 + offset as u64) << 32 | len as u64
#[derive(Clone, Copy, PartialEq, Eq, Hash)]
pub struct BytesArenaAddr(u64);

impl BytesArenaAddr {
pub fn new(id: usize, offset: usize, len: usize) -> Self {
Self((id as u64 * BUF_CAPACITY_TARGET as u64 + offset as u64) << 32 | len as u64)
}

pub fn unpack(self) -> UnpackedBytesArenaAddr {
let id_offset = self.0 >> 32;
let id = (id_offset / (BUF_CAPACITY_TARGET as u64)) as usize;
let offset = (id_offset % (BUF_CAPACITY_TARGET as u64)) as usize;
let len = (self.0 << 32 >> 32) as usize;

UnpackedBytesArenaAddr { id, offset, len }
}
}

fn unapply_arena_addr(addr: u64) -> (usize, usize, usize) {
let id_offset = addr >> 32;
let id = id_offset / (BUF_CAPACITY_TARGET as u64);
let offset = id_offset % (BUF_CAPACITY_TARGET as u64);
let len = addr << 32 >> 32;
(id as usize, offset as usize, len as usize)
#[derive(Clone, Copy, PartialEq, Eq, Hash)]
pub struct UnpackedBytesArenaAddr {
pub id: usize,
pub offset: usize,
pub len: usize,
}
Loading

0 comments on commit 81d9bfc

Please sign in to comment.