Skip to content

Commit

Permalink
don't enumerate index_uid when requesting splits to gc (#5489)
Browse files Browse the repository at this point in the history
* don't enumerate index when listing splits for gc

* add tests for get splits of all indexes

* no-op minor refactoring

* Explicitly post filtering splits

no-op refactoring.

* CR comments

---------

Co-authored-by: Paul Masurel <[email protected]>
  • Loading branch information
trinity-1686a and fulmicoton authored Oct 18, 2024
1 parent a2c5f1d commit 6b4acd0
Show file tree
Hide file tree
Showing 13 changed files with 392 additions and 114 deletions.
78 changes: 42 additions & 36 deletions quickwit/quickwit-index-management/src/garbage_collection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use futures::{Future, StreamExt};
use itertools::Itertools;
use quickwit_common::metrics::IntCounter;
use quickwit_common::pretty::PrettySample;
use quickwit_common::Progress;
use quickwit_common::{rate_limited_info, Progress};
use quickwit_metastore::{
ListSplitsQuery, ListSplitsRequestExt, MetastoreServiceStreamSplitsExt, SplitInfo,
SplitMetadata, SplitState,
Expand Down Expand Up @@ -122,8 +122,8 @@ pub async fn run_garbage_collect(

let index_uids: Vec<IndexUid> = indexes.keys().cloned().collect();

let Some(list_splits_query_for_index_uids) =
ListSplitsQuery::try_from_index_uids(index_uids.clone())
// TODO maybe we want to do a ListSplitsQuery::for_all_indexes and post-filter ourselves here
let Some(list_splits_query_for_index_uids) = ListSplitsQuery::try_from_index_uids(index_uids)
else {
return Ok(SplitRemovalInfo::default());
};
Expand Down Expand Up @@ -187,7 +187,6 @@ pub async fn run_garbage_collect(
OffsetDateTime::now_utc().unix_timestamp() - deletion_grace_period.as_secs() as i64;

Ok(delete_splits_marked_for_deletion_several_indexes(
index_uids,
updated_before_timestamp,
metastore,
indexes,
Expand Down Expand Up @@ -221,20 +220,15 @@ async fn delete_splits(
)
.await
} else {
error!(
"we are trying to GC without knowing the storage, this shouldn't \
happen"
// in practice this can happen if the index was created between the start of
// the run and now, and one of its splits has already expired, which likely
// means a very long gc run, or if we run gc on a single index from the cli.
quickwit_common::rate_limited_warn!(
limit_per_min = 2,
index_uid=%index_uid,
"we are trying to GC without knowing the storage",
);
Err(DeleteSplitsError {
successes: Vec::new(),
storage_error: None,
storage_failures: splits_metadata_to_delete
.into_iter()
.map(|split| split.as_split_info())
.collect(),
metastore_error: None,
metastore_failures: Vec::new(),
})
Ok(Vec::new())
}
}
})
Expand Down Expand Up @@ -304,11 +298,12 @@ async fn list_splits_metadata(
/// Removes any splits marked for deletion which haven't been
/// updated after `updated_before_timestamp` in batches of 1000 splits.
///
/// Only splits from index_uids in the `storages` map will be deleted.
///
/// The aim of this is to spread the load out across a longer period
/// rather than short, heavy bursts on the metastore and storage system itself.
#[instrument(skip(index_uids, storages, metastore, progress_opt, metrics), fields(num_indexes=%index_uids.len()))]
#[instrument(skip(storages, metastore, progress_opt, metrics), fields(num_indexes=%storages.len()))]
async fn delete_splits_marked_for_deletion_several_indexes(
index_uids: Vec<IndexUid>,
updated_before_timestamp: i64,
metastore: MetastoreServiceClient,
storages: HashMap<IndexUid, Arc<dyn Storage>>,
Expand All @@ -317,18 +312,22 @@ async fn delete_splits_marked_for_deletion_several_indexes(
) -> SplitRemovalInfo {
let mut split_removal_info = SplitRemovalInfo::default();

let Some(list_splits_query) = ListSplitsQuery::try_from_index_uids(index_uids) else {
error!("failed to create list splits query. this should never happen");
return split_removal_info;
};
// we ask for all indexes because the query is more efficient and we almost always want all
// indexes anyway. The exception is when garbage collecting a single index from the commandline.
// In this case, we will log a bunch of warn. i (trinity) consider it worth the more generic
// code which needs fewer special case while testing, but we could check index_uids len if we
// think it's a better idea.
let list_splits_query = ListSplitsQuery::for_all_indexes();

let mut list_splits_query = list_splits_query
.with_split_state(SplitState::MarkedForDeletion)
.with_update_timestamp_lte(updated_before_timestamp)
.with_limit(DELETE_SPLITS_BATCH_SIZE)
.sort_by_index_uid();

loop {
let mut splits_to_delete_possibly_remaining = true;

while splits_to_delete_possibly_remaining {
let splits_metadata_to_delete: Vec<SplitMetadata> = match protect_future(
progress_opt,
list_splits_metadata(&metastore, &list_splits_query),
Expand All @@ -342,19 +341,32 @@ async fn delete_splits_marked_for_deletion_several_indexes(
}
};

// We page through the list of splits to delete using a limit and a `search_after` trick.
// To detect if this is the last page, we check if the number of splits is less than the
// limit.
assert!(splits_metadata_to_delete.len() <= DELETE_SPLITS_BATCH_SIZE);
splits_to_delete_possibly_remaining =
splits_metadata_to_delete.len() == DELETE_SPLITS_BATCH_SIZE;

// set split after which to search for the next loop
let Some(last_split_metadata) = splits_metadata_to_delete.last() else {
break;
};
list_splits_query = list_splits_query.after_split(last_split_metadata);

let num_splits_to_delete = splits_metadata_to_delete.len();
let mut splits_metadata_to_delete_per_index: HashMap<IndexUid, Vec<SplitMetadata>> =
HashMap::with_capacity(storages.len());

let splits_metadata_to_delete_per_index: HashMap<IndexUid, Vec<SplitMetadata>> =
splits_metadata_to_delete
.into_iter()
.map(|meta| (meta.index_uid.clone(), meta))
.into_group_map();
for meta in splits_metadata_to_delete {
if !storages.contains_key(&meta.index_uid) {
rate_limited_info!(limit_per_min=6, index_uid=?meta.index_uid, "split not listed in storage map: skipping");
continue;
}
splits_metadata_to_delete_per_index
.entry(meta.index_uid.clone())
.or_default()
.push(meta);
}

// ignore return we continue either way
let _: Result<(), ()> = delete_splits(
Expand All @@ -366,12 +378,6 @@ async fn delete_splits_marked_for_deletion_several_indexes(
&mut split_removal_info,
)
.await;

if num_splits_to_delete < DELETE_SPLITS_BATCH_SIZE {
// stop the gc if this was the last batch
// we are guaranteed to make progress due to .after_split()
break;
}
}

split_removal_info
Expand Down
4 changes: 2 additions & 2 deletions quickwit/quickwit-indexing/src/actors/indexing_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1764,14 +1764,14 @@ mod tests {
.expect_list_splits()
.withf(|request| {
let list_splits_query = request.deserialize_list_splits_query().unwrap();
list_splits_query.index_uids == [("test-index-0", 0)]
list_splits_query.index_uids.unwrap() == [("test-index-0", 0)]
})
.return_once(|_request| Ok(ServiceStream::empty()));
mock_metastore
.expect_list_splits()
.withf(|request| {
let list_splits_query = request.deserialize_list_splits_query().unwrap();
list_splits_query.index_uids == [("test-index-1", 0), ("test-index-2", 0)]
list_splits_query.index_uids.unwrap() == [("test-index-1", 0), ("test-index-2", 0)]
})
.return_once(|_request| {
let splits = vec![Split {
Expand Down
2 changes: 1 addition & 1 deletion quickwit/quickwit-indexing/src/actors/merge_pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -613,7 +613,7 @@ mod tests {
.times(1)
.withf(move |list_splits_request| {
let list_split_query = list_splits_request.deserialize_list_splits_query().unwrap();
assert_eq!(list_split_query.index_uids, &[index_uid.clone()]);
assert_eq!(list_split_query.index_uids, Some(vec![index_uid.clone()]));
assert_eq!(
list_split_query.split_states,
vec![quickwit_metastore::SplitState::Published]
Expand Down
45 changes: 29 additions & 16 deletions quickwit/quickwit-janitor/src/actors/garbage_collector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -302,10 +302,13 @@ mod tests {
.times(2)
.returning(move |list_splits_request| {
let query = list_splits_request.deserialize_list_splits_query().unwrap();
assert_eq!(query.index_uids[0], index_uid_clone,);
let splits = match query.split_states[0] {
SplitState::Staged => make_splits("test-index", &["a"], SplitState::Staged),
SplitState::Staged => {
assert_eq!(query.index_uids.unwrap()[0], index_uid_clone);
make_splits("test-index", &["a"], SplitState::Staged)
}
SplitState::MarkedForDeletion => {
assert!(query.index_uids.is_none());
let expected_deletion_timestamp = OffsetDateTime::now_utc()
.unix_timestamp()
- split_deletion_grace_period().as_secs() as i64;
Expand Down Expand Up @@ -394,14 +397,19 @@ mod tests {
.times(2)
.returning(|list_splits_request| {
let query = list_splits_request.deserialize_list_splits_query().unwrap();
assert_eq!(&query.index_uids[0].index_id, "test-index");
let splits = match query.split_states[0] {
SplitState::Staged => make_splits("test-index", &["a"], SplitState::Staged),
SplitState::MarkedForDeletion => make_splits(
"test-index",
&["a", "b", "c"],
SplitState::MarkedForDeletion,
),
SplitState::Staged => {
assert_eq!(&query.index_uids.unwrap()[0].index_id, "test-index");
make_splits("test-index", &["a"], SplitState::Staged)
}
SplitState::MarkedForDeletion => {
assert!(query.index_uids.is_none());
make_splits(
"test-index",
&["a", "b", "c"],
SplitState::MarkedForDeletion,
)
}
_ => panic!("only Staged and MarkedForDeletion expected."),
};
let splits = ListSplitsResponse::try_from_splits(splits).unwrap();
Expand Down Expand Up @@ -469,10 +477,13 @@ mod tests {
.times(6)
.returning(|list_splits_request| {
let query = list_splits_request.deserialize_list_splits_query().unwrap();
assert_eq!(&query.index_uids[0].index_id, "test-index");
let splits = match query.split_states[0] {
SplitState::Staged => make_splits("test-index", &["a"], SplitState::Staged),
SplitState::Staged => {
assert_eq!(&query.index_uids.unwrap()[0].index_id, "test-index");
make_splits("test-index", &["a"], SplitState::Staged)
}
SplitState::MarkedForDeletion => {
assert!(&query.index_uids.is_none());
make_splits("test-index", &["a", "b"], SplitState::MarkedForDeletion)
}
_ => panic!("only Staged and MarkedForDeletion expected."),
Expand Down Expand Up @@ -633,11 +644,6 @@ mod tests {
.times(3)
.returning(|list_splits_request| {
let query = list_splits_request.deserialize_list_splits_query().unwrap();
assert_eq!(query.index_uids.len(), 2);
assert!(["test-index-1", "test-index-2"]
.contains(&query.index_uids[0].index_id.as_ref()));
assert!(["test-index-1", "test-index-2"]
.contains(&query.index_uids[1].index_id.as_ref()));
let splits_ids_string: Vec<String> =
(0..8000).map(|seq| format!("split-{seq:04}")).collect();
let splits_ids: Vec<&str> = splits_ids_string
Expand All @@ -646,11 +652,18 @@ mod tests {
.collect();
let mut splits = match query.split_states[0] {
SplitState::Staged => {
let index_uids = query.index_uids.unwrap();
assert_eq!(index_uids.len(), 2);
assert!(["test-index-1", "test-index-2"]
.contains(&index_uids[0].index_id.as_ref()));
assert!(["test-index-1", "test-index-2"]
.contains(&index_uids[1].index_id.as_ref()));
let mut splits = make_splits("test-index-1", &["a"], SplitState::Staged);
splits.append(&mut make_splits("test-index-2", &["a"], SplitState::Staged));
splits
}
SplitState::MarkedForDeletion => {
assert!(query.index_uids.is_none());
assert_eq!(query.limit, Some(10_000));
let mut splits =
make_splits("test-index-1", &splits_ids, SplitState::MarkedForDeletion);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -459,7 +459,7 @@ mod tests {
.returning(|list_splits_request| {
let query = list_splits_request.deserialize_list_splits_query().unwrap();
assert_eq!(query.split_states, &[SplitState::Published]);
let splits = match query.index_uids[0].index_id.as_ref() {
let splits = match query.index_uids.unwrap()[0].index_id.as_ref() {
"index-1" => {
vec![
make_split("split-1", Some(1000..=5000)),
Expand Down
Loading

0 comments on commit 6b4acd0

Please sign in to comment.