Skip to content

Commit

Permalink
Limit permit memory size with split size
Browse files Browse the repository at this point in the history
  • Loading branch information
rdettai committed Dec 4, 2024
1 parent c9f34df commit fd13536
Show file tree
Hide file tree
Showing 6 changed files with 306 additions and 114 deletions.
35 changes: 8 additions & 27 deletions quickwit/quickwit-directories/src/hot_directory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -204,18 +204,6 @@ impl StaticDirectoryCache {
pub fn get_file_length(&self, path: &Path) -> Option<u64> {
self.file_lengths.get(path).copied()
}

/// return the files and their cached lengths
pub fn get_stats(&self) -> Vec<(PathBuf, usize)> {
let mut entries = self
.slices
.iter()
.map(|(path, cache)| (path.to_owned(), cache.len()))
.collect::<Vec<_>>();

entries.sort_by_key(|el| el.0.to_owned());
entries
}
}

/// A SliceCache is a static toring
Expand Down Expand Up @@ -265,10 +253,6 @@ impl StaticSliceCache {
}
None
}

pub fn len(&self) -> usize {
self.bytes.len()
}
}

struct StaticSliceCacheBuilder {
Expand Down Expand Up @@ -376,12 +360,14 @@ impl HotDirectory {
}),
})
}
/// Get files and their cached sizes.
pub fn get_stats_per_file(
hot_cache_bytes: OwnedBytes,
) -> anyhow::Result<Vec<(PathBuf, usize)>> {
let static_cache = StaticDirectoryCache::open(hot_cache_bytes)?;
Ok(static_cache.get_stats())
/// Get all the directory files and their sizes.
pub fn get_file_sizes(&self) -> Vec<(PathBuf, u64)> {
self.inner
.cache
.file_lengths
.iter()
.map(|(k, v)| (k.clone(), *v))
.collect()
}
}

Expand Down Expand Up @@ -704,11 +690,6 @@ mod tests {
assert_eq!(directory_cache.get_file_length(three_path), Some(300));
assert_eq!(directory_cache.get_file_length(four_path), None);

let stats = directory_cache.get_stats();
assert_eq!(stats[0], (one_path.to_owned(), 8));
assert_eq!(stats[1], (three_path.to_owned(), 0));
assert_eq!(stats[2], (two_path.to_owned(), 7));

assert_eq!(
directory_cache
.get_slice(one_path)
Expand Down
2 changes: 1 addition & 1 deletion quickwit/quickwit-search/src/fetch_docs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ async fn fetch_docs_in_split(
global_doc_addrs.sort_by_key(|doc| doc.doc_addr);
// Opens the index without the ephemeral unbounded cache, this cache is indeed not useful
// when fetching docs as we will fetch them only once.
let mut index = open_index_with_caches(
let (mut index, _) = open_index_with_caches(
&searcher_context,
index_storage,
split,
Expand Down
194 changes: 166 additions & 28 deletions quickwit/quickwit-search/src/leaf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,34 +126,39 @@ pub(crate) async fn open_split_bundle(
Ok((hotcache_bytes, bundle_storage))
}

/// Add a storage proxy to retry `get_slice` requests if they are taking too long,
/// if configured in the searcher config.
///
/// The goal here is too ensure a low latency.
fn configure_storage_retries(
searcher_context: &SearcherContext,
index_storage: Arc<dyn Storage>,
) -> Arc<dyn Storage> {
if let Some(storage_timeout_policy) = &searcher_context.searcher_config.storage_timeout_policy {
Arc::new(TimeoutAndRetryStorage::new(
index_storage,
storage_timeout_policy.clone(),
))
} else {
index_storage
}
}

/// Opens a `tantivy::Index` for the given split with several cache layers:
/// - A split footer cache given by `SearcherContext.split_footer_cache`.
/// - A fast fields cache given by `SearcherContext.storage_long_term_cache`.
/// - An ephemeral unbounded cache directory whose lifetime is tied to the
/// returned `Index`.
/// - An ephemeral unbounded cache directory (whose lifetime is tied to the
/// returned `Index` if no `ByteRangeCache` is provided).
#[instrument(skip_all, fields(split_footer_start=split_and_footer_offsets.split_footer_start, split_footer_end=split_and_footer_offsets.split_footer_end))]
pub(crate) async fn open_index_with_caches(
searcher_context: &SearcherContext,
index_storage: Arc<dyn Storage>,
split_and_footer_offsets: &SplitIdAndFooterOffsets,
tokenizer_manager: Option<&TokenizerManager>,
ephemeral_unbounded_cache: Option<ByteRangeCache>,
) -> anyhow::Result<Index> {
// Let's add a storage proxy to retry `get_slice` requests if they are taking too long,
// if configured in the searcher config.
//
// The goal here is too ensure a low latency.

let index_storage_with_retry_on_timeout = if let Some(storage_timeout_policy) =
&searcher_context.searcher_config.storage_timeout_policy
{
Arc::new(TimeoutAndRetryStorage::new(
index_storage,
storage_timeout_policy.clone(),
))
} else {
index_storage
};
) -> anyhow::Result<(Index, HotDirectory)> {
let index_storage_with_retry_on_timeout =
configure_storage_retries(searcher_context, index_storage);

let (hotcache_bytes, bundle_storage) = open_split_bundle(
searcher_context,
Expand All @@ -176,7 +181,7 @@ pub(crate) async fn open_index_with_caches(
HotDirectory::open(directory, hotcache_bytes.read_bytes()?)?
};

let mut index = Index::open(hot_directory)?;
let mut index = Index::open(hot_directory.clone())?;
if let Some(tokenizer_manager) = tokenizer_manager {
index.set_tokenizers(tokenizer_manager.tantivy_manager().clone());
}
Expand All @@ -185,7 +190,7 @@ pub(crate) async fn open_index_with_caches(
.tantivy_manager()
.clone(),
);
Ok(index)
Ok((index, hot_directory))
}

/// Tantivy search does not make it possible to fetch data asynchronously during
Expand Down Expand Up @@ -370,6 +375,17 @@ fn get_leaf_resp_from_count(count: u64) -> LeafSearchResponse {
}
}

/// Compute the size of the index, store excluded.
fn compute_index_size(hot_directory: &HotDirectory) -> ByteSize {
let size_bytes = hot_directory
.get_file_sizes()
.iter()
.filter(|(path, _)| !path.to_string_lossy().ends_with("store"))
.map(|(_, size)| *size)
.sum();
ByteSize(size_bytes)
}

/// Apply a leaf search on a single split.
#[allow(clippy::too_many_arguments)]
async fn leaf_search_single_split(
Expand Down Expand Up @@ -408,15 +424,19 @@ async fn leaf_search_single_split(
let split_id = split.split_id.to_string();
let byte_range_cache =
ByteRangeCache::with_infinite_capacity(&quickwit_storage::STORAGE_METRICS.shortlived_cache);
let index = open_index_with_caches(
let (index, hot_directory) = open_index_with_caches(
searcher_context,
storage,
&split,
Some(doc_mapper.tokenizer_manager()),
Some(byte_range_cache.clone()),
)
.await?;
let split_schema = index.schema();

let index_size = compute_index_size(&hot_directory);
if index_size < search_permit.memory_allocation() {
search_permit.update_memory_usage(index_size);
}

let reader = index
.reader_builder()
Expand All @@ -427,6 +447,7 @@ async fn leaf_search_single_split(
let mut collector =
make_collector_for_split(split_id.clone(), &search_request, aggregations_limits)?;

let split_schema = index.schema();
let (query, mut warmup_info) = doc_mapper.query(split_schema.clone(), &query_ast, false)?;

let collector_warmup_info = collector.warmup_info();
Expand All @@ -437,9 +458,20 @@ async fn leaf_search_single_split(
warmup(&searcher, &warmup_info).await?;
let warmup_end = Instant::now();
let warmup_duration: Duration = warmup_end.duration_since(warmup_start);
let warmup_size = byte_range_cache.get_num_bytes();
search_permit.warmup_completed(ByteSize(warmup_size));
let short_lived_cache_num_bytes = byte_range_cache.get_num_bytes();
let warmup_size = ByteSize(byte_range_cache.get_num_bytes());
if warmup_size > search_permit.memory_allocation() {
warn!(
memory_usage = ?warmup_size,
memory_allocation = ?search_permit.memory_allocation(),
"current leaf search is consuming more memory than the initial allocation"
);
}
crate::SEARCH_METRICS
.leaf_search_single_split_warmup_num_bytes
.observe(warmup_size.as_u64() as f64);
search_permit.update_memory_usage(warmup_size);
search_permit.free_warmup_slot();

let split_num_docs = split.num_docs;

let span = info_span!("tantivy_search");
Expand All @@ -458,7 +490,7 @@ async fn leaf_search_single_split(
collector.update_search_param(&search_request);
let mut leaf_search_response: LeafSearchResponse =
if is_metadata_count_request_with_ast(&query_ast, &search_request) {
get_leaf_resp_from_count(searcher.num_docs() as u64)
get_leaf_resp_from_count(searcher.num_docs())
} else if collector.is_count_only() {
let count = query.count(&searcher)? as u64;
get_leaf_resp_from_count(count)
Expand All @@ -467,7 +499,7 @@ async fn leaf_search_single_split(
};
leaf_search_response.resource_stats = Some(ResourceStats {
cpu_microsecs: cpu_start.elapsed().as_micros() as u64,
short_lived_cache_num_bytes,
short_lived_cache_num_bytes: warmup_size.as_u64(),
split_num_docs,
warmup_microsecs: warmup_duration.as_micros() as u64,
cpu_thread_pool_wait_microsecs: cpu_thread_pool_wait_microsecs.as_micros()
Expand Down Expand Up @@ -1285,7 +1317,11 @@ pub async fn leaf_search(
// do no interleave with other leaf search requests.
let permit_futures = searcher_context
.search_permit_provider
.get_permits(split_with_req.len())
.get_permits(
split_with_req
.iter()
.map(|(split, _)| ByteSize(split.split_footer_start)),
)
.await;

for ((split, mut request), permit_fut) in
Expand Down Expand Up @@ -1441,6 +1477,15 @@ async fn leaf_search_single_split_wrapper(
mod tests {
use std::ops::Bound;

use bytes::BufMut;
use quickwit_directories::write_hotcache;
use rand::{thread_rng, Rng};
use tantivy::directory::RamDirectory;
use tantivy::schema::{
BytesOptions, FieldEntry, Schema, TextFieldIndexing, TextOptions, Value,
};
use tantivy::TantivyDocument;

use super::*;

fn bool_filter(ast: impl Into<QueryAst>) -> QueryAst {
Expand Down Expand Up @@ -1876,4 +1921,97 @@ mod tests {
assert_eq!(rewrote_bounds_agg, no_bounds_agg);
}
}

fn create_tantivy_dir_with_hotcache<'a, V>(
field_entry: FieldEntry,
field_value: V,
) -> (HotDirectory, usize)
where
V: Value<'a>,
{
let field_name = field_entry.name().to_string();
let mut schema_builder = Schema::builder();
schema_builder.add_field(field_entry);
let schema = schema_builder.build();

let ram_directory = RamDirectory::create();
let index = Index::open_or_create(ram_directory.clone(), schema.clone()).unwrap();

let mut index_writer = index.writer(15_000_000).unwrap();
let field = schema.get_field(&field_name).unwrap();
let mut new_doc = TantivyDocument::default();
new_doc.add_field_value(field, field_value);
index_writer.add_document(new_doc).unwrap();
index_writer.commit().unwrap();

let mut hotcache_bytes_writer = Vec::new().writer();
write_hotcache(ram_directory.clone(), &mut hotcache_bytes_writer).unwrap();
let hotcache_bytes = OwnedBytes::new(hotcache_bytes_writer.into_inner());
let hot_directory = HotDirectory::open(ram_directory.clone(), hotcache_bytes).unwrap();
(hot_directory, ram_directory.total_mem_usage())
}

#[test]
fn test_compute_index_size_without_store() {
// We don't want to make assertions on absolute index sizes (it might
// change in future Tantivy versions), but rather verify that the store
// is properly excluded from the computed size.

// We use random bytes so that the store can't compress them
let mut payload = vec![0u8; 1024];
thread_rng().fill(&mut payload[..]);

let (hotcache_directory_stored_payload, directory_size_stored_payload) =
create_tantivy_dir_with_hotcache(
FieldEntry::new_bytes("payload".to_string(), BytesOptions::default().set_stored()),
&payload,
);
let size_with_stored_payload =
compute_index_size(&hotcache_directory_stored_payload).as_u64();

let (hotcache_directory_index_only, directory_size_index_only) =
create_tantivy_dir_with_hotcache(
FieldEntry::new_bytes("payload".to_string(), BytesOptions::default()),
&payload,
);
let size_index_only = compute_index_size(&hotcache_directory_index_only).as_u64();

assert!(directory_size_stored_payload > directory_size_index_only + 1000);
assert!(size_with_stored_payload.abs_diff(size_index_only) < 10);
}

#[test]
fn test_compute_index_size_varies_with_data() {
// We don't want to make assertions on absolute index sizes (it might
// change in future Tantivy versions), but rather verify that an index
// with more data is indeed bigger.

let indexing_options =
TextOptions::default().set_indexing_options(TextFieldIndexing::default());

let (hotcache_directory_larger, directory_size_larger) = create_tantivy_dir_with_hotcache(
FieldEntry::new_text("text".to_string(), indexing_options.clone()),
"Sed ut perspiciatis unde omnis iste natus error sit voluptatem accusantium \
doloremque laudantium, totam rem aperiam, eaque ipsa quae ab illo inventore \
veritatis et quasi architecto beatae vitae dicta sunt explicabo. Nemo enim ipsam \
voluptatem quia voluptas sit aspernatur aut odit aut fugit, sed quia consequuntur \
magni dolores eos qui ratione voluptatem sequi nesciunt. Neque porro quisquam est, \
qui dolorem ipsum quia dolor sit amet, consectetur, adipisci velit, sed quia non \
numquam eius modi tempora incidunt ut labore et dolore magnam aliquam quaerat \
voluptatem. Ut enim ad minima veniam, quis nostrum exercitationem ullam corporis \
suscipit laboriosam, nisi ut aliquid ex ea commodi consequatur? Quis autem vel eum \
iure reprehenderit qui in ea voluptate velit esse quam nihil molestiae consequatur, \
vel illum qui dolorem eum fugiat quo voluptas nulla pariatur?",
);
let larger_size = compute_index_size(&hotcache_directory_larger).as_u64();

let (hotcache_directory_smaller, directory_size_smaller) = create_tantivy_dir_with_hotcache(
FieldEntry::new_text("text".to_string(), indexing_options),
"hi",
);
let smaller_size = compute_index_size(&hotcache_directory_smaller).as_u64();

assert!(directory_size_larger > directory_size_smaller + 100);
assert!(larger_size > smaller_size + 100);
}
}
9 changes: 7 additions & 2 deletions quickwit/quickwit-search/src/list_terms.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use std::ops::Bound;
use std::sync::Arc;

use anyhow::Context;
use bytesize::ByteSize;
use futures::future::try_join_all;
use itertools::{Either, Itertools};
use quickwit_common::pretty::PrettySample;
Expand Down Expand Up @@ -218,7 +219,7 @@ async fn leaf_list_terms_single_split(
) -> crate::Result<LeafListTermsResponse> {
let cache =
ByteRangeCache::with_infinite_capacity(&quickwit_storage::STORAGE_METRICS.shortlived_cache);
let index =
let (index, _) =
open_index_with_caches(searcher_context, storage, &split, None, Some(cache)).await?;
let split_schema = index.schema();
let reader = index
Expand Down Expand Up @@ -330,7 +331,11 @@ pub async fn leaf_list_terms(
info!(split_offsets = ?PrettySample::new(splits, 5));
let permits = searcher_context
.search_permit_provider
.get_permits(splits.len())
.get_permits(
splits
.iter()
.map(|split| ByteSize(split.split_footer_start)),
)
.await;
let leaf_search_single_split_futures: Vec<_> = splits
.iter()
Expand Down
Loading

0 comments on commit fd13536

Please sign in to comment.