From fd135361f650182de7034a69387f6642f036c95d Mon Sep 17 00:00:00 2001 From: Remi Dettai Date: Wed, 4 Dec 2024 16:32:29 +0100 Subject: [PATCH] Limit permit memory size with split size --- .../quickwit-directories/src/hot_directory.rs | 35 +--- quickwit/quickwit-search/src/fetch_docs.rs | 2 +- quickwit/quickwit-search/src/leaf.rs | 194 +++++++++++++++--- quickwit/quickwit-search/src/list_terms.rs | 9 +- .../src/search_permit_provider.rs | 178 +++++++++++----- .../quickwit-search/src/search_stream/leaf.rs | 2 +- 6 files changed, 306 insertions(+), 114 deletions(-) diff --git a/quickwit/quickwit-directories/src/hot_directory.rs b/quickwit/quickwit-directories/src/hot_directory.rs index d217ac29851..abb0f6c30bb 100644 --- a/quickwit/quickwit-directories/src/hot_directory.rs +++ b/quickwit/quickwit-directories/src/hot_directory.rs @@ -204,18 +204,6 @@ impl StaticDirectoryCache { pub fn get_file_length(&self, path: &Path) -> Option { self.file_lengths.get(path).copied() } - - /// return the files and their cached lengths - pub fn get_stats(&self) -> Vec<(PathBuf, usize)> { - let mut entries = self - .slices - .iter() - .map(|(path, cache)| (path.to_owned(), cache.len())) - .collect::>(); - - entries.sort_by_key(|el| el.0.to_owned()); - entries - } } /// A SliceCache is a static toring @@ -265,10 +253,6 @@ impl StaticSliceCache { } None } - - pub fn len(&self) -> usize { - self.bytes.len() - } } struct StaticSliceCacheBuilder { @@ -376,12 +360,14 @@ impl HotDirectory { }), }) } - /// Get files and their cached sizes. - pub fn get_stats_per_file( - hot_cache_bytes: OwnedBytes, - ) -> anyhow::Result> { - let static_cache = StaticDirectoryCache::open(hot_cache_bytes)?; - Ok(static_cache.get_stats()) + /// Get all the directory files and their sizes. 
+ pub fn get_file_sizes(&self) -> Vec<(PathBuf, u64)> { + self.inner + .cache + .file_lengths + .iter() + .map(|(k, v)| (k.clone(), *v)) + .collect() } } @@ -704,11 +690,6 @@ mod tests { assert_eq!(directory_cache.get_file_length(three_path), Some(300)); assert_eq!(directory_cache.get_file_length(four_path), None); - let stats = directory_cache.get_stats(); - assert_eq!(stats[0], (one_path.to_owned(), 8)); - assert_eq!(stats[1], (three_path.to_owned(), 0)); - assert_eq!(stats[2], (two_path.to_owned(), 7)); - assert_eq!( directory_cache .get_slice(one_path) diff --git a/quickwit/quickwit-search/src/fetch_docs.rs b/quickwit/quickwit-search/src/fetch_docs.rs index e8cf5c3a1fb..d75f7efff0c 100644 --- a/quickwit/quickwit-search/src/fetch_docs.rs +++ b/quickwit/quickwit-search/src/fetch_docs.rs @@ -174,7 +174,7 @@ async fn fetch_docs_in_split( global_doc_addrs.sort_by_key(|doc| doc.doc_addr); // Opens the index without the ephemeral unbounded cache, this cache is indeed not useful // when fetching docs as we will fetch them only once. - let mut index = open_index_with_caches( + let (mut index, _) = open_index_with_caches( &searcher_context, index_storage, split, diff --git a/quickwit/quickwit-search/src/leaf.rs b/quickwit/quickwit-search/src/leaf.rs index ac3adb98760..71dd52a0acc 100644 --- a/quickwit/quickwit-search/src/leaf.rs +++ b/quickwit/quickwit-search/src/leaf.rs @@ -126,11 +126,29 @@ pub(crate) async fn open_split_bundle( Ok((hotcache_bytes, bundle_storage)) } +/// Add a storage proxy to retry `get_slice` requests if they are taking too long, +/// if configured in the searcher config. +/// +/// The goal here is to ensure a low latency. 
+fn configure_storage_retries( + searcher_context: &SearcherContext, + index_storage: Arc, +) -> Arc { + if let Some(storage_timeout_policy) = &searcher_context.searcher_config.storage_timeout_policy { + Arc::new(TimeoutAndRetryStorage::new( + index_storage, + storage_timeout_policy.clone(), + )) + } else { + index_storage + } +} + /// Opens a `tantivy::Index` for the given split with several cache layers: /// - A split footer cache given by `SearcherContext.split_footer_cache`. /// - A fast fields cache given by `SearcherContext.storage_long_term_cache`. -/// - An ephemeral unbounded cache directory whose lifetime is tied to the -/// returned `Index`. +/// - An ephemeral unbounded cache directory (whose lifetime is tied to the +/// returned `Index` if no `ByteRangeCache` is provided). #[instrument(skip_all, fields(split_footer_start=split_and_footer_offsets.split_footer_start, split_footer_end=split_and_footer_offsets.split_footer_end))] pub(crate) async fn open_index_with_caches( searcher_context: &SearcherContext, @@ -138,22 +156,9 @@ pub(crate) async fn open_index_with_caches( split_and_footer_offsets: &SplitIdAndFooterOffsets, tokenizer_manager: Option<&TokenizerManager>, ephemeral_unbounded_cache: Option, -) -> anyhow::Result { - // Let's add a storage proxy to retry `get_slice` requests if they are taking too long, - // if configured in the searcher config. - // - // The goal here is too ensure a low latency. 
- - let index_storage_with_retry_on_timeout = if let Some(storage_timeout_policy) = - &searcher_context.searcher_config.storage_timeout_policy - { - Arc::new(TimeoutAndRetryStorage::new( - index_storage, - storage_timeout_policy.clone(), - )) - } else { - index_storage - }; +) -> anyhow::Result<(Index, HotDirectory)> { + let index_storage_with_retry_on_timeout = + configure_storage_retries(searcher_context, index_storage); let (hotcache_bytes, bundle_storage) = open_split_bundle( searcher_context, @@ -176,7 +181,7 @@ pub(crate) async fn open_index_with_caches( HotDirectory::open(directory, hotcache_bytes.read_bytes()?)? }; - let mut index = Index::open(hot_directory)?; + let mut index = Index::open(hot_directory.clone())?; if let Some(tokenizer_manager) = tokenizer_manager { index.set_tokenizers(tokenizer_manager.tantivy_manager().clone()); } @@ -185,7 +190,7 @@ pub(crate) async fn open_index_with_caches( .tantivy_manager() .clone(), ); - Ok(index) + Ok((index, hot_directory)) } /// Tantivy search does not make it possible to fetch data asynchronously during @@ -370,6 +375,17 @@ fn get_leaf_resp_from_count(count: u64) -> LeafSearchResponse { } } +/// Compute the size of the index, store excluded. +fn compute_index_size(hot_directory: &HotDirectory) -> ByteSize { + let size_bytes = hot_directory + .get_file_sizes() + .iter() + .filter(|(path, _)| !path.to_string_lossy().ends_with("store")) + .map(|(_, size)| *size) + .sum(); + ByteSize(size_bytes) +} + /// Apply a leaf search on a single split. 
#[allow(clippy::too_many_arguments)] async fn leaf_search_single_split( @@ -408,7 +424,7 @@ async fn leaf_search_single_split( let split_id = split.split_id.to_string(); let byte_range_cache = ByteRangeCache::with_infinite_capacity(&quickwit_storage::STORAGE_METRICS.shortlived_cache); - let index = open_index_with_caches( + let (index, hot_directory) = open_index_with_caches( searcher_context, storage, &split, @@ -416,7 +432,11 @@ async fn leaf_search_single_split( Some(byte_range_cache.clone()), ) .await?; - let split_schema = index.schema(); + + let index_size = compute_index_size(&hot_directory); + if index_size < search_permit.memory_allocation() { + search_permit.update_memory_usage(index_size); + } let reader = index .reader_builder() @@ -427,6 +447,7 @@ async fn leaf_search_single_split( let mut collector = make_collector_for_split(split_id.clone(), &search_request, aggregations_limits)?; + let split_schema = index.schema(); let (query, mut warmup_info) = doc_mapper.query(split_schema.clone(), &query_ast, false)?; let collector_warmup_info = collector.warmup_info(); @@ -437,9 +458,20 @@ async fn leaf_search_single_split( warmup(&searcher, &warmup_info).await?; let warmup_end = Instant::now(); let warmup_duration: Duration = warmup_end.duration_since(warmup_start); - let warmup_size = byte_range_cache.get_num_bytes(); - search_permit.warmup_completed(ByteSize(warmup_size)); - let short_lived_cache_num_bytes = byte_range_cache.get_num_bytes(); + let warmup_size = ByteSize(byte_range_cache.get_num_bytes()); + if warmup_size > search_permit.memory_allocation() { + warn!( + memory_usage = ?warmup_size, + memory_allocation = ?search_permit.memory_allocation(), + "current leaf search is consuming more memory than the initial allocation" + ); + } + crate::SEARCH_METRICS + .leaf_search_single_split_warmup_num_bytes + .observe(warmup_size.as_u64() as f64); + search_permit.update_memory_usage(warmup_size); + search_permit.free_warmup_slot(); + let split_num_docs = 
split.num_docs; let span = info_span!("tantivy_search"); @@ -458,7 +490,7 @@ async fn leaf_search_single_split( collector.update_search_param(&search_request); let mut leaf_search_response: LeafSearchResponse = if is_metadata_count_request_with_ast(&query_ast, &search_request) { - get_leaf_resp_from_count(searcher.num_docs() as u64) + get_leaf_resp_from_count(searcher.num_docs()) } else if collector.is_count_only() { let count = query.count(&searcher)? as u64; get_leaf_resp_from_count(count) @@ -467,7 +499,7 @@ async fn leaf_search_single_split( }; leaf_search_response.resource_stats = Some(ResourceStats { cpu_microsecs: cpu_start.elapsed().as_micros() as u64, - short_lived_cache_num_bytes, + short_lived_cache_num_bytes: warmup_size.as_u64(), split_num_docs, warmup_microsecs: warmup_duration.as_micros() as u64, cpu_thread_pool_wait_microsecs: cpu_thread_pool_wait_microsecs.as_micros() @@ -1285,7 +1317,11 @@ pub async fn leaf_search( // do no interleave with other leaf search requests. let permit_futures = searcher_context .search_permit_provider - .get_permits(split_with_req.len()) + .get_permits( + split_with_req + .iter() + .map(|(split, _)| ByteSize(split.split_footer_start)), + ) .await; for ((split, mut request), permit_fut) in @@ -1441,6 +1477,15 @@ async fn leaf_search_single_split_wrapper( mod tests { use std::ops::Bound; + use bytes::BufMut; + use quickwit_directories::write_hotcache; + use rand::{thread_rng, Rng}; + use tantivy::directory::RamDirectory; + use tantivy::schema::{ + BytesOptions, FieldEntry, Schema, TextFieldIndexing, TextOptions, Value, + }; + use tantivy::TantivyDocument; + use super::*; fn bool_filter(ast: impl Into) -> QueryAst { @@ -1876,4 +1921,97 @@ mod tests { assert_eq!(rewrote_bounds_agg, no_bounds_agg); } } + + fn create_tantivy_dir_with_hotcache<'a, V>( + field_entry: FieldEntry, + field_value: V, + ) -> (HotDirectory, usize) + where + V: Value<'a>, + { + let field_name = field_entry.name().to_string(); + let mut schema_builder = 
Schema::builder(); + schema_builder.add_field(field_entry); + let schema = schema_builder.build(); + + let ram_directory = RamDirectory::create(); + let index = Index::open_or_create(ram_directory.clone(), schema.clone()).unwrap(); + + let mut index_writer = index.writer(15_000_000).unwrap(); + let field = schema.get_field(&field_name).unwrap(); + let mut new_doc = TantivyDocument::default(); + new_doc.add_field_value(field, field_value); + index_writer.add_document(new_doc).unwrap(); + index_writer.commit().unwrap(); + + let mut hotcache_bytes_writer = Vec::new().writer(); + write_hotcache(ram_directory.clone(), &mut hotcache_bytes_writer).unwrap(); + let hotcache_bytes = OwnedBytes::new(hotcache_bytes_writer.into_inner()); + let hot_directory = HotDirectory::open(ram_directory.clone(), hotcache_bytes).unwrap(); + (hot_directory, ram_directory.total_mem_usage()) + } + + #[test] + fn test_compute_index_size_without_store() { + // We don't want to make assertions on absolute index sizes (it might + // change in future Tantivy versions), but rather verify that the store + // is properly excluded from the computed size. 
+ + // We use random bytes so that the store can't compress them + let mut payload = vec![0u8; 1024]; + thread_rng().fill(&mut payload[..]); + + let (hotcache_directory_stored_payload, directory_size_stored_payload) = + create_tantivy_dir_with_hotcache( + FieldEntry::new_bytes("payload".to_string(), BytesOptions::default().set_stored()), + &payload, + ); + let size_with_stored_payload = + compute_index_size(&hotcache_directory_stored_payload).as_u64(); + + let (hotcache_directory_index_only, directory_size_index_only) = + create_tantivy_dir_with_hotcache( + FieldEntry::new_bytes("payload".to_string(), BytesOptions::default()), + &payload, + ); + let size_index_only = compute_index_size(&hotcache_directory_index_only).as_u64(); + + assert!(directory_size_stored_payload > directory_size_index_only + 1000); + assert!(size_with_stored_payload.abs_diff(size_index_only) < 10); + } + + #[test] + fn test_compute_index_size_varies_with_data() { + // We don't want to make assertions on absolute index sizes (it might + // change in future Tantivy versions), but rather verify that an index + // with more data is indeed bigger. + + let indexing_options = + TextOptions::default().set_indexing_options(TextFieldIndexing::default()); + + let (hotcache_directory_larger, directory_size_larger) = create_tantivy_dir_with_hotcache( + FieldEntry::new_text("text".to_string(), indexing_options.clone()), + "Sed ut perspiciatis unde omnis iste natus error sit voluptatem accusantium \ + doloremque laudantium, totam rem aperiam, eaque ipsa quae ab illo inventore \ + veritatis et quasi architecto beatae vitae dicta sunt explicabo. Nemo enim ipsam \ + voluptatem quia voluptas sit aspernatur aut odit aut fugit, sed quia consequuntur \ + magni dolores eos qui ratione voluptatem sequi nesciunt. 
Neque porro quisquam est, \ + qui dolorem ipsum quia dolor sit amet, consectetur, adipisci velit, sed quia non \ + numquam eius modi tempora incidunt ut labore et dolore magnam aliquam quaerat \ + voluptatem. Ut enim ad minima veniam, quis nostrum exercitationem ullam corporis \ + suscipit laboriosam, nisi ut aliquid ex ea commodi consequatur? Quis autem vel eum \ + iure reprehenderit qui in ea voluptate velit esse quam nihil molestiae consequatur, \ + vel illum qui dolorem eum fugiat quo voluptas nulla pariatur?", + ); + let larger_size = compute_index_size(&hotcache_directory_larger).as_u64(); + + let (hotcache_directory_smaller, directory_size_smaller) = create_tantivy_dir_with_hotcache( + FieldEntry::new_text("text".to_string(), indexing_options), + "hi", + ); + let smaller_size = compute_index_size(&hotcache_directory_smaller).as_u64(); + + assert!(directory_size_larger > directory_size_smaller + 100); + assert!(larger_size > smaller_size + 100); + } } diff --git a/quickwit/quickwit-search/src/list_terms.rs b/quickwit/quickwit-search/src/list_terms.rs index f241a162303..3b204273b4a 100644 --- a/quickwit/quickwit-search/src/list_terms.rs +++ b/quickwit/quickwit-search/src/list_terms.rs @@ -22,6 +22,7 @@ use std::ops::Bound; use std::sync::Arc; use anyhow::Context; +use bytesize::ByteSize; use futures::future::try_join_all; use itertools::{Either, Itertools}; use quickwit_common::pretty::PrettySample; @@ -218,7 +219,7 @@ async fn leaf_list_terms_single_split( ) -> crate::Result { let cache = ByteRangeCache::with_infinite_capacity(&quickwit_storage::STORAGE_METRICS.shortlived_cache); - let index = + let (index, _) = open_index_with_caches(searcher_context, storage, &split, None, Some(cache)).await?; let split_schema = index.schema(); let reader = index @@ -330,7 +331,11 @@ pub async fn leaf_list_terms( info!(split_offsets = ?PrettySample::new(splits, 5)); let permits = searcher_context .search_permit_provider - .get_permits(splits.len()) + .get_permits( + splits 
+ .iter() + .map(|split| ByteSize(split.split_footer_start)), + ) .await; let leaf_search_single_split_futures: Vec<_> = splits .iter() diff --git a/quickwit/quickwit-search/src/search_permit_provider.rs b/quickwit/quickwit-search/src/search_permit_provider.rs index bc22c1a8336..913dc73046b 100644 --- a/quickwit/quickwit-search/src/search_permit_provider.rs +++ b/quickwit/quickwit-search/src/search_permit_provider.rs @@ -27,7 +27,6 @@ use quickwit_common::metrics::GaugeGuard; #[cfg(test)] use tokio::sync::watch; use tokio::sync::{mpsc, oneshot}; -use tracing::warn; /// Distributor of permits to perform split search operation. /// @@ -41,20 +40,22 @@ pub struct SearchPermitProvider { message_sender: mpsc::UnboundedSender, #[cfg(test)] actor_stopped: watch::Receiver, + per_permit_initial_memory_allocation: u64, } #[derive(Debug)] pub enum SearchPermitMessage { Request { permit_sender: oneshot::Sender>, - num_permits: usize, + permit_sizes: Vec, }, - WarmupCompleted { + UpdateMemory { memory_delta: i64, }, + FreeWarmupSlot, Drop { memory_size: u64, - warmup_permit_held: bool, + warmup_slot_freed: bool, }, } @@ -74,7 +75,6 @@ impl SearchPermitProvider { total_memory_budget: memory_budget.as_u64(), permits_requests: VecDeque::new(), total_memory_allocated: 0u64, - per_permit_initial_memory_allocation: initial_allocation.as_u64(), #[cfg(test)] stopped: state_sender, }; @@ -83,21 +83,31 @@ impl SearchPermitProvider { message_sender, #[cfg(test)] actor_stopped: state_receiver, + per_permit_initial_memory_allocation: initial_allocation.as_u64(), } } - /// Returns `num_permits` futures that complete once enough resources are - /// available. + /// Returns one permit future for each provided split size. /// /// The permits returned are guaranteed to be resolved in order. In /// addition, the permits are guaranteed to be resolved before permits /// returned by subsequent calls to this function. 
- pub async fn get_permits(&self, num_permits: usize) -> Vec { + /// + /// The permit memory size is capped by per_permit_initial_memory_allocation. + pub async fn get_permits( + &self, + split_sizes: impl IntoIterator, + ) -> Vec { let (permit_sender, permit_receiver) = oneshot::channel(); self.message_sender .send(SearchPermitMessage::Request { permit_sender, - num_permits, + permit_sizes: split_sizes + .into_iter() + .map(|size| { + std::cmp::min(size.as_u64(), self.per_permit_initial_memory_allocation) + }) + .collect(), }) .expect("Receiver lives longer than sender"); permit_receiver @@ -115,8 +125,7 @@ struct SearchPermitActor { /// When it happens, new permits will not be assigned until the memory is freed. total_memory_budget: u64, total_memory_allocated: u64, - per_permit_initial_memory_allocation: u64, - permits_requests: VecDeque>, + permits_requests: VecDeque<(oneshot::Sender, u64)>, #[cfg(test)] stopped: watch::Sender, } @@ -134,13 +143,13 @@ impl SearchPermitActor { fn handle_message(&mut self, msg: SearchPermitMessage) { match msg { SearchPermitMessage::Request { - num_permits, + permit_sizes, permit_sender, } => { - let mut permits = Vec::with_capacity(num_permits); - for _ in 0..num_permits { + let mut permits = Vec::with_capacity(permit_sizes.len()); + for permit_size in permit_sizes { let (tx, rx) = oneshot::channel(); - self.permits_requests.push_back(tx); + self.permits_requests.push_back((tx, permit_size)); permits.push(SearchPermitFuture(rx)); } self.assign_available_permits(); @@ -149,8 +158,7 @@ impl SearchPermitActor { // This is a request response pattern, so we can safely ignore the error. 
.expect("Receiver lives longer than sender"); } - SearchPermitMessage::WarmupCompleted { memory_delta } => { - self.num_warmup_slots_available += 1; + SearchPermitMessage::UpdateMemory { memory_delta } => { if self.total_memory_allocated as i64 + memory_delta < 0 { panic!("More memory released than allocated, should never happen.") } @@ -158,11 +166,15 @@ impl SearchPermitActor { (self.total_memory_allocated as i64 + memory_delta) as u64; self.assign_available_permits(); } + SearchPermitMessage::FreeWarmupSlot => { + self.num_warmup_slots_available += 1; + self.assign_available_permits(); + } SearchPermitMessage::Drop { memory_size, - warmup_permit_held, + warmup_slot_freed, } => { - if warmup_permit_held { + if !warmup_slot_freed { self.num_warmup_slots_available += 1; } self.total_memory_allocated = self @@ -174,26 +186,34 @@ impl SearchPermitActor { } } + fn pop_next_request_if_serviceable(&mut self) -> Option<(oneshot::Sender<SearchPermit>, u64)> { + if self.num_warmup_slots_available == 0 { + return None; + } + if let Some((_, next_permit_size)) = self.permits_requests.front() { + if self.total_memory_allocated + next_permit_size <= self.total_memory_budget { + return self.permits_requests.pop_front(); + } + } + None + } + fn assign_available_permits(&mut self) { - while self.num_warmup_slots_available > 0 - && self.total_memory_allocated + self.per_permit_initial_memory_allocation - <= self.total_memory_budget + while let Some((permit_requester_tx, next_permit_size)) = + self.pop_next_request_if_serviceable() { - let Some(permit_requester_tx) = self.permits_requests.pop_front() else { - break; - }; let mut ongoing_gauge_guard = GaugeGuard::from_gauge( &crate::SEARCH_METRICS.leaf_search_single_split_tasks_ongoing, ); ongoing_gauge_guard.add(1); - self.total_memory_allocated += self.per_permit_initial_memory_allocation; + self.total_memory_allocated += next_permit_size; self.num_warmup_slots_available -= 1; permit_requester_tx .send(SearchPermit { _ongoing_gauge_guard: 
ongoing_gauge_guard, msg_sender: self.msg_sender.clone(), - memory_allocation: self.per_permit_initial_memory_allocation, - warmup_permit_held: true, + memory_allocation: next_permit_size, + warmup_slot_freed: false, }) // if the requester dropped its receiver, we drop the newly // created SearchPermit which releases the resources @@ -210,29 +230,32 @@ pub struct SearchPermit { _ongoing_gauge_guard: GaugeGuard<'static>, msg_sender: mpsc::WeakUnboundedSender, memory_allocation: u64, - warmup_permit_held: bool, + warmup_slot_freed: bool, } impl SearchPermit { - /// After warm up, we have a proper estimate of the memory usage of a single - /// split leaf search. We can thus set the actual memory usage and release - /// the warmup slot. - pub fn warmup_completed(&mut self, new_memory_usage: ByteSize) { + /// Update the memory usage attached to this permit. + /// + /// This will increase or decrease the available memory in the [`SearchPermitProvider`]. + pub fn update_memory_usage(&mut self, new_memory_usage: ByteSize) { let new_usage_bytes = new_memory_usage.as_u64(); - crate::SEARCH_METRICS - .leaf_search_single_split_warmup_num_bytes - .observe(new_usage_bytes as f64); - if new_usage_bytes > self.memory_allocation { - warn!( - memory_usage = new_usage_bytes, - memory_allocation = self.memory_allocation, - "current leaf search is consuming more memory than the initial allocation" - ); - } let memory_delta = new_usage_bytes as i64 - self.memory_allocation as i64; - self.warmup_permit_held = false; self.memory_allocation = new_usage_bytes; - self.send_if_still_running(SearchPermitMessage::WarmupCompleted { memory_delta }); + self.send_if_still_running(SearchPermitMessage::UpdateMemory { memory_delta }); + } + + /// Drop the warmup permit, allowing more downloads to be started. Only one + /// slot is attached to each permit so calling this again has no effect. 
+ pub fn free_warmup_slot(&mut self) { + if self.warmup_slot_freed { + return; + } + self.warmup_slot_freed = true; + self.send_if_still_running(SearchPermitMessage::FreeWarmupSlot); + } + + pub fn memory_allocation(&self) -> ByteSize { + ByteSize(self.memory_allocation) } fn send_if_still_running(&self, msg: SearchPermitMessage) { @@ -250,7 +273,7 @@ impl Drop for SearchPermit { fn drop(&mut self) { self.send_if_still_running(SearchPermitMessage::Drop { memory_size: self.memory_allocation, - warmup_permit_held: self.warmup_permit_held, + warmup_slot_freed: self.warmup_slot_freed, }); } } @@ -273,6 +296,7 @@ impl Future for SearchPermitFuture { #[cfg(test)] mod tests { + use std::iter::repeat; use std::time::Duration; use futures::StreamExt; @@ -285,7 +309,9 @@ mod tests { async fn test_search_permit_order() { let permit_provider = SearchPermitProvider::new(1, ByteSize::mb(100), ByteSize::mb(10)); let mut all_futures = Vec::new(); - let first_batch_of_permits = permit_provider.get_permits(10).await; + let first_batch_of_permits = permit_provider + .get_permits(repeat(ByteSize::mb(10)).take(10)) + .await; assert_eq!(first_batch_of_permits.len(), 10); all_futures.extend( first_batch_of_permits @@ -294,7 +320,9 @@ mod tests { .map(move |(i, fut)| ((1, i), fut)), ); - let second_batch_of_permits = permit_provider.get_permits(10).await; + let second_batch_of_permits = permit_provider + .get_permits(repeat(ByteSize::mb(10)).take(10)) + .await; assert_eq!(second_batch_of_permits.len(), 10); all_futures.extend( second_batch_of_permits @@ -331,13 +359,13 @@ mod tests { async fn test_search_permit_early_drops() { let permit_provider = SearchPermitProvider::new(1, ByteSize::mb(100), ByteSize::mb(10)); let permit_fut1 = permit_provider - .get_permits(1) + .get_permits(vec![ByteSize::mb(10)]) .await .into_iter() .next() .unwrap(); let permit_fut2 = permit_provider - .get_permits(1) + .get_permits([ByteSize::mb(10)]) .await .into_iter() .next() @@ -348,7 +376,7 @@ mod tests { 
assert_eq!(*permit_provider.actor_stopped.borrow(), false); let _permit_fut3 = permit_provider - .get_permits(1) + .get_permits([ByteSize::mb(10)]) .await .into_iter() .next() @@ -369,9 +397,11 @@ mod tests { } #[tokio::test] - async fn test_memory_permit() { + async fn test_memory_budget() { let permit_provider = SearchPermitProvider::new(100, ByteSize::mb(100), ByteSize::mb(10)); - let mut permit_futs = permit_provider.get_permits(14).await; + let mut permit_futs = permit_provider + .get_permits(repeat(ByteSize::mb(10)).take(14)) + .await; let mut remaining_permit_futs = permit_futs.split_off(10).into_iter(); assert_eq!(remaining_permit_futs.len(), 4); // we should be able to obtain 10 permits right away (100MB / 10MB) @@ -390,9 +420,47 @@ mod tests { let next_blocked_permit_fut = remaining_permit_futs.next().unwrap(); try_get(next_blocked_permit_fut).await.unwrap_err(); // by setting a more accurate memory usage after a completed warmup, we can get more permits - permits[0].warmup_completed(ByteSize::mb(4)); - permits[1].warmup_completed(ByteSize::mb(6)); + permits[0].update_memory_usage(ByteSize::mb(4)); + permits[1].update_memory_usage(ByteSize::mb(6)); let next_permit_fut = remaining_permit_futs.next().unwrap(); try_get(next_permit_fut).await.unwrap(); } + + #[tokio::test] + async fn test_warmup_slot() { + let permit_provider = SearchPermitProvider::new(10, ByteSize::mb(100), ByteSize::mb(1)); + let mut permit_futs = permit_provider + // permit sizes are capped by per_permit_initial_memory_allocation + .get_permits(repeat(ByteSize::mb(100)).take(16)) + .await; + let mut remaining_permit_futs = permit_futs.split_off(10).into_iter(); + assert_eq!(remaining_permit_futs.len(), 6); + // we should be able to obtain 10 permits right away + let mut permits: Vec = futures::stream::iter(permit_futs.into_iter()) + .buffered(1) + .collect() + .await; + // the next permit is blocked by the warmup slots + let next_blocked_permit_fut = remaining_permit_futs.next().unwrap(); 
+ try_get(next_blocked_permit_fut).await.unwrap_err(); + // if we drop one of the permits, we can get a new one + permits.drain(0..1); + let next_permit_fut = remaining_permit_futs.next().unwrap(); + permits.push(try_get(next_permit_fut).await.unwrap()); + // the next permit is blocked again by the warmup slots + let next_blocked_permit_fut = remaining_permit_futs.next().unwrap(); + try_get(next_blocked_permit_fut).await.unwrap_err(); + // we can explicitly free the warmup slot on a permit + permits[0].free_warmup_slot(); + let next_permit_fut = remaining_permit_futs.next().unwrap(); + permits.push(try_get(next_permit_fut).await.unwrap()); + // dropping that same permit does not free up another slot + permits.drain(0..1); + let next_blocked_permit_fut = remaining_permit_futs.next().unwrap(); + try_get(next_blocked_permit_fut).await.unwrap_err(); + // but dropping a permit for which the slot wasn't explicitly freed does free up a slot + permits.drain(0..1); + let next_blocked_permit_fut = remaining_permit_futs.next().unwrap(); + permits.push(try_get(next_blocked_permit_fut).await.unwrap()); + } } diff --git a/quickwit/quickwit-search/src/search_stream/leaf.rs b/quickwit/quickwit-search/src/search_stream/leaf.rs index d8e4e3b6536..0659965b40d 100644 --- a/quickwit/quickwit-search/src/search_stream/leaf.rs +++ b/quickwit/quickwit-search/src/search_stream/leaf.rs @@ -130,7 +130,7 @@ async fn leaf_search_stream_single_split( let cache = ByteRangeCache::with_infinite_capacity(&quickwit_storage::STORAGE_METRICS.shortlived_cache); - let index = open_index_with_caches( + let (index, _) = open_index_with_caches( &searcher_context, storage, &split,