From 59b25e7b47ef7d4ce294a4d2d02f7e6cb3f276a1 Mon Sep 17 00:00:00 2001 From: trinity-1686a Date: Wed, 2 Oct 2024 11:29:41 +0200 Subject: [PATCH] make cache kind configureable --- quickwit/Cargo.lock | 57 ++++++ quickwit/Cargo.toml | 1 + quickwit/quickwit-config/src/lib.rs | 4 +- .../quickwit-config/src/node_config/mod.rs | 14 ++ .../src/node_config/serialize.rs | 4 + quickwit/quickwit-search/src/leaf_cache.rs | 9 +- .../quickwit-search/src/list_fields_cache.rs | 7 +- quickwit/quickwit-search/src/service.rs | 18 +- quickwit/quickwit-storage/Cargo.toml | 1 + .../src/cache/memory_sized_cache.rs | 172 ++++++++---------- quickwit/quickwit-storage/src/cache/mod.rs | 1 - .../src/cache/quickwit_cache.rs | 6 +- .../quickwit-storage/src/cache/stored_item.rs | 52 ------ 13 files changed, 183 insertions(+), 163 deletions(-) delete mode 100644 quickwit/quickwit-storage/src/cache/stored_item.rs diff --git a/quickwit/Cargo.lock b/quickwit/Cargo.lock index 3ce5526bd7a..1a2d254fae8 100644 --- a/quickwit/Cargo.lock +++ b/quickwit/Cargo.lock @@ -4090,6 +4090,26 @@ dependencies = [ "syn 1.0.109", ] +[[package]] +name = "moka" +version = "0.12.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "32cf62eb4dd975d2dde76432fb1075c49e3ee2331cf36f1f8fd4b66550d32b6f" +dependencies = [ + "crossbeam-channel", + "crossbeam-epoch", + "crossbeam-utils", + "once_cell", + "parking_lot", + "quanta", + "rustc_version", + "smallvec", + "tagptr", + "thiserror", + "triomphe", + "uuid", +] + [[package]] name = "mrecordlog" version = "0.4.0" @@ -5580,6 +5600,21 @@ dependencies = [ "zstd 0.11.2+zstd.1.5.2", ] +[[package]] +name = "quanta" +version = "0.12.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8e5167a477619228a0b284fac2674e3c388cba90631d7b7de620e6f1fcd08da5" +dependencies = [ + "crossbeam-utils", + "libc", + "once_cell", + "raw-cpuid", + "wasi 0.11.0+wasi-snapshot-preview1", + "web-sys", + "winapi 0.3.9", +] + [[package]] name = "query_map" version = "0.7.0" @@ -6501,6 +6536,7 @@ dependencies = [ "lru", "md5", "mockall", + "moka", "once_cell", "opendal", "pin-project", @@ -6656,6 +6692,15 @@ dependencies = [ "rand_core 0.6.4", ] +[[package]] +name = "raw-cpuid" +version = "11.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cb9ee317cfe3fbd54b36a511efc1edd42e216903c9cd575e686dd68a2ba90d8d" +dependencies = [ + "bitflags 2.6.0", +] + [[package]] name = "rayon" version = "1.10.0" @@ -8173,6 +8218,12 @@ dependencies = [ "syn 1.0.109", ] +[[package]] +name = "tagptr" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7b2093cf4c8eb1e67749a6762251bc9cd836b6fc171623bd0a9d324d37af2417" + [[package]] name = "tantivy" version = "0.23.0" @@ -8878,6 +8929,12 @@ dependencies = [ "tracing-serde", ] +[[package]] +name = "triomphe" +version = "0.1.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "859eb650cfee7434994602c3a68b25d77ad9e68c8a6cd491616ef86661382eb3" + [[package]] name = "try-lock" version = "0.2.5" diff --git a/quickwit/Cargo.toml b/quickwit/Cargo.toml index c64877e524f..cb22d7cbd6f 100644 --- a/quickwit/Cargo.toml +++ b/quickwit/Cargo.toml @@ -152,6 +152,7 @@ matches = "0.1.9" md5 = "0.7" mime_guess = "2.0.4" mockall = "0.11" +moka = { version = "0.12.8", features = ["sync"] } mrecordlog = { git = "https://github.com/quickwit-oss/mrecordlog", rev = "306c0a7" } new_string_template = "1.5.1" nom = "7.1.3" diff --git a/quickwit/quickwit-config/src/lib.rs b/quickwit/quickwit-config/src/lib.rs index 5e256793fcd..89dd5f7c7bb 100644 --- a/quickwit/quickwit-config/src/lib.rs +++ b/quickwit/quickwit-config/src/lib.rs @@ -74,8 +74,8 @@ pub use crate::metastore_config::{ MetastoreBackend, MetastoreConfig, MetastoreConfigs, PostgresMetastoreConfig, }; pub use crate::node_config::{ - IndexerConfig, IngestApiConfig, JaegerConfig, NodeConfig, SearcherConfig, SplitCacheLimits, - DEFAULT_QW_CONFIG_PATH, + CacheKind, IndexerConfig, IngestApiConfig, JaegerConfig, NodeConfig, SearcherConfig, + SplitCacheLimits, DEFAULT_QW_CONFIG_PATH, }; use crate::source_config::serialize::{SourceConfigV0_7, SourceConfigV0_8, VersionedSourceConfig}; pub use crate::storage_config::{ diff --git a/quickwit/quickwit-config/src/node_config/mod.rs b/quickwit/quickwit-config/src/node_config/mod.rs index 569dd94c55a..c337e5cdd1b 100644 --- a/quickwit/quickwit-config/src/node_config/mod.rs +++ b/quickwit/quickwit-config/src/node_config/mod.rs @@ -206,14 +206,25 @@ impl SplitCacheLimits { } } +#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize, Default)] +pub enum CacheKind { + // we make this the default to keep old behavior, it's tbd if lfu is a definitive improvement + #[default] + Lru, + Lfu, +} + #[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)] #[serde(deny_unknown_fields, default)] pub struct SearcherConfig { pub aggregation_memory_limit: ByteSize, pub aggregation_bucket_limit: u32, pub fast_field_cache_capacity: ByteSize, + pub fast_field_cache_kind: CacheKind, pub split_footer_cache_capacity: ByteSize, + pub split_footer_cache_kind: CacheKind, pub partial_request_cache_capacity: ByteSize, + pub partial_request_cache_kind: CacheKind, pub max_num_concurrent_split_searches: usize, pub max_num_concurrent_split_streams: usize, // Strangely, if None, this will also have the effect of not forwarding @@ -229,8 +240,11 @@ impl Default for SearcherConfig { fn default() -> Self { Self { fast_field_cache_capacity: ByteSize::gb(1), + fast_field_cache_kind: CacheKind::default(), split_footer_cache_capacity: ByteSize::mb(500), + split_footer_cache_kind: CacheKind::default(), partial_request_cache_capacity: ByteSize::mb(64), + partial_request_cache_kind: CacheKind::default(), max_num_concurrent_split_streams: 100, max_num_concurrent_split_searches: 100, aggregation_memory_limit: ByteSize::mb(500), diff --git a/quickwit/quickwit-config/src/node_config/serialize.rs b/quickwit/quickwit-config/src/node_config/serialize.rs index 806a7abd520..f1838e31dbe 100644 --- a/quickwit/quickwit-config/src/node_config/serialize.rs +++ b/quickwit/quickwit-config/src/node_config/serialize.rs @@ -479,6 +479,7 @@ mod tests { use super::*; use crate::storage_config::StorageBackendFlavor; + use crate::CacheKind; fn get_config_filepath(config_filename: &str) -> String { format!( @@ -606,8 +607,11 @@ mod tests { aggregation_memory_limit: ByteSize::gb(1), aggregation_bucket_limit: 500_000, fast_field_cache_capacity: ByteSize::gb(10), + fast_field_cache_kind: CacheKind::Lru, split_footer_cache_capacity: ByteSize::gb(1), + split_footer_cache_kind: CacheKind::Lru, partial_request_cache_capacity: ByteSize::mb(64), + partial_request_cache_kind: CacheKind::Lru, max_num_concurrent_split_searches: 150, max_num_concurrent_split_streams: 120, split_cache: None, diff --git a/quickwit/quickwit-search/src/leaf_cache.rs b/quickwit/quickwit-search/src/leaf_cache.rs index 491f66f3aee..491d852da2b 100644 --- a/quickwit/quickwit-search/src/leaf_cache.rs +++ b/quickwit/quickwit-search/src/leaf_cache.rs @@ -20,6 +20,7 @@ use std::ops::Bound; use prost::Message; +use quickwit_config::CacheKind; use quickwit_proto::search::{ CountHits, LeafSearchResponse, SearchRequest, SplitIdAndFooterOffsets, }; @@ -47,11 +48,12 @@ pub struct LeafSearchCache { // queries which vary only by search_after. impl LeafSearchCache { - pub fn new(capacity: usize) -> LeafSearchCache { + pub fn new(capacity: usize, cache_kind: CacheKind) -> LeafSearchCache { LeafSearchCache { content: MemorySizedCache::with_capacity_in_bytes( capacity, &quickwit_storage::STORAGE_METRICS.partial_request_cache, + cache_kind, ), } } @@ -191,6 +193,7 @@ impl std::ops::RangeBounds for Range { #[cfg(test)] mod tests { + use quickwit_config::CacheKind; use quickwit_proto::search::{ LeafSearchResponse, PartialHit, SearchRequest, SortValue, SplitIdAndFooterOffsets, }; @@ -199,7 +202,7 @@ mod tests { #[test] fn test_leaf_search_cache_no_timestamp() { - let cache = LeafSearchCache::new(64_000_000); + let cache = LeafSearchCache::new(64_000_000, CacheKind::default()); let split_1 = SplitIdAndFooterOffsets { split_id: "split_1".to_string(), @@ -264,7 +267,7 @@ mod tests { #[test] fn test_leaf_search_cache_timestamp() { - let cache = LeafSearchCache::new(64_000_000); + let cache = LeafSearchCache::new(64_000_000, CacheKind::default()); let split_1 = SplitIdAndFooterOffsets { split_id: "split_1".to_string(), diff --git a/quickwit/quickwit-search/src/list_fields_cache.rs b/quickwit/quickwit-search/src/list_fields_cache.rs index df60258db25..c566aff1250 100644 --- a/quickwit/quickwit-search/src/list_fields_cache.rs +++ b/quickwit/quickwit-search/src/list_fields_cache.rs @@ -17,6 +17,7 @@ // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see . +use quickwit_config::CacheKind; use quickwit_proto::search::{ deserialize_split_fields, serialize_split_fields, ListFields, SplitIdAndFooterOffsets, }; @@ -31,11 +32,12 @@ pub struct ListFieldsCache { // TODO For now this simply caches the whole ListFieldsEntryResponse. We could // be more clever and cache aggregates instead. impl ListFieldsCache { - pub fn new(capacity: usize) -> ListFieldsCache { + pub fn new(capacity: usize, cache_kind: CacheKind) -> ListFieldsCache { ListFieldsCache { content: MemorySizedCache::with_capacity_in_bytes( capacity, &quickwit_storage::STORAGE_METRICS.partial_request_cache, + cache_kind, ), } } @@ -71,6 +73,7 @@ impl CacheKey { #[cfg(test)] mod tests { + use quickwit_config::CacheKind; use quickwit_proto::search::{ ListFieldType, ListFields, ListFieldsEntryResponse, SplitIdAndFooterOffsets, }; @@ -79,7 +82,7 @@ mod tests { #[test] fn test_list_fields_cache() { - let cache = ListFieldsCache::new(64_000_000); + let cache = ListFieldsCache::new(64_000_000, CacheKind::default()); let split_1 = SplitIdAndFooterOffsets { split_id: "split_1".to_string(), diff --git a/quickwit/quickwit-search/src/service.rs b/quickwit/quickwit-search/src/service.rs index 65516a99e76..1c2f74e0c0e 100644 --- a/quickwit/quickwit-search/src/service.rs +++ b/quickwit/quickwit-search/src/service.rs @@ -490,6 +490,7 @@ impl SearcherContext { let global_split_footer_cache = MemorySizedCache::with_capacity_in_bytes( capacity_in_bytes, &quickwit_storage::STORAGE_METRICS.split_footer_cache, + searcher_config.split_footer_cache_kind.clone(), ); let leaf_search_split_semaphore = Arc::new(Semaphore::new( searcher_config.max_num_concurrent_split_searches, @@ -497,11 +498,18 @@ impl SearcherContext { let split_stream_semaphore = Semaphore::new(searcher_config.max_num_concurrent_split_streams); let fast_field_cache_capacity = searcher_config.fast_field_cache_capacity.as_u64() as usize; - let storage_long_term_cache = Arc::new(QuickwitCache::new(fast_field_cache_capacity)); - let leaf_search_cache = - LeafSearchCache::new(searcher_config.partial_request_cache_capacity.as_u64() as usize); - let list_fields_cache = - ListFieldsCache::new(searcher_config.partial_request_cache_capacity.as_u64() as usize); + let storage_long_term_cache = Arc::new(QuickwitCache::new( + fast_field_cache_capacity, + searcher_config.fast_field_cache_kind.clone(), + )); + let leaf_search_cache = LeafSearchCache::new( + searcher_config.partial_request_cache_capacity.as_u64() as usize, + searcher_config.partial_request_cache_kind.clone(), + ); + let list_fields_cache = ListFieldsCache::new( + searcher_config.partial_request_cache_capacity.as_u64() as usize, + searcher_config.partial_request_cache_kind.clone(), + ); let aggregation_limit = AggregationLimitsGuard::new( Some(searcher_config.aggregation_memory_limit.as_u64()), Some(searcher_config.aggregation_bucket_limit), diff --git a/quickwit/quickwit-storage/Cargo.toml b/quickwit/quickwit-storage/Cargo.toml index c883c61c265..8fde7902b84 100644 --- a/quickwit/quickwit-storage/Cargo.toml +++ b/quickwit/quickwit-storage/Cargo.toml @@ -22,6 +22,7 @@ hyper = { workspace = true } lru = { workspace = true } md5 = { workspace = true } mockall = { workspace = true, optional = true } +moka = { workspace = true } once_cell = { workspace = true } pin-project = { workspace = true } rand = { workspace = true } diff --git a/quickwit/quickwit-storage/src/cache/memory_sized_cache.rs b/quickwit/quickwit-storage/src/cache/memory_sized_cache.rs index 119e0b4cbec..bc2421acbcd 100644 --- a/quickwit/quickwit-storage/src/cache/memory_sized_cache.rs +++ b/quickwit/quickwit-storage/src/cache/memory_sized_cache.rs @@ -22,33 +22,15 @@ use std::hash::Hash; use std::ops::Range; use std::path::{Path, PathBuf}; use std::sync::Mutex; -use std::time::Duration; -use lru::LruCache; -use tokio::time::Instant; -use tracing::{error, warn}; +use moka::sync::Cache; +use quickwit_config::CacheKind; +use tracing::warn; use crate::cache::slice_address::{SliceAddress, SliceAddressKey, SliceAddressRef}; -use crate::cache::stored_item::StoredItem; use crate::metrics::CacheMetrics; use crate::OwnedBytes; -/// We do not evict anything that has been accessed in the last 60s. -/// -/// The goal is to behave better on scan access patterns, without being as aggressive as -/// using a MRU strategy. -/// -/// TLDR is: -/// -/// If two items have been access in the last 60s it is not really worth considering the -/// latter too be more recent than the previous and do an eviction. -/// The difference is not significant enough to raise the probability of its future access. -/// -/// On the other hand, for very large queries involving enough data to saturate the cache, -/// we are facing a scanning pattern. If variations of this query is repeated over and over -/// a regular LRU eviction policy would yield a hit rate of 0. -const MIN_TIME_SINCE_LAST_ACCESS: Duration = Duration::from_secs(60); - #[derive(Clone, Copy, Debug, PartialEq)] enum Capacity { Unlimited, @@ -64,64 +46,71 @@ impl Capacity { } } -struct NeedMutMemorySizedCache { - lru_cache: LruCache, - num_items: usize, - num_bytes: u64, +// TODO this actually doesn't need mut access, we should remove the mutext and related complexity +// eventually +struct NeedMutMemorySizedCache { + cache: Cache, capacity: Capacity, cache_counters: &'static CacheMetrics, } -impl Drop for NeedMutMemorySizedCache { +impl Drop for NeedMutMemorySizedCache { fn drop(&mut self) { + self.cache.run_pending_tasks(); self.cache_counters .in_cache_count - .sub(self.num_items as i64); + .sub(self.cache.entry_count() as i64); self.cache_counters .in_cache_num_bytes - .sub(self.num_bytes as i64); + .sub(self.cache.weighted_size() as i64); } } -impl NeedMutMemorySizedCache { +impl NeedMutMemorySizedCache { /// Creates a new NeedMutSliceCache with the given capacity. - fn with_capacity(capacity: Capacity, cache_counters: &'static CacheMetrics) -> Self { + fn with_capacity( + capacity: Capacity, + cache_counters: &'static CacheMetrics, + cache_kind: CacheKind, + ) -> Self { + let mut cache_builder = Cache::::builder() + .eviction_policy(match cache_kind { + CacheKind::Lfu => moka::policy::EvictionPolicy::tiny_lfu(), + CacheKind::Lru => moka::policy::EvictionPolicy::lru(), + }) + .weigher(|_k, v| v.len().try_into().unwrap_or(u32::MAX)) + .eviction_listener(|_k, v, _cause| { + cache_counters.in_cache_count.dec(); + cache_counters.in_cache_num_bytes.sub(v.len() as i64); + }); + cache_builder = match capacity { + Capacity::InBytes(capacity) if capacity > 0 => { + cache_builder.max_capacity(capacity as u64) + } + _ => cache_builder, + }; NeedMutMemorySizedCache { - // The limit will be decided by the amount of memory in the cache, - // not the number of items in the cache. - // Enforcing this limit is done in the `NeedMutCache` impl. - lru_cache: LruCache::unbounded(), - num_items: 0, - num_bytes: 0, + cache: cache_builder.build(), capacity, cache_counters, } } - pub fn record_item(&mut self, num_bytes: u64) { - self.num_items += 1; - self.num_bytes += num_bytes; + pub fn record_item(&self, num_bytes: u64) { self.cache_counters.in_cache_count.inc(); self.cache_counters.in_cache_num_bytes.add(num_bytes as i64); } - pub fn drop_item(&mut self, num_bytes: u64) { - self.num_items -= 1; - self.num_bytes -= num_bytes; - self.cache_counters.in_cache_count.dec(); - self.cache_counters.in_cache_num_bytes.sub(num_bytes as i64); - } - - pub fn get(&mut self, cache_key: &Q) -> Option + pub fn get(&self, cache_key: &Q) -> Option where K: Borrow, Q: Hash + Eq + ?Sized, { - let item_opt = self.lru_cache.get_mut(cache_key); + let item_opt = self.cache.get(cache_key); if let Some(item) = item_opt { self.cache_counters.hits_num_items.inc(); self.cache_counters.hits_num_bytes.inc_by(item.len() as u64); - Some(item.payload()) + Some(item) } else { self.cache_counters.misses_num_items.inc(); None @@ -131,9 +120,11 @@ impl NeedMutMemorySizedCache { /// Attempt to put the given amount of data in the cache. /// This may fail silently if the owned_bytes slice is larger than the cache /// capacity. - fn put(&mut self, key: K, bytes: OwnedBytes) { + fn put(&self, key: K, bytes: OwnedBytes) { if self.capacity.exceeds_capacity(bytes.len()) { // The value does not fit in the cache. We simply don't store it. + // We do this so we can log, as the underlying cache should already reject this value + // anyway if self.capacity != Capacity::InBytes(0) { warn!( capacity_in_bytes = ?self.capacity, @@ -143,56 +134,29 @@ impl NeedMutMemorySizedCache { } return; } - if let Some(previous_data) = self.lru_cache.pop(&key) { - self.drop_item(previous_data.len() as u64); - } - let now = Instant::now(); - while self - .capacity - .exceeds_capacity(self.num_bytes as usize + bytes.len()) - { - if let Some((_, candidate_for_eviction)) = self.lru_cache.peek_lru() { - let time_since_last_access = - now.duration_since(candidate_for_eviction.last_access_time()); - if time_since_last_access < MIN_TIME_SINCE_LAST_ACCESS { - // It is not worth doing an eviction. - // TODO: It is sub-optimal that we might have needlessly evicted items in this - // loop before just returning. - return; - } - } - if let Some((_, bytes)) = self.lru_cache.pop_lru() { - self.drop_item(bytes.len() as u64); - } else { - error!( - "Logical error. Even after removing all of the items in the cache the \ - capacity is insufficient. This case is guarded against and should never \ - happen." - ); - return; - } - } self.record_item(bytes.len() as u64); - self.lru_cache.put(key, StoredItem::new(bytes, now)); + self.cache.insert(key, bytes); } } /// A simple in-resident memory slice cache. -pub struct MemorySizedCache { +pub struct MemorySizedCache { inner: Mutex>, } -impl MemorySizedCache { +impl MemorySizedCache { /// Creates an slice cache with the given capacity. pub fn with_capacity_in_bytes( capacity_in_bytes: usize, cache_counters: &'static CacheMetrics, + cache_kind: CacheKind, ) -> Self { MemorySizedCache { inner: Mutex::new(NeedMutMemorySizedCache::with_capacity( Capacity::InBytes(capacity_in_bytes), cache_counters, + cache_kind, )), } } @@ -203,6 +167,7 @@ impl MemorySizedCache { inner: Mutex::new(NeedMutMemorySizedCache::with_capacity( Capacity::Unlimited, cache_counters, + CacheKind::Lru, // this doesn't matter on unbounded cache )), } } @@ -222,6 +187,11 @@ impl MemorySizedCache { pub fn put(&self, val: K, bytes: OwnedBytes) { self.inner.lock().unwrap().put(val, bytes); } + + #[cfg(test)] + fn force_sync(&self) { + self.inner.lock().unwrap().cache.run_pending_tasks(); + } } impl MemorySizedCache { @@ -249,7 +219,11 @@ mod tests { #[tokio::test] async fn test_cache_edge_condition() { tokio::time::pause(); - let cache = MemorySizedCache::::with_capacity_in_bytes(5, &CACHE_METRICS_FOR_TESTS); + let cache = MemorySizedCache::::with_capacity_in_bytes( + 5, + &CACHE_METRICS_FOR_TESTS, + CacheKind::Lfu, + ); { let data = OwnedBytes::new(&b"abc"[..]); cache.put("3".to_string(), data); @@ -265,27 +239,27 @@ mod tests { { let data = OwnedBytes::new(&b"fghij"[..]); cache.put("5".to_string(), data); - // Eviction should not happen, because all items in cache are too young. + cache.force_sync(); + // based on previous acces patterns, this key isn't as good as the others, so it's not + // even inserted assert!(cache.get(&"5".to_string()).is_none()); + // our two first entries should have been removed from the cache + assert!(cache.get(&"2".to_string()).is_some()); + assert!(cache.get(&"3".to_string()).is_some()); } - tokio::time::advance(super::MIN_TIME_SINCE_LAST_ACCESS.mul_f32(1.1f32)).await; { + for _ in 0..5 { + assert!(cache.get(&"5".to_string()).is_none()); + } let data = OwnedBytes::new(&b"fghij"[..]); cache.put("5".to_string(), data); - assert_eq!(cache.get(&"5".to_string()).unwrap(), &b"fghij"[..]); - // our two first entries should have be removed from the cache + cache.force_sync(); + // now that it has some popularity, it should be preferred over the other keys + assert!(cache.get(&"5".to_string()).is_some()); + // our two first entries should have been removed from the cache assert!(cache.get(&"2".to_string()).is_none()); assert!(cache.get(&"3".to_string()).is_none()); } - tokio::time::advance(super::MIN_TIME_SINCE_LAST_ACCESS.mul_f32(1.1f32)).await; - { - let data = OwnedBytes::new(&b"klmnop"[..]); - cache.put("6".to_string(), data); - // The entry put should have been dismissed as it is too large for the cache - assert!(cache.get(&"6".to_string()).is_none()); - // The previous entry should however be remaining. - assert_eq!(cache.get(&"5".to_string()).unwrap(), &b"fghij"[..]); - } } #[test] @@ -306,7 +280,11 @@ mod tests { #[test] fn test_cache() { - let cache = MemorySizedCache::with_capacity_in_bytes(10_000, &CACHE_METRICS_FOR_TESTS); + let cache = MemorySizedCache::with_capacity_in_bytes( + 10_000, + &CACHE_METRICS_FOR_TESTS, + CacheKind::default(), + ); assert!(cache.get(&"hello.seg").is_none()); let data = OwnedBytes::new(&b"werwer"[..]); cache.put("hello.seg", data); diff --git a/quickwit/quickwit-storage/src/cache/mod.rs b/quickwit/quickwit-storage/src/cache/mod.rs index d77ecd51a5c..ab9640136ef 100644 --- a/quickwit/quickwit-storage/src/cache/mod.rs +++ b/quickwit/quickwit-storage/src/cache/mod.rs @@ -22,7 +22,6 @@ mod memory_sized_cache; mod quickwit_cache; mod slice_address; mod storage_with_cache; -mod stored_item; use std::ops::Range; use std::path::{Path, PathBuf}; diff --git a/quickwit/quickwit-storage/src/cache/quickwit_cache.rs b/quickwit/quickwit-storage/src/cache/quickwit_cache.rs index 8c7029dc746..cb9d4ea11b6 100644 --- a/quickwit/quickwit-storage/src/cache/quickwit_cache.rs +++ b/quickwit/quickwit-storage/src/cache/quickwit_cache.rs @@ -22,6 +22,7 @@ use std::path::{Path, PathBuf}; use std::sync::Arc; use async_trait::async_trait; +use quickwit_config::CacheKind; use crate::cache::{MemorySizedCache, StorageCache}; use crate::metrics::CacheMetrics; @@ -44,7 +45,7 @@ impl From)>> for QuickwitCache { impl QuickwitCache { /// Creates a [`QuickwitCache`] with a cache on fast fields /// with a capacity of `fast_field_cache_capacity`. - pub fn new(fast_field_cache_capacity: usize) -> Self { + pub fn new(fast_field_cache_capacity: usize, cache_kind: CacheKind) -> Self { let mut quickwit_cache = QuickwitCache::empty(); let fast_field_cache_counters: &'static CacheMetrics = &crate::STORAGE_METRICS.fast_field_cache; @@ -53,6 +54,7 @@ impl QuickwitCache { Arc::new(SimpleCache::with_capacity_in_bytes( fast_field_cache_capacity, fast_field_cache_counters, + cache_kind, )), ); quickwit_cache @@ -125,11 +127,13 @@ impl SimpleCache { fn with_capacity_in_bytes( capacity_in_bytes: usize, cache_counters: &'static CacheMetrics, + cache_kind: CacheKind, ) -> Self { SimpleCache { slice_cache: MemorySizedCache::with_capacity_in_bytes( capacity_in_bytes, cache_counters, + cache_kind, ), } } diff --git a/quickwit/quickwit-storage/src/cache/stored_item.rs b/quickwit/quickwit-storage/src/cache/stored_item.rs deleted file mode 100644 index 480c2cbebbf..00000000000 --- a/quickwit/quickwit-storage/src/cache/stored_item.rs +++ /dev/null @@ -1,52 +0,0 @@ -// Copyright (C) 2024 Quickwit, Inc. -// -// Quickwit is offered under the AGPL v3.0 and as commercial software. -// For commercial licensing, contact us at hello@quickwit.io. -// -// AGPL: -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU Affero General Public License as -// published by the Free Software Foundation, either version 3 of the -// License, or (at your option) any later version. -// -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Affero General Public License for more details. -// -// You should have received a copy of the GNU Affero General Public License -// along with this program. If not, see . - -use tantivy::directory::OwnedBytes; -use tokio::time::Instant; - -/// It is a bit overkill to put this in its own module, but I -/// wanted to ensure that no one would access payload without updating `last_access_time`. -pub(super) struct StoredItem { - last_access_time: Instant, - payload: OwnedBytes, -} - -impl StoredItem { - pub fn new(payload: OwnedBytes, now: Instant) -> Self { - StoredItem { - last_access_time: now, - payload, - } - } -} - -impl StoredItem { - pub fn payload(&mut self) -> OwnedBytes { - self.last_access_time = Instant::now(); - self.payload.clone() - } - - pub fn len(&self) -> usize { - self.payload.len() - } - - pub fn last_access_time(&self) -> Instant { - self.last_access_time - } -}