diff --git a/Cargo.toml b/Cargo.toml index a67d9d8b4b..c1512334e7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -82,8 +82,8 @@ more-asserts = "0.3.1" rand_distr = "0.4.3" [target.'cfg(not(windows))'.dev-dependencies] -criterion = "0.5" -pprof = { git = "https://github.com/PSeitz/pprof-rs/", rev = "53af24b", features = ["flamegraph", "criterion"] } # temp fork that works with criterion 0.5 +criterion = { version = "0.5" } +pprof = { version= "0.13", features = ["flamegraph", "criterion"] } [dev-dependencies.fail] version = "0.5.0" diff --git a/benches/index-bench.rs b/benches/index-bench.rs index c2a382bb2e..0078595975 100644 --- a/benches/index-bench.rs +++ b/benches/index-bench.rs @@ -1,7 +1,7 @@ use criterion::{criterion_group, criterion_main, Criterion, Throughput}; use pprof::criterion::{Output, PProfProfiler}; use tantivy::schema::{TantivyDocument, FAST, INDEXED, STORED, STRING, TEXT}; -use tantivy::{Index, IndexWriter}; +use tantivy::{tokenizer, Index, IndexWriter}; const HDFS_LOGS: &str = include_str!("hdfs.json"); const GH_LOGS: &str = include_str!("gh.json"); @@ -19,6 +19,13 @@ pub fn hdfs_index_benchmark(c: &mut Criterion) { schema_builder.add_text_field("severity", STRING); schema_builder.build() }; + let schema_only_fast = { + let mut schema_builder = tantivy::schema::SchemaBuilder::new(); + schema_builder.add_u64_field("timestamp", FAST); + schema_builder.add_text_field("body", FAST); + schema_builder.add_text_field("severity", FAST); + schema_builder.build() + }; let schema_with_store = { let mut schema_builder = tantivy::schema::SchemaBuilder::new(); schema_builder.add_u64_field("timestamp", INDEXED | STORED); @@ -83,6 +90,30 @@ pub fn hdfs_index_benchmark(c: &mut Criterion) { index_writer.commit().unwrap(); }) }); + group.bench_function("index-hdfs-no-commit-fastfield", |b| { + let lines = get_lines(HDFS_LOGS); + b.iter(|| { + let index = Index::create_in_ram(schema_only_fast.clone()); + let index_writer: IndexWriter = index.writer_with_num_threads(1, 100_000_000).unwrap(); + for doc_json in &lines { + let doc = TantivyDocument::parse_json(&schema, doc_json).unwrap(); + index_writer.add_document(doc).unwrap(); + } + }) + }); + group.bench_function("index-hdfs-with-commit-fastfield", |b| { + let lines = get_lines(HDFS_LOGS); + b.iter(|| { + let index = Index::create_in_ram(schema_only_fast.clone()); + let mut index_writer: IndexWriter = + index.writer_with_num_threads(1, 100_000_000).unwrap(); + for doc_json in &lines { + let doc = TantivyDocument::parse_json(&schema, doc_json).unwrap(); + index_writer.add_document(doc).unwrap(); + } + index_writer.commit().unwrap(); + }) + }); group.bench_function("index-hdfs-no-commit-json-without-docstore", |b| { let lines = get_lines(HDFS_LOGS); b.iter(|| { @@ -107,6 +138,18 @@ pub fn gh_index_benchmark(c: &mut Criterion) { schema_builder.add_json_field("json", TEXT | FAST); schema_builder.build() }; + let dynamic_schema_fast = { + let mut schema_builder = tantivy::schema::SchemaBuilder::new(); + schema_builder.add_json_field("json", FAST); + schema_builder.build() + }; + let ff_tokenizer_manager = tokenizer::TokenizerManager::default(); + ff_tokenizer_manager.register( + "raw", + tokenizer::TextAnalyzer::builder(tokenizer::RawTokenizer::default()) + .filter(tokenizer::RemoveLongFilter::limit(255)) + .build(), + ); let mut group = c.benchmark_group("index-gh"); group.throughput(Throughput::Bytes(GH_LOGS.len() as u64)); @@ -115,7 +158,23 @@ pub fn gh_index_benchmark(c: &mut Criterion) { let lines = get_lines(GH_LOGS); b.iter(|| { let json_field = dynamic_schema.get_field("json").unwrap(); - let index = Index::create_in_ram(dynamic_schema.clone()); + let mut index = Index::create_in_ram(dynamic_schema.clone()); + index.set_fast_field_tokenizers(ff_tokenizer_manager.clone()); + let index_writer: IndexWriter = index.writer_with_num_threads(1, 100_000_000).unwrap(); + for doc_json in &lines { + let json_val: serde_json::Map = + serde_json::from_str(doc_json).unwrap(); + let doc = tantivy::doc!(json_field=>json_val); + index_writer.add_document(doc).unwrap(); + } + }) + }); + group.bench_function("index-gh-fast", |b| { + let lines = get_lines(GH_LOGS); + b.iter(|| { + let json_field = dynamic_schema_fast.get_field("json").unwrap(); + let mut index = Index::create_in_ram(dynamic_schema_fast.clone()); + index.set_fast_field_tokenizers(ff_tokenizer_manager.clone()); let index_writer: IndexWriter = index.writer_with_num_threads(1, 100_000_000).unwrap(); for doc_json in &lines { let json_val: serde_json::Map = @@ -125,11 +184,13 @@ pub fn gh_index_benchmark(c: &mut Criterion) { } }) }); + group.bench_function("index-gh-with-commit", |b| { let lines = get_lines(GH_LOGS); b.iter(|| { let json_field = dynamic_schema.get_field("json").unwrap(); - let index = Index::create_in_ram(dynamic_schema.clone()); + let mut index = Index::create_in_ram(dynamic_schema.clone()); + index.set_fast_field_tokenizers(ff_tokenizer_manager.clone()); let mut index_writer: IndexWriter = index.writer_with_num_threads(1, 100_000_000).unwrap(); for doc_json in &lines { diff --git a/columnar/src/columnar/writer/column_writers.rs b/columnar/src/columnar/writer/column_writers.rs index 82c20db7db..e56cf2e9f2 100644 --- a/columnar/src/columnar/writer/column_writers.rs +++ b/columnar/src/columnar/writer/column_writers.rs @@ -269,7 +269,8 @@ impl StrOrBytesColumnWriter { dictionaries: &mut [DictionaryBuilder], arena: &mut MemoryArena, ) { - let unordered_id = dictionaries[self.dictionary_id as usize].get_or_allocate_id(bytes); + let unordered_id = + dictionaries[self.dictionary_id as usize].get_or_allocate_id(bytes, arena); self.column_writer.record(doc, unordered_id, arena); } diff --git a/columnar/src/columnar/writer/mod.rs b/columnar/src/columnar/writer/mod.rs index 94e7c8473a..53f0088c8a 100644 --- a/columnar/src/columnar/writer/mod.rs +++ b/columnar/src/columnar/writer/mod.rs @@ -437,6 +437,7 @@ impl ColumnarWriter { &mut symbol_byte_buffer, ), buffers, + &self.arena, &mut column_serializer, )?; column_serializer.finalize()?; @@ -490,6 +491,7 @@ impl ColumnarWriter { // Serialize [Dictionary, Column, dictionary num bytes U32::LE] // Column: [Column Index, Column Values, column index num bytes U32::LE] +#[allow(clippy::too_many_arguments)] fn serialize_bytes_or_str_column( cardinality: Cardinality, num_docs: RowId, @@ -497,6 +499,7 @@ fn serialize_bytes_or_str_column( dictionary_builder: &DictionaryBuilder, operation_it: impl Iterator>, buffers: &mut SpareBuffers, + arena: &MemoryArena, wrt: impl io::Write, ) -> io::Result<()> { let SpareBuffers { @@ -505,7 +508,8 @@ fn serialize_bytes_or_str_column( .. } = buffers; let mut counting_writer = CountingWriter::wrap(wrt); - let term_id_mapping: TermIdMapping = dictionary_builder.serialize(&mut counting_writer)?; + let term_id_mapping: TermIdMapping = + dictionary_builder.serialize(arena, &mut counting_writer)?; let dictionary_num_bytes: u32 = counting_writer.written_bytes() as u32; let mut wrt = counting_writer.finish(); let operation_iterator = operation_it.map(|symbol: ColumnOperation| { diff --git a/columnar/src/dictionary.rs b/columnar/src/dictionary.rs index cdf10f357b..2ccce36f29 100644 --- a/columnar/src/dictionary.rs +++ b/columnar/src/dictionary.rs @@ -1,7 +1,7 @@ use std::io; -use fnv::FnvHashMap; use sstable::SSTable; +use stacker::{MemoryArena, SharedArenaHashMap}; pub(crate) struct TermIdMapping { unordered_to_ord: Vec, @@ -31,29 +31,38 @@ pub struct OrderedId(pub u32); /// mapping. #[derive(Default)] pub(crate) struct DictionaryBuilder { - dict: FnvHashMap, UnorderedId>, - memory_consumption: usize, + dict: SharedArenaHashMap, } impl DictionaryBuilder { /// Get or allocate an unordered id. /// (This ID is simply an auto-incremented id.) - pub fn get_or_allocate_id(&mut self, term: &[u8]) -> UnorderedId { - if let Some(term_id) = self.dict.get(term) { - return *term_id; - } - let new_id = UnorderedId(self.dict.len() as u32); - self.dict.insert(term.to_vec(), new_id); - self.memory_consumption += term.len(); - self.memory_consumption += 40; // Term Metadata + HashMap overhead - new_id + pub fn get_or_allocate_id(&mut self, term: &[u8], arena: &mut MemoryArena) -> UnorderedId { + let next_id = self.dict.len() as u32; + let unordered_id = self + .dict + .mutate_or_create(term, arena, |unordered_id: Option| { + if let Some(unordered_id) = unordered_id { + unordered_id + } else { + next_id + } + }); + UnorderedId(unordered_id) } /// Serialize the dictionary into an fst, and returns the /// `UnorderedId -> TermOrdinal` map. - pub fn serialize<'a, W: io::Write + 'a>(&self, wrt: &mut W) -> io::Result { - let mut terms: Vec<(&[u8], UnorderedId)> = - self.dict.iter().map(|(k, v)| (k.as_slice(), *v)).collect(); + pub fn serialize<'a, W: io::Write + 'a>( + &self, + arena: &MemoryArena, + wrt: &mut W, + ) -> io::Result { + let mut terms: Vec<(&[u8], UnorderedId)> = self + .dict + .iter(arena) + .map(|(k, v)| (k, arena.read(v))) + .collect(); terms.sort_unstable_by_key(|(key, _)| *key); // TODO Remove the allocation. let mut unordered_to_ord: Vec = vec![OrderedId(0u32); terms.len()]; @@ -68,7 +77,7 @@ impl DictionaryBuilder { } pub(crate) fn mem_usage(&self) -> usize { - self.memory_consumption + self.dict.mem_usage() } } @@ -78,12 +87,13 @@ mod tests { #[test] fn test_dictionary_builder() { + let mut arena = MemoryArena::default(); let mut dictionary_builder = DictionaryBuilder::default(); - let hello_uid = dictionary_builder.get_or_allocate_id(b"hello"); - let happy_uid = dictionary_builder.get_or_allocate_id(b"happy"); - let tax_uid = dictionary_builder.get_or_allocate_id(b"tax"); + let hello_uid = dictionary_builder.get_or_allocate_id(b"hello", &mut arena); + let happy_uid = dictionary_builder.get_or_allocate_id(b"happy", &mut arena); + let tax_uid = dictionary_builder.get_or_allocate_id(b"tax", &mut arena); let mut buffer = Vec::new(); - let id_mapping = dictionary_builder.serialize(&mut buffer).unwrap(); + let id_mapping = dictionary_builder.serialize(&arena, &mut buffer).unwrap(); assert_eq!(id_mapping.to_ord(hello_uid), OrderedId(1)); assert_eq!(id_mapping.to_ord(happy_uid), OrderedId(0)); assert_eq!(id_mapping.to_ord(tax_uid), OrderedId(2)); diff --git a/stacker/src/arena_hashmap.rs b/stacker/src/arena_hashmap.rs index 09999d0210..9f3dfbbc50 100644 --- a/stacker/src/arena_hashmap.rs +++ b/stacker/src/arena_hashmap.rs @@ -1,51 +1,5 @@ -use std::iter::{Cloned, Filter}; -use std::mem; - use super::{Addr, MemoryArena}; -use crate::fastcpy::fast_short_slice_copy; -use crate::memory_arena::store; - -/// Returns the actual memory size in bytes -/// required to create a table with a given capacity. -/// required to create a table of size -pub fn compute_table_memory_size(capacity: usize) -> usize { - capacity * mem::size_of::() -} - -#[cfg(not(feature = "compare_hash_only"))] -type HashType = u32; - -#[cfg(feature = "compare_hash_only")] -type HashType = u64; - -/// `KeyValue` is the item stored in the hash table. -/// The key is actually a `BytesRef` object stored in an external memory arena. -/// The `value_addr` also points to an address in the memory arena. -#[derive(Copy, Clone)] -struct KeyValue { - pub(crate) key_value_addr: Addr, - hash: HashType, -} - -impl Default for KeyValue { - fn default() -> Self { - KeyValue { - key_value_addr: Addr::null_pointer(), - hash: 0, - } - } -} - -impl KeyValue { - #[inline] - fn is_empty(&self) -> bool { - self.key_value_addr.is_null() - } - #[inline] - fn is_not_empty_ref(&self) -> bool { - !self.key_value_addr.is_null() - } -} +use crate::shared_arena_hashmap::SharedArenaHashMap; /// Customized `HashMap` with `&[u8]` keys /// @@ -56,61 +10,13 @@ impl KeyValue { /// The quirky API has the benefit of avoiding /// the computation of the hash of the key twice, /// or copying the key as long as there is no insert. +/// +/// ArenaHashMap is like SharedArenaHashMap but takes ownership +/// of the memory arena. The memory arena stores the serialized +/// keys and values. pub struct ArenaHashMap { - table: Vec, + shared_arena_hashmap: SharedArenaHashMap, pub memory_arena: MemoryArena, - mask: usize, - len: usize, -} - -struct LinearProbing { - pos: usize, - mask: usize, -} - -impl LinearProbing { - #[inline] - fn compute(hash: HashType, mask: usize) -> LinearProbing { - LinearProbing { - pos: hash as usize, - mask, - } - } - - #[inline] - fn next_probe(&mut self) -> usize { - // Not saving the masked version removes a dependency. - self.pos = self.pos.wrapping_add(1); - self.pos & self.mask - } -} - -type IterNonEmpty<'a> = Filter>, fn(&KeyValue) -> bool>; - -pub struct Iter<'a> { - hashmap: &'a ArenaHashMap, - inner: IterNonEmpty<'a>, -} - -impl<'a> Iterator for Iter<'a> { - type Item = (&'a [u8], Addr); - - fn next(&mut self) -> Option { - self.inner.next().map(move |kv| { - let (key, offset): (&'a [u8], Addr) = self.hashmap.get_key_value(kv.key_value_addr); - (key, offset) - }) - } -} - -/// Returns the greatest power of two lower or equal to `n`. -/// Except if n == 0, in that case, return 1. -/// -/// # Panics if n == 0 -fn compute_previous_power_of_two(n: usize) -> usize { - assert!(n > 0); - let msb = (63u32 - (n as u64).leading_zeros()) as u8; - 1 << msb } impl Default for ArenaHashMap { @@ -121,156 +27,44 @@ impl Default for ArenaHashMap { impl ArenaHashMap { pub fn with_capacity(table_size: usize) -> ArenaHashMap { - let table_size_power_of_2 = compute_previous_power_of_two(table_size); let memory_arena = MemoryArena::default(); - let table = vec![KeyValue::default(); table_size_power_of_2]; ArenaHashMap { - table, + shared_arena_hashmap: SharedArenaHashMap::with_capacity(table_size), memory_arena, - mask: table_size_power_of_2 - 1, - len: 0, } } - #[inline] - #[cfg(not(feature = "compare_hash_only"))] - fn get_hash(&self, key: &[u8]) -> HashType { - murmurhash32::murmurhash2(key) - } - - #[inline] - #[cfg(feature = "compare_hash_only")] - fn get_hash(&self, key: &[u8]) -> HashType { - /// Since we compare only the hash we need a high quality hash. - use std::hash::Hasher; - let mut hasher = ahash::AHasher::default(); - hasher.write(key); - hasher.finish() as HashType - } - #[inline] pub fn read(&self, addr: Addr) -> Item { self.memory_arena.read(addr) } - #[inline] - fn probe(&self, hash: HashType) -> LinearProbing { - LinearProbing::compute(hash, self.mask) - } - #[inline] pub fn mem_usage(&self) -> usize { - self.table.len() * mem::size_of::() + self.memory_arena.mem_usage() - } - - #[inline] - fn is_saturated(&self) -> bool { - self.table.len() <= self.len * 2 - } - - #[inline] - fn get_key_value(&self, addr: Addr) -> (&[u8], Addr) { - let data = self.memory_arena.slice_from(addr); - let key_bytes_len_bytes = unsafe { data.get_unchecked(..2) }; - let key_bytes_len = u16::from_le_bytes(key_bytes_len_bytes.try_into().unwrap()); - let key_bytes: &[u8] = unsafe { data.get_unchecked(2..2 + key_bytes_len as usize) }; - (key_bytes, addr.offset(2 + key_bytes_len as u32)) - } - - #[inline] - #[cfg(not(feature = "compare_hash_only"))] - fn get_value_addr_if_key_match(&self, target_key: &[u8], addr: Addr) -> Option { - use crate::fastcmp::fast_short_slice_compare; - - let (stored_key, value_addr) = self.get_key_value(addr); - if fast_short_slice_compare(stored_key, target_key) { - Some(value_addr) - } else { - None - } - } - #[inline] - #[cfg(feature = "compare_hash_only")] - fn get_value_addr_if_key_match(&self, _target_key: &[u8], addr: Addr) -> Option { - // For the compare_hash_only feature, it would make sense to store the keys at a different - // memory location. Here they will just pollute the cache. - let data = self.memory_arena.slice_from(addr); - let key_bytes_len_bytes = &data[..2]; - let key_bytes_len = u16::from_le_bytes(key_bytes_len_bytes.try_into().unwrap()); - let value_addr = addr.offset(2 + key_bytes_len as u32); - - Some(value_addr) - } - - #[inline] - fn set_bucket(&mut self, hash: HashType, key_value_addr: Addr, bucket: usize) { - self.len += 1; - - self.table[bucket] = KeyValue { - key_value_addr, - hash, - }; + self.shared_arena_hashmap.mem_usage() + self.memory_arena.mem_usage() } #[inline] pub fn is_empty(&self) -> bool { - self.len() == 0 + self.shared_arena_hashmap.is_empty() } #[inline] pub fn len(&self) -> usize { - self.len + self.shared_arena_hashmap.len() } #[inline] - pub fn iter(&self) -> Iter<'_> { - Iter { - inner: self - .table - .iter() - .cloned() - .filter(KeyValue::is_not_empty_ref), - hashmap: self, - } - } - - fn resize(&mut self) { - let new_len = (self.table.len() * 2).max(1 << 13); - let mask = new_len - 1; - self.mask = mask; - let new_table = vec![KeyValue::default(); new_len]; - let old_table = mem::replace(&mut self.table, new_table); - for key_value in old_table.into_iter().filter(KeyValue::is_not_empty_ref) { - let mut probe = LinearProbing::compute(key_value.hash, mask); - loop { - let bucket = probe.next_probe(); - if self.table[bucket].is_empty() { - self.table[bucket] = key_value; - break; - } - } - } + pub fn iter(&self) -> impl Iterator { + self.shared_arena_hashmap.iter(&self.memory_arena) } /// Get a value associated to a key. #[inline] pub fn get(&self, key: &[u8]) -> Option where V: Copy + 'static { - let hash = self.get_hash(key); - let mut probe = self.probe(hash); - loop { - let bucket = probe.next_probe(); - let kv: KeyValue = self.table[bucket]; - if kv.is_empty() { - return None; - } else if kv.hash == hash { - if let Some(val_addr) = self.get_value_addr_if_key_match(key, kv.key_value_addr) { - let v = self.memory_arena.read(val_addr); - return Some(v); - } - } - } + self.shared_arena_hashmap.get(key, &self.memory_arena) } /// `update` create a new entry for a given key if it does not exist @@ -284,45 +78,10 @@ impl ArenaHashMap { /// If the key already as an associated value, then it will be passed /// `Some(previous_value)`. #[inline] - pub fn mutate_or_create(&mut self, key: &[u8], mut updater: impl FnMut(Option) -> V) + pub fn mutate_or_create(&mut self, key: &[u8], updater: impl FnMut(Option) -> V) where V: Copy + 'static { - if self.is_saturated() { - self.resize(); - } - let hash = self.get_hash(key); - let mut probe = self.probe(hash); - let mut bucket = probe.next_probe(); - let mut kv: KeyValue = self.table[bucket]; - loop { - if kv.is_empty() { - // The key does not exist yet. - let val = updater(None); - let num_bytes = std::mem::size_of::() + key.len() + std::mem::size_of::(); - let key_addr = self.memory_arena.allocate_space(num_bytes); - { - let data = self.memory_arena.slice_mut(key_addr, num_bytes); - let key_len_bytes: [u8; 2] = (key.len() as u16).to_le_bytes(); - data[..2].copy_from_slice(&key_len_bytes); - let stop = 2 + key.len(); - fast_short_slice_copy(key, &mut data[2..stop]); - store(&mut data[stop..], val); - } - - self.set_bucket(hash, key_addr, bucket); - return; - } - if kv.hash == hash { - if let Some(val_addr) = self.get_value_addr_if_key_match(key, kv.key_value_addr) { - let v = self.memory_arena.read(val_addr); - let new_v = updater(Some(v)); - self.memory_arena.write_at(val_addr, new_v); - return; - } - } - // This allows fetching the next bucket before the loop jmp - bucket = probe.next_probe(); - kv = self.table[bucket]; - } + self.shared_arena_hashmap + .mutate_or_create(key, &mut self.memory_arena, updater); } } @@ -331,7 +90,7 @@ mod tests { use std::collections::HashMap; - use super::{compute_previous_power_of_two, ArenaHashMap}; + use super::ArenaHashMap; #[test] fn test_hash_map() { @@ -362,14 +121,6 @@ mod tests { assert_eq!(hash_map.get::(b"abc"), None); } - #[test] - fn test_compute_previous_power_of_two() { - assert_eq!(compute_previous_power_of_two(8), 8); - assert_eq!(compute_previous_power_of_two(9), 8); - assert_eq!(compute_previous_power_of_two(7), 4); - assert_eq!(compute_previous_power_of_two(u64::MAX as usize), 1 << 63); - } - #[test] fn test_many_terms() { let mut terms: Vec = (0..20_000).map(|val| val.to_string()).collect(); diff --git a/stacker/src/lib.rs b/stacker/src/lib.rs index 04ec3f4146..64945c2485 100644 --- a/stacker/src/lib.rs +++ b/stacker/src/lib.rs @@ -9,10 +9,12 @@ mod expull; mod fastcmp; mod fastcpy; mod memory_arena; +mod shared_arena_hashmap; -pub use self::arena_hashmap::{compute_table_memory_size, ArenaHashMap}; +pub use self::arena_hashmap::ArenaHashMap; pub use self::expull::ExpUnrolledLinkedList; pub use self::memory_arena::{Addr, MemoryArena}; +pub use self::shared_arena_hashmap::{compute_table_memory_size, SharedArenaHashMap}; /// When adding an element in a `ArenaHashMap`, we get a unique id associated to the given key. pub type UnorderedId = u32; diff --git a/stacker/src/shared_arena_hashmap.rs b/stacker/src/shared_arena_hashmap.rs new file mode 100644 index 0000000000..0dbae3dfdf --- /dev/null +++ b/stacker/src/shared_arena_hashmap.rs @@ -0,0 +1,420 @@ +use std::iter::{Cloned, Filter}; +use std::mem; + +use super::{Addr, MemoryArena}; +use crate::fastcpy::fast_short_slice_copy; +use crate::memory_arena::store; + +/// Returns the actual memory size in bytes +/// required to create a table with a given capacity. +/// required to create a table of size +pub fn compute_table_memory_size(capacity: usize) -> usize { + capacity * mem::size_of::() +} + +#[cfg(not(feature = "compare_hash_only"))] +type HashType = u32; + +#[cfg(feature = "compare_hash_only")] +type HashType = u64; + +/// `KeyValue` is the item stored in the hash table. +/// The key is actually a `BytesRef` object stored in an external memory arena. +/// The `value_addr` also points to an address in the memory arena. +#[derive(Copy, Clone)] +struct KeyValue { + key_value_addr: Addr, + hash: HashType, +} + +impl Default for KeyValue { + fn default() -> Self { + KeyValue { + key_value_addr: Addr::null_pointer(), + hash: 0, + } + } +} + +impl KeyValue { + #[inline] + fn is_empty(&self) -> bool { + self.key_value_addr.is_null() + } + #[inline] + fn is_not_empty_ref(&self) -> bool { + !self.key_value_addr.is_null() + } +} + +/// Customized `HashMap` with `&[u8]` keys +/// +/// Its main particularity is that rather than storing its +/// keys in the heap, keys are stored in a memory arena +/// inline with the values. +/// +/// The quirky API has the benefit of avoiding +/// the computation of the hash of the key twice, +/// or copying the key as long as there is no insert. +/// +/// SharedArenaHashMap is like ArenaHashMap but gets the memory arena +/// passed as an argument to the methods. +/// So one MemoryArena can be shared with multiple SharedArenaHashMap. +pub struct SharedArenaHashMap { + table: Vec, + mask: usize, + len: usize, +} + +struct LinearProbing { + pos: usize, + mask: usize, +} + +impl LinearProbing { + #[inline] + fn compute(hash: HashType, mask: usize) -> LinearProbing { + LinearProbing { + pos: hash as usize, + mask, + } + } + + #[inline] + fn next_probe(&mut self) -> usize { + // Not saving the masked version removes a dependency. + self.pos = self.pos.wrapping_add(1); + self.pos & self.mask + } +} + +type IterNonEmpty<'a> = Filter>, fn(&KeyValue) -> bool>; + +pub struct Iter<'a> { + hashmap: &'a SharedArenaHashMap, + memory_arena: &'a MemoryArena, + inner: IterNonEmpty<'a>, +} + +impl<'a> Iterator for Iter<'a> { + type Item = (&'a [u8], Addr); + + fn next(&mut self) -> Option { + self.inner.next().map(move |kv| { + let (key, offset): (&'a [u8], Addr) = self + .hashmap + .get_key_value(kv.key_value_addr, self.memory_arena); + (key, offset) + }) + } +} + +/// Returns the greatest power of two lower or equal to `n`. +/// Except if n == 0, in that case, return 1. +/// +/// # Panics if n == 0 +fn compute_previous_power_of_two(n: usize) -> usize { + assert!(n > 0); + let msb = (63u32 - (n as u64).leading_zeros()) as u8; + 1 << msb +} + +impl Default for SharedArenaHashMap { + fn default() -> Self { + SharedArenaHashMap::with_capacity(4) + } +} + +impl SharedArenaHashMap { + pub fn with_capacity(table_size: usize) -> SharedArenaHashMap { + let table_size_power_of_2 = compute_previous_power_of_two(table_size); + let table = vec![KeyValue::default(); table_size_power_of_2]; + + SharedArenaHashMap { + table, + mask: table_size_power_of_2 - 1, + len: 0, + } + } + + #[inline] + #[cfg(not(feature = "compare_hash_only"))] + fn get_hash(&self, key: &[u8]) -> HashType { + murmurhash32::murmurhash2(key) + } + + #[inline] + #[cfg(feature = "compare_hash_only")] + fn get_hash(&self, key: &[u8]) -> HashType { + /// Since we compare only the hash we need a high quality hash. + use std::hash::Hasher; + let mut hasher = ahash::AHasher::default(); + hasher.write(key); + hasher.finish() as HashType + } + + #[inline] + fn probe(&self, hash: HashType) -> LinearProbing { + LinearProbing::compute(hash, self.mask) + } + + #[inline] + pub fn mem_usage(&self) -> usize { + self.table.len() * mem::size_of::() + } + + #[inline] + fn is_saturated(&self) -> bool { + self.table.len() <= self.len * 2 + } + + #[inline] + fn get_key_value<'a>(&'a self, addr: Addr, memory_arena: &'a MemoryArena) -> (&[u8], Addr) { + let data = memory_arena.slice_from(addr); + let key_bytes_len_bytes = unsafe { data.get_unchecked(..2) }; + let key_bytes_len = u16::from_le_bytes(key_bytes_len_bytes.try_into().unwrap()); + let key_bytes: &[u8] = unsafe { data.get_unchecked(2..2 + key_bytes_len as usize) }; + (key_bytes, addr.offset(2 + key_bytes_len as u32)) + } + + #[inline] + #[cfg(not(feature = "compare_hash_only"))] + fn get_value_addr_if_key_match( + &self, + target_key: &[u8], + addr: Addr, + memory_arena: &MemoryArena, + ) -> Option { + use crate::fastcmp::fast_short_slice_compare; + + let (stored_key, value_addr) = self.get_key_value(addr, memory_arena); + if fast_short_slice_compare(stored_key, target_key) { + Some(value_addr) + } else { + None + } + } + #[inline] + #[cfg(feature = "compare_hash_only")] + fn get_value_addr_if_key_match( + &self, + _target_key: &[u8], + addr: Addr, + memory_arena: &MemoryArena, + ) -> Option { + // For the compare_hash_only feature, it would make sense to store the keys at a different + // memory location. Here they will just pollute the cache. + let data = memory_arena.slice_from(addr); + let key_bytes_len_bytes = &data[..2]; + let key_bytes_len = u16::from_le_bytes(key_bytes_len_bytes.try_into().unwrap()); + let value_addr = addr.offset(2 + key_bytes_len as u32); + + Some(value_addr) + } + + #[inline] + fn set_bucket(&mut self, hash: HashType, key_value_addr: Addr, bucket: usize) { + self.len += 1; + + self.table[bucket] = KeyValue { + key_value_addr, + hash, + }; + } + + #[inline] + pub fn is_empty(&self) -> bool { + self.len() == 0 + } + + #[inline] + pub fn len(&self) -> usize { + self.len + } + + #[inline] + pub fn iter<'a>(&'a self, memory_arena: &'a MemoryArena) -> Iter<'_> { + Iter { + inner: self + .table + .iter() + .cloned() + .filter(KeyValue::is_not_empty_ref), + hashmap: self, + memory_arena, + } + } + + fn resize(&mut self) { + let new_len = (self.table.len() * 2).max(1 << 3); + let mask = new_len - 1; + self.mask = mask; + let new_table = vec![KeyValue::default(); new_len]; + let old_table = mem::replace(&mut self.table, new_table); + for key_value in old_table.into_iter().filter(KeyValue::is_not_empty_ref) { + let mut probe = LinearProbing::compute(key_value.hash, mask); + loop { + let bucket = probe.next_probe(); + if self.table[bucket].is_empty() { + self.table[bucket] = key_value; + break; + } + } + } + } + + /// Get a value associated to a key. + #[inline] + pub fn get(&self, key: &[u8], memory_arena: &MemoryArena) -> Option + where V: Copy + 'static { + let hash = self.get_hash(key); + let mut probe = self.probe(hash); + loop { + let bucket = probe.next_probe(); + let kv: KeyValue = self.table[bucket]; + if kv.is_empty() { + return None; + } else if kv.hash == hash { + if let Some(val_addr) = + self.get_value_addr_if_key_match(key, kv.key_value_addr, memory_arena) + { + let v = memory_arena.read(val_addr); + return Some(v); + } + } + } + } + + /// `update` create a new entry for a given key if it does not exist + /// or updates the existing entry. + /// + /// The actual logic for this update is define in the `updater` + /// argument. + /// + /// If the key is not present, `updater` will receive `None` and + /// will be in charge of returning a default value. + /// If the key already as an associated value, then it will be passed + /// `Some(previous_value)`. + #[inline] + pub fn mutate_or_create( + &mut self, + key: &[u8], + memory_arena: &mut MemoryArena, + mut updater: impl FnMut(Option) -> V, + ) -> V + where + V: Copy + 'static, + { + if self.is_saturated() { + self.resize(); + } + let hash = self.get_hash(key); + let mut probe = self.probe(hash); + let mut bucket = probe.next_probe(); + let mut kv: KeyValue = self.table[bucket]; + loop { + if kv.is_empty() { + // The key does not exist yet. + let val = updater(None); + let num_bytes = std::mem::size_of::() + key.len() + std::mem::size_of::(); + let key_addr = memory_arena.allocate_space(num_bytes); + { + let data = memory_arena.slice_mut(key_addr, num_bytes); + let key_len_bytes: [u8; 2] = (key.len() as u16).to_le_bytes(); + data[..2].copy_from_slice(&key_len_bytes); + let stop = 2 + key.len(); + fast_short_slice_copy(key, &mut data[2..stop]); + store(&mut data[stop..], val); + } + + self.set_bucket(hash, key_addr, bucket); + return val; + } + if kv.hash == hash { + if let Some(val_addr) = + self.get_value_addr_if_key_match(key, kv.key_value_addr, memory_arena) + { + let v = memory_arena.read(val_addr); + let new_v = updater(Some(v)); + memory_arena.write_at(val_addr, new_v); + return new_v; + } + } + // This allows fetching the next bucket before the loop jmp + bucket = probe.next_probe(); + kv = self.table[bucket]; + } + } +} + +#[cfg(test)] +mod tests { + + use std::collections::HashMap; + + use super::{compute_previous_power_of_two, SharedArenaHashMap}; + use crate::MemoryArena; + + #[test] + fn test_hash_map() { + let mut memory_arena = MemoryArena::default(); + let mut hash_map: SharedArenaHashMap = SharedArenaHashMap::default(); + hash_map.mutate_or_create(b"abc", &mut memory_arena, |opt_val: Option| { + assert_eq!(opt_val, None); + 3u32 + }); + hash_map.mutate_or_create(b"abcd", &mut memory_arena, |opt_val: Option| { + assert_eq!(opt_val, None); + 4u32 + }); + hash_map.mutate_or_create(b"abc", &mut memory_arena, |opt_val: Option| { + assert_eq!(opt_val, Some(3u32)); + 5u32 + }); + let mut vanilla_hash_map = HashMap::new(); + let iter_values = hash_map.iter(&memory_arena); + for (key, addr) in iter_values { + let val: u32 = memory_arena.read(addr); + vanilla_hash_map.insert(key.to_owned(), val); + } + assert_eq!(vanilla_hash_map.len(), 2); + } + #[test] + fn test_empty_hashmap() { + let memory_arena = MemoryArena::default(); + let hash_map: SharedArenaHashMap = SharedArenaHashMap::default(); + assert_eq!(hash_map.get::(b"abc", &memory_arena), None); + } + + #[test] + fn test_compute_previous_power_of_two() { + assert_eq!(compute_previous_power_of_two(8), 8); + assert_eq!(compute_previous_power_of_two(9), 8); + assert_eq!(compute_previous_power_of_two(7), 4); + assert_eq!(compute_previous_power_of_two(u64::MAX as usize), 1 << 63); + } + + #[test] + fn test_many_terms() { + let mut memory_arena = MemoryArena::default(); + let mut terms: Vec = (0..20_000).map(|val| val.to_string()).collect(); + let mut hash_map: SharedArenaHashMap = SharedArenaHashMap::default(); + for term in terms.iter() { + hash_map.mutate_or_create( + term.as_bytes(), + &mut memory_arena, + |_opt_val: Option| 5u32, + ); + } + let mut terms_back: Vec = hash_map + .iter(&memory_arena) + .map(|(bytes, _)| String::from_utf8(bytes.to_vec()).unwrap()) + .collect(); + terms_back.sort(); + terms.sort(); + + for pos in 0..terms.len() { + assert_eq!(terms[pos], terms_back[pos]); + } + } +}