From 6b4acd0b5ce8a007c8d2ea15291e084850318d0c Mon Sep 17 00:00:00 2001 From: trinity-1686a Date: Fri, 18 Oct 2024 03:58:47 +0200 Subject: [PATCH] don't enumerate index_uid when requesting splits to gc (#5489) * don't enumerate index when listing splits for gc * add tests for get splits of all indexes * no-op minor refactoring * Explicitly post filtering splits no-op refactoring. * CR comments --------- Co-authored-by: Paul Masurel --- .../src/garbage_collection.rs | 78 ++++---- .../src/actors/indexing_service.rs | 4 +- .../src/actors/merge_pipeline.rs | 2 +- .../src/actors/garbage_collector.rs | 45 +++-- .../src/actors/retention_policy_executor.rs | 2 +- .../src/metastore/file_backed/mod.rs | 92 +++++++--- .../quickwit-metastore/src/metastore/mod.rs | 28 ++- .../src/metastore/postgres/metastore.rs | 53 +++--- .../src/metastore/postgres/utils.rs | 14 +- .../src/tests/list_splits.rs | 169 ++++++++++++++++++ quickwit/quickwit-metastore/src/tests/mod.rs | 7 + quickwit/quickwit-search/src/root.rs | 6 +- .../src/index_api/rest_handler.rs | 6 +- 13 files changed, 392 insertions(+), 114 deletions(-) diff --git a/quickwit/quickwit-index-management/src/garbage_collection.rs b/quickwit/quickwit-index-management/src/garbage_collection.rs index 74971077b3b..7253c21570f 100644 --- a/quickwit/quickwit-index-management/src/garbage_collection.rs +++ b/quickwit/quickwit-index-management/src/garbage_collection.rs @@ -27,7 +27,7 @@ use futures::{Future, StreamExt}; use itertools::Itertools; use quickwit_common::metrics::IntCounter; use quickwit_common::pretty::PrettySample; -use quickwit_common::Progress; +use quickwit_common::{rate_limited_info, Progress}; use quickwit_metastore::{ ListSplitsQuery, ListSplitsRequestExt, MetastoreServiceStreamSplitsExt, SplitInfo, SplitMetadata, SplitState, @@ -122,8 +122,8 @@ pub async fn run_garbage_collect( let index_uids: Vec = indexes.keys().cloned().collect(); - let Some(list_splits_query_for_index_uids) = - 
ListSplitsQuery::try_from_index_uids(index_uids.clone()) + // TODO maybe we want to do a ListSplitsQuery::for_all_indexes and post-filter ourselves here + let Some(list_splits_query_for_index_uids) = ListSplitsQuery::try_from_index_uids(index_uids) else { return Ok(SplitRemovalInfo::default()); }; @@ -187,7 +187,6 @@ pub async fn run_garbage_collect( OffsetDateTime::now_utc().unix_timestamp() - deletion_grace_period.as_secs() as i64; Ok(delete_splits_marked_for_deletion_several_indexes( - index_uids, updated_before_timestamp, metastore, indexes, @@ -221,20 +220,15 @@ async fn delete_splits( ) .await } else { - error!( - "we are trying to GC without knowing the storage, this shouldn't \ - happen" + // in practice this can happen if the index was created between the start of + // the run and now, and one of its splits has already expired, which likely + // means a very long gc run, or if we run gc on a single index from the cli. + quickwit_common::rate_limited_warn!( + limit_per_min = 2, + index_uid=%index_uid, + "we are trying to GC without knowing the storage", ); - Err(DeleteSplitsError { - successes: Vec::new(), - storage_error: None, - storage_failures: splits_metadata_to_delete - .into_iter() - .map(|split| split.as_split_info()) - .collect(), - metastore_error: None, - metastore_failures: Vec::new(), - }) + Ok(Vec::new()) } } }) @@ -304,11 +298,12 @@ async fn list_splits_metadata( /// Removes any splits marked for deletion which haven't been /// updated after `updated_before_timestamp` in batches of 1000 splits. /// +/// Only splits from index_uids in the `storages` map will be deleted. +/// /// The aim of this is to spread the load out across a longer period /// rather than short, heavy bursts on the metastore and storage system itself. 
-#[instrument(skip(index_uids, storages, metastore, progress_opt, metrics), fields(num_indexes=%index_uids.len()))] +#[instrument(skip(storages, metastore, progress_opt, metrics), fields(num_indexes=%storages.len()))] async fn delete_splits_marked_for_deletion_several_indexes( - index_uids: Vec, updated_before_timestamp: i64, metastore: MetastoreServiceClient, storages: HashMap>, @@ -317,10 +312,12 @@ async fn delete_splits_marked_for_deletion_several_indexes( ) -> SplitRemovalInfo { let mut split_removal_info = SplitRemovalInfo::default(); - let Some(list_splits_query) = ListSplitsQuery::try_from_index_uids(index_uids) else { - error!("failed to create list splits query. this should never happen"); - return split_removal_info; - }; + // we ask for all indexes because the query is more efficient and we almost always want all + // indexes anyway. The exception is when garbage collecting a single index from the commandline. + // In this case, we will log a bunch of warn. i (trinity) consider it worth the more generic + // code which needs fewer special case while testing, but we could check index_uids len if we + // think it's a better idea. + let list_splits_query = ListSplitsQuery::for_all_indexes(); let mut list_splits_query = list_splits_query .with_split_state(SplitState::MarkedForDeletion) @@ -328,7 +325,9 @@ async fn delete_splits_marked_for_deletion_several_indexes( .with_limit(DELETE_SPLITS_BATCH_SIZE) .sort_by_index_uid(); - loop { + let mut splits_to_delete_possibly_remaining = true; + + while splits_to_delete_possibly_remaining { let splits_metadata_to_delete: Vec = match protect_future( progress_opt, list_splits_metadata(&metastore, &list_splits_query), @@ -342,19 +341,32 @@ async fn delete_splits_marked_for_deletion_several_indexes( } }; + // We page through the list of splits to delete using a limit and a `search_after` trick. + // To detect if this is the last page, we check if the number of splits is less than the + // limit. 
+ assert!(splits_metadata_to_delete.len() <= DELETE_SPLITS_BATCH_SIZE); + splits_to_delete_possibly_remaining = + splits_metadata_to_delete.len() == DELETE_SPLITS_BATCH_SIZE; + // set split after which to search for the next loop let Some(last_split_metadata) = splits_metadata_to_delete.last() else { break; }; list_splits_query = list_splits_query.after_split(last_split_metadata); - let num_splits_to_delete = splits_metadata_to_delete.len(); + let mut splits_metadata_to_delete_per_index: HashMap> = + HashMap::with_capacity(storages.len()); - let splits_metadata_to_delete_per_index: HashMap> = - splits_metadata_to_delete - .into_iter() - .map(|meta| (meta.index_uid.clone(), meta)) - .into_group_map(); + for meta in splits_metadata_to_delete { + if !storages.contains_key(&meta.index_uid) { + rate_limited_info!(limit_per_min=6, index_uid=?meta.index_uid, "split not listed in storage map: skipping"); + continue; + } + splits_metadata_to_delete_per_index + .entry(meta.index_uid.clone()) + .or_default() + .push(meta); + } // ignore return we continue either way let _: Result<(), ()> = delete_splits( @@ -366,12 +378,6 @@ async fn delete_splits_marked_for_deletion_several_indexes( &mut split_removal_info, ) .await; - - if num_splits_to_delete < DELETE_SPLITS_BATCH_SIZE { - // stop the gc if this was the last batch - // we are guaranteed to make progress due to .after_split() - break; - } } split_removal_info diff --git a/quickwit/quickwit-indexing/src/actors/indexing_service.rs b/quickwit/quickwit-indexing/src/actors/indexing_service.rs index 8ba8644c4af..5a180840a1e 100644 --- a/quickwit/quickwit-indexing/src/actors/indexing_service.rs +++ b/quickwit/quickwit-indexing/src/actors/indexing_service.rs @@ -1764,14 +1764,14 @@ mod tests { .expect_list_splits() .withf(|request| { let list_splits_query = request.deserialize_list_splits_query().unwrap(); - list_splits_query.index_uids == [("test-index-0", 0)] + list_splits_query.index_uids.unwrap() == [("test-index-0", 0)] }) 
.return_once(|_request| Ok(ServiceStream::empty())); mock_metastore .expect_list_splits() .withf(|request| { let list_splits_query = request.deserialize_list_splits_query().unwrap(); - list_splits_query.index_uids == [("test-index-1", 0), ("test-index-2", 0)] + list_splits_query.index_uids.unwrap() == [("test-index-1", 0), ("test-index-2", 0)] }) .return_once(|_request| { let splits = vec![Split { diff --git a/quickwit/quickwit-indexing/src/actors/merge_pipeline.rs b/quickwit/quickwit-indexing/src/actors/merge_pipeline.rs index 1261ec3b698..2bc7732de8d 100644 --- a/quickwit/quickwit-indexing/src/actors/merge_pipeline.rs +++ b/quickwit/quickwit-indexing/src/actors/merge_pipeline.rs @@ -613,7 +613,7 @@ mod tests { .times(1) .withf(move |list_splits_request| { let list_split_query = list_splits_request.deserialize_list_splits_query().unwrap(); - assert_eq!(list_split_query.index_uids, &[index_uid.clone()]); + assert_eq!(list_split_query.index_uids, Some(vec![index_uid.clone()])); assert_eq!( list_split_query.split_states, vec![quickwit_metastore::SplitState::Published] diff --git a/quickwit/quickwit-janitor/src/actors/garbage_collector.rs b/quickwit/quickwit-janitor/src/actors/garbage_collector.rs index bb82e55d856..ca9a0e66eb4 100644 --- a/quickwit/quickwit-janitor/src/actors/garbage_collector.rs +++ b/quickwit/quickwit-janitor/src/actors/garbage_collector.rs @@ -302,10 +302,13 @@ mod tests { .times(2) .returning(move |list_splits_request| { let query = list_splits_request.deserialize_list_splits_query().unwrap(); - assert_eq!(query.index_uids[0], index_uid_clone,); let splits = match query.split_states[0] { - SplitState::Staged => make_splits("test-index", &["a"], SplitState::Staged), + SplitState::Staged => { + assert_eq!(query.index_uids.unwrap()[0], index_uid_clone); + make_splits("test-index", &["a"], SplitState::Staged) + } SplitState::MarkedForDeletion => { + assert!(query.index_uids.is_none()); let expected_deletion_timestamp = OffsetDateTime::now_utc() 
.unix_timestamp() - split_deletion_grace_period().as_secs() as i64; @@ -394,14 +397,19 @@ mod tests { .times(2) .returning(|list_splits_request| { let query = list_splits_request.deserialize_list_splits_query().unwrap(); - assert_eq!(&query.index_uids[0].index_id, "test-index"); let splits = match query.split_states[0] { - SplitState::Staged => make_splits("test-index", &["a"], SplitState::Staged), - SplitState::MarkedForDeletion => make_splits( - "test-index", - &["a", "b", "c"], - SplitState::MarkedForDeletion, - ), + SplitState::Staged => { + assert_eq!(&query.index_uids.unwrap()[0].index_id, "test-index"); + make_splits("test-index", &["a"], SplitState::Staged) + } + SplitState::MarkedForDeletion => { + assert!(query.index_uids.is_none()); + make_splits( + "test-index", + &["a", "b", "c"], + SplitState::MarkedForDeletion, + ) + } _ => panic!("only Staged and MarkedForDeletion expected."), }; let splits = ListSplitsResponse::try_from_splits(splits).unwrap(); @@ -469,10 +477,13 @@ mod tests { .times(6) .returning(|list_splits_request| { let query = list_splits_request.deserialize_list_splits_query().unwrap(); - assert_eq!(&query.index_uids[0].index_id, "test-index"); let splits = match query.split_states[0] { - SplitState::Staged => make_splits("test-index", &["a"], SplitState::Staged), + SplitState::Staged => { + assert_eq!(&query.index_uids.unwrap()[0].index_id, "test-index"); + make_splits("test-index", &["a"], SplitState::Staged) + } SplitState::MarkedForDeletion => { + assert!(&query.index_uids.is_none()); make_splits("test-index", &["a", "b"], SplitState::MarkedForDeletion) } _ => panic!("only Staged and MarkedForDeletion expected."), @@ -633,11 +644,6 @@ mod tests { .times(3) .returning(|list_splits_request| { let query = list_splits_request.deserialize_list_splits_query().unwrap(); - assert_eq!(query.index_uids.len(), 2); - assert!(["test-index-1", "test-index-2"] - .contains(&query.index_uids[0].index_id.as_ref())); - assert!(["test-index-1", 
"test-index-2"] - .contains(&query.index_uids[1].index_id.as_ref())); let splits_ids_string: Vec = (0..8000).map(|seq| format!("split-{seq:04}")).collect(); let splits_ids: Vec<&str> = splits_ids_string @@ -646,11 +652,18 @@ mod tests { .collect(); let mut splits = match query.split_states[0] { SplitState::Staged => { + let index_uids = query.index_uids.unwrap(); + assert_eq!(index_uids.len(), 2); + assert!(["test-index-1", "test-index-2"] + .contains(&index_uids[0].index_id.as_ref())); + assert!(["test-index-1", "test-index-2"] + .contains(&index_uids[1].index_id.as_ref())); let mut splits = make_splits("test-index-1", &["a"], SplitState::Staged); splits.append(&mut make_splits("test-index-2", &["a"], SplitState::Staged)); splits } SplitState::MarkedForDeletion => { + assert!(query.index_uids.is_none()); assert_eq!(query.limit, Some(10_000)); let mut splits = make_splits("test-index-1", &splits_ids, SplitState::MarkedForDeletion); diff --git a/quickwit/quickwit-janitor/src/actors/retention_policy_executor.rs b/quickwit/quickwit-janitor/src/actors/retention_policy_executor.rs index 998ce33afbf..5aa304c76cb 100644 --- a/quickwit/quickwit-janitor/src/actors/retention_policy_executor.rs +++ b/quickwit/quickwit-janitor/src/actors/retention_policy_executor.rs @@ -459,7 +459,7 @@ mod tests { .returning(|list_splits_request| { let query = list_splits_request.deserialize_list_splits_query().unwrap(); assert_eq!(query.split_states, &[SplitState::Published]); - let splits = match query.index_uids[0].index_id.as_ref() { + let splits = match query.index_uids.unwrap()[0].index_id.as_ref() { "index-1" => { vec![ make_split("split-1", Some(1000..=5000)), diff --git a/quickwit/quickwit-metastore/src/metastore/file_backed/mod.rs b/quickwit/quickwit-metastore/src/metastore/file_backed/mod.rs index 10bbd814949..8363551209b 100644 --- a/quickwit/quickwit-metastore/src/metastore/file_backed/mod.rs +++ b/quickwit/quickwit-metastore/src/metastore/file_backed/mod.rs @@ -66,6 +66,7 @@ use 
quickwit_proto::types::{IndexId, IndexUid}; use quickwit_storage::Storage; use time::OffsetDateTime; use tokio::sync::{Mutex, OwnedMutexGuard, RwLock}; +use ulid::Ulid; use self::file_backed_index::FileBackedIndex; pub use self::file_backed_metastore_factory::FileBackedMetastoreFactory; @@ -253,20 +254,30 @@ impl FileBackedMetastore { async fn read(&self, index_uid: &IndexUid, view: F) -> MetastoreResult where F: FnOnce(&FileBackedIndex) -> MetastoreResult { - let index_id = &index_uid.index_id; - let locked_index = self.get_locked_index(index_id).await?; - if locked_index.index_uid() == index_uid { - view(&locked_index) - } else { - Err(MetastoreError::NotFound(EntityKind::Index { - index_id: index_id.to_string(), - })) - } + self.read_any( + index_uid.index_id.as_str(), + Some(index_uid.incarnation_id), + view, + ) + .await } - async fn read_any(&self, index_id: &str, view: F) -> MetastoreResult - where F: FnOnce(&FileBackedIndex) -> MetastoreResult { + /// Reads the index metadata given an `index_id`. The difference with `read` is that + /// this function does not necessarily take an incarnation id, so that it is less strict.
+ async fn read_any( + &self, + index_id: &str, + incarnation_id_opt: Option, + view: impl FnOnce(&FileBackedIndex) -> MetastoreResult, + ) -> MetastoreResult { let locked_index = self.get_locked_index(index_id).await?; + if let Some(incarnation_id) = incarnation_id_opt { + if locked_index.index_uid().incarnation_id != incarnation_id { + return Err(MetastoreError::NotFound(EntityKind::Index { + index_id: index_id.to_string(), + })); + } + } view(&locked_index) } @@ -357,7 +368,7 @@ impl FileBackedMetastore { return Err((metastore_error, index_id_opt, index_uid_opt)); }; let index_metadata = match self - .read_any(index_id, |index| Ok(index.metadata().clone())) + .read_any(index_id, None, |index| Ok(index.metadata().clone())) .await { Ok(index_metadata) => index_metadata, @@ -376,24 +387,28 @@ impl FileBackedMetastore { Ok(index_metadata) } - /// Returns the list of splits for the given request. - /// No error is returned if any of the requested `index_uid` does not exist. - async fn list_splits_inner(&self, request: ListSplitsRequest) -> MetastoreResult> { - let list_splits_query = request.deserialize_list_splits_query()?; - let mut splits_per_index = Vec::with_capacity(list_splits_query.index_uids.len()); - for index_uid in &list_splits_query.index_uids { - let splits = match self - .read(index_uid, |index| index.list_splits(&list_splits_query)) + async fn list_splits_aux( + &self, + index_id_with_incarnation_id_opts: &[(IndexId, Option)], + list_splits_query: ListSplitsQuery, + ) -> MetastoreResult> { + let mut splits_per_index = Vec::with_capacity(index_id_with_incarnation_id_opts.len()); + for (index_id, incarnation_id_opt) in index_id_with_incarnation_id_opts { + match self + .read_any(index_id, *incarnation_id_opt, |index| { + index.list_splits(&list_splits_query) + }) .await { - Ok(splits) => splits, + Ok(splits) => { + splits_per_index.push(splits); + } Err(MetastoreError::NotFound(_)) => { // If the index does not exist, we just skip it. 
continue; } Err(error) => return Err(error), - }; - splits_per_index.push(splits); + } } let limit = list_splits_query.limit.unwrap_or(usize::MAX); @@ -405,9 +420,40 @@ impl FileBackedMetastore { .skip(offset) .take(limit) .collect(); + Ok(merged_results) } + /// Returns the list of splits for the given request. + /// No error is returned if any of the requested `index_uid` does not exist. + async fn list_splits_inner(&self, request: ListSplitsRequest) -> MetastoreResult> { + let mut list_splits_query = request.deserialize_list_splits_query()?; + + let index_id_incarnation_id_opts: Vec<(IndexId, Option)> = + if let Some(index_uids) = list_splits_query.index_uids.take() { + index_uids + .into_iter() + .map(|index_uid| (index_uid.index_id, Some(index_uid.incarnation_id))) + .collect() + } else { + // We do not have an explicit list of index_uids with the query, so we search for + // all indexes. + let inner_rlock_guard = self.state.read().await; + inner_rlock_guard + .indexes + .iter() + .filter_map(|(index_id, index_state)| match index_state { + LazyIndexStatus::Active(_) => Some(index_id), + _ => None, + }) + .map(|index_id| (index_id.clone(), None)) + .collect() + }; + + self.list_splits_aux(&index_id_incarnation_id_opts, list_splits_query) + .await + } + /// Helper used for testing to obtain the data associated with the given index. #[cfg(test)] async fn get_index(&self, index_uid: &IndexUid) -> MetastoreResult { diff --git a/quickwit/quickwit-metastore/src/metastore/mod.rs b/quickwit/quickwit-metastore/src/metastore/mod.rs index 56dc8b84abf..719fd603986 100644 --- a/quickwit/quickwit-metastore/src/metastore/mod.rs +++ b/quickwit/quickwit-metastore/src/metastore/mod.rs @@ -597,8 +597,9 @@ impl ListSplitsResponseExt for ListSplitsResponse { #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] /// A query builder for listing splits within the metastore. pub struct ListSplitsQuery { - /// A non-empty list of index UIDs for which to fetch the splits. 
- pub index_uids: Vec, + /// A non-empty list of index UIDs for which to fetch the splits, or + /// None if we want splits from all indexes. + pub index_uids: Option>, /// A specific node ID to filter by. pub node_id: Option, @@ -677,7 +678,7 @@ impl ListSplitsQuery { /// Creates a new [`ListSplitsQuery`] for the designated index. pub fn for_index(index_uid: IndexUid) -> Self { Self { - index_uids: vec![index_uid], + index_uids: Some(vec![index_uid]), node_id: None, limit: None, offset: None, @@ -700,7 +701,7 @@ impl ListSplitsQuery { return None; } Some(Self { - index_uids, + index_uids: Some(index_uids), node_id: None, limit: None, offset: None, @@ -716,6 +717,25 @@ impl ListSplitsQuery { }) } + /// Creates a new [`ListSplitsQuery`] for all indexes. + pub fn for_all_indexes() -> Self { + Self { + index_uids: None, + node_id: None, + limit: None, + offset: None, + split_states: Vec::new(), + tags: None, + time_range: Default::default(), + delete_opstamp: Default::default(), + update_timestamp: Default::default(), + create_timestamp: Default::default(), + mature: Bound::Unbounded, + sort_by: SortBy::None, + after_split: None, + } + } + /// Selects splits produced by the specified node. 
pub fn with_node_id(mut self, node_id: NodeId) -> Self { self.node_id = Some(node_id); diff --git a/quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs b/quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs index ce0d84468e5..242ba8d11dc 100644 --- a/quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs +++ b/quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs @@ -61,7 +61,7 @@ use super::migrator::run_migrations; use super::model::{PgDeleteTask, PgIndex, PgIndexTemplate, PgShard, PgSplit, Splits}; use super::pool::TrackedPool; use super::split_stream::SplitStream; -use super::utils::{append_query_filters, establish_connection}; +use super::utils::{append_query_filters_and_order_by, establish_connection}; use super::{ QW_POSTGRES_READ_ONLY_ENV_KEY, QW_POSTGRES_SKIP_MIGRATIONS_ENV_KEY, QW_POSTGRES_SKIP_MIGRATION_LOCKING_ENV_KEY, @@ -869,7 +869,7 @@ impl MetastoreService for PostgresqlMetastore { let list_splits_query = request.deserialize_list_splits_query()?; let mut sql_query_builder = Query::select(); sql_query_builder.column(Asterisk).from(Splits::Table); - append_query_filters(&mut sql_query_builder, &list_splits_query); + append_query_filters_and_order_by(&mut sql_query_builder, &list_splits_query); let (sql_query, values) = sql_query_builder.build_sqlx(PostgresQueryBuilder); let pg_split_stream = SplitStream::new( @@ -1913,7 +1913,7 @@ mod tests { let index_uid = IndexUid::new_with_random_ulid("test-index"); let query = ListSplitsQuery::for_index(index_uid.clone()).with_split_state(SplitState::Staged); - append_query_filters(sql, &query); + append_query_filters_and_order_by(sql, &query); assert_eq!( sql.to_string(PostgresQueryBuilder), @@ -1927,7 +1927,7 @@ mod tests { let query = ListSplitsQuery::for_index(index_uid.clone()).with_split_state(SplitState::Published); - append_query_filters(sql, &query); + append_query_filters_and_order_by(sql, &query); assert_eq!( sql.to_string(PostgresQueryBuilder), @@ -1941,7 
+1941,7 @@ mod tests { let query = ListSplitsQuery::for_index(index_uid.clone()) .with_split_states([SplitState::Published, SplitState::MarkedForDeletion]); - append_query_filters(sql, &query); + append_query_filters_and_order_by(sql, &query); assert_eq!( sql.to_string(PostgresQueryBuilder), format!( @@ -1953,7 +1953,7 @@ mod tests { let sql = select_statement.column(Asterisk).from(Splits::Table); let query = ListSplitsQuery::for_index(index_uid.clone()).with_update_timestamp_lt(51); - append_query_filters(sql, &query); + append_query_filters_and_order_by(sql, &query); assert_eq!( sql.to_string(PostgresQueryBuilder), format!( @@ -1965,7 +1965,7 @@ mod tests { let sql = select_statement.column(Asterisk).from(Splits::Table); let query = ListSplitsQuery::for_index(index_uid.clone()).with_create_timestamp_lte(55); - append_query_filters(sql, &query); + append_query_filters_and_order_by(sql, &query); assert_eq!( sql.to_string(PostgresQueryBuilder), format!( @@ -1979,7 +1979,7 @@ mod tests { let maturity_evaluation_datetime = OffsetDateTime::from_unix_timestamp(55).unwrap(); let query = ListSplitsQuery::for_index(index_uid.clone()) .retain_mature(maturity_evaluation_datetime); - append_query_filters(sql, &query); + append_query_filters_and_order_by(sql, &query); assert_eq!( sql.to_string(PostgresQueryBuilder), @@ -1993,7 +1993,7 @@ mod tests { let query = ListSplitsQuery::for_index(index_uid.clone()) .retain_immature(maturity_evaluation_datetime); - append_query_filters(sql, &query); + append_query_filters_and_order_by(sql, &query); assert_eq!( sql.to_string(PostgresQueryBuilder), format!( @@ -2005,7 +2005,7 @@ mod tests { let sql = select_statement.column(Asterisk).from(Splits::Table); let query = ListSplitsQuery::for_index(index_uid.clone()).with_delete_opstamp_gte(4); - append_query_filters(sql, &query); + append_query_filters_and_order_by(sql, &query); assert_eq!( sql.to_string(PostgresQueryBuilder), format!( @@ -2017,7 +2017,7 @@ mod tests { let sql = 
select_statement.column(Asterisk).from(Splits::Table); let query = ListSplitsQuery::for_index(index_uid.clone()).with_time_range_start_gt(45); - append_query_filters(sql, &query); + append_query_filters_and_order_by(sql, &query); assert_eq!( sql.to_string(PostgresQueryBuilder), format!( @@ -2029,7 +2029,7 @@ mod tests { let sql = select_statement.column(Asterisk).from(Splits::Table); let query = ListSplitsQuery::for_index(index_uid.clone()).with_time_range_end_lt(45); - append_query_filters(sql, &query); + append_query_filters_and_order_by(sql, &query); assert_eq!( sql.to_string(PostgresQueryBuilder), format!( @@ -2045,7 +2045,7 @@ mod tests { is_present: false, tag: "tag-2".to_string(), }); - append_query_filters(sql, &query); + append_query_filters_and_order_by(sql, &query); assert_eq!( sql.to_string(PostgresQueryBuilder), @@ -2058,7 +2058,7 @@ mod tests { let sql = select_statement.column(Asterisk).from(Splits::Table); let query = ListSplitsQuery::for_index(index_uid.clone()).with_offset(4); - append_query_filters(sql, &query); + append_query_filters_and_order_by(sql, &query); assert_eq!( sql.to_string(PostgresQueryBuilder), @@ -2071,7 +2071,7 @@ mod tests { let sql = select_statement.column(Asterisk).from(Splits::Table); let query = ListSplitsQuery::for_index(index_uid.clone()).sort_by_index_uid(); - append_query_filters(sql, &query); + append_query_filters_and_order_by(sql, &query); assert_eq!( sql.to_string(PostgresQueryBuilder), @@ -2089,7 +2089,7 @@ mod tests { split_id: "my_split".to_string(), ..Default::default() }); - append_query_filters(sql, &query); + append_query_filters_and_order_by(sql, &query); assert_eq!( sql.to_string(PostgresQueryBuilder), @@ -2097,6 +2097,17 @@ mod tests { r#"SELECT * FROM "splits" WHERE "index_uid" IN ('{index_uid}') AND ("index_uid", "split_id") > ('{index_uid}', 'my_split')"# ) ); + + let mut select_statement = Query::select(); + let sql = select_statement.column(Asterisk).from(Splits::Table); + + let query = 
ListSplitsQuery::for_all_indexes().with_split_state(SplitState::Staged); + append_query_filters_and_order_by(sql, &query); + + assert_eq!( + sql.to_string(PostgresQueryBuilder), + r#"SELECT * FROM "splits" WHERE "split_state" IN ('Staged')"# + ); } #[test] @@ -2108,7 +2119,7 @@ mod tests { let query = ListSplitsQuery::for_index(index_uid.clone()) .with_time_range_start_gt(0) .with_time_range_end_lt(40); - append_query_filters(sql, &query); + append_query_filters_and_order_by(sql, &query); assert_eq!( sql.to_string(PostgresQueryBuilder), format!( @@ -2122,7 +2133,7 @@ mod tests { let query = ListSplitsQuery::for_index(index_uid.clone()) .with_time_range_start_gt(45) .with_delete_opstamp_gt(0); - append_query_filters(sql, &query); + append_query_filters_and_order_by(sql, &query); assert_eq!( sql.to_string(PostgresQueryBuilder), format!( @@ -2136,7 +2147,7 @@ mod tests { let query = ListSplitsQuery::for_index(index_uid.clone()) .with_update_timestamp_lt(51) .with_create_timestamp_lte(63); - append_query_filters(sql, &query); + append_query_filters_and_order_by(sql, &query); assert_eq!( sql.to_string(PostgresQueryBuilder), format!( @@ -2153,7 +2164,7 @@ mod tests { is_present: true, tag: "tag-1".to_string(), }); - append_query_filters(sql, &query); + append_query_filters_and_order_by(sql, &query); assert_eq!( sql.to_string(PostgresQueryBuilder), format!( @@ -2168,7 +2179,7 @@ mod tests { let query = ListSplitsQuery::try_from_index_uids(vec![index_uid.clone(), index_uid_2.clone()]) .unwrap(); - append_query_filters(sql, &query); + append_query_filters_and_order_by(sql, &query); assert_eq!( sql.to_string(PostgresQueryBuilder), format!( diff --git a/quickwit/quickwit-metastore/src/metastore/postgres/utils.rs b/quickwit/quickwit-metastore/src/metastore/postgres/utils.rs index 65ef9ce1df6..f1e8d4f906f 100644 --- a/quickwit/quickwit-metastore/src/metastore/postgres/utils.rs +++ b/quickwit/quickwit-metastore/src/metastore/postgres/utils.rs @@ -101,10 +101,16 @@ pub(super) fn 
append_range_filters( }; } -pub(super) fn append_query_filters(sql: &mut SelectStatement, query: &ListSplitsQuery) { - // Note: `ListSplitsQuery` builder enforces a non empty `index_uids` list. - - sql.cond_where(Expr::col(Splits::IndexUid).is_in(&query.index_uids)); +pub(super) fn append_query_filters_and_order_by( + sql: &mut SelectStatement, + query: &ListSplitsQuery, +) { + if let Some(index_uids) = &query.index_uids { + // Note: `ListSplitsQuery` builder enforces a non empty `index_uids` list. + // TODO we should explore IN VALUES, = ANY and similar constructs in case they perform + // better. + sql.cond_where(Expr::col(Splits::IndexUid).is_in(index_uids)); + } if let Some(node_id) = &query.node_id { sql.cond_where(Expr::col(Splits::NodeId).eq(node_id)); diff --git a/quickwit/quickwit-metastore/src/tests/list_splits.rs b/quickwit/quickwit-metastore/src/tests/list_splits.rs index de9c43b7e01..659566f2e81 100644 --- a/quickwit/quickwit-metastore/src/tests/list_splits.rs +++ b/quickwit/quickwit-metastore/src/tests/list_splits.rs @@ -1508,3 +1508,172 @@ pub async fn test_metastore_list_after_split< cleanup_index(&mut metastore, index_uid_1.clone()).await; cleanup_index(&mut metastore, index_uid_2.clone()).await; } + +pub async fn test_metastore_list_splits_from_all_indexes< + MetastoreToTest: MetastoreServiceExt + DefaultForTest, +>() { + let mut metastore = MetastoreToTest::default_for_test().await; + + let split_id = append_random_suffix("test-list-sorted-splits-"); + let index_id_1 = append_random_suffix("test-list-sorted-splits-1"); + let index_uri_1 = format!("ram:///indexes/{index_id_1}"); + let index_config_1 = IndexConfig::for_test(&index_id_1, &index_uri_1); + + let index_id_2 = append_random_suffix("test-list-sorted-splits-2"); + let index_uri_2 = format!("ram:///indexes/{index_id_2}"); + let index_config_2 = IndexConfig::for_test(&index_id_2, &index_uri_2); + + let create_index_request = 
CreateIndexRequest::try_from_index_config(&index_config_1).unwrap(); + let index_uid_1: IndexUid = metastore + .create_index(create_index_request) + .await + .unwrap() + .index_uid() + .clone(); + let create_index_request = CreateIndexRequest::try_from_index_config(&index_config_2).unwrap(); + let index_uid_2: IndexUid = metastore + .create_index(create_index_request) + .await + .unwrap() + .index_uid() + .clone(); + + let split_id_1 = format!("{split_id}--split-1"); + let split_metadata_1 = SplitMetadata { + split_id: split_id_1.clone(), + index_uid: index_uid_1.clone(), + ..Default::default() + }; + let split_id_2 = format!("{split_id}--split-2"); + let split_metadata_2 = SplitMetadata { + split_id: split_id_2.clone(), + index_uid: index_uid_2.clone(), + ..Default::default() + }; + let split_id_3 = format!("{split_id}--split-3"); + let split_metadata_3 = SplitMetadata { + split_id: split_id_3.clone(), + index_uid: index_uid_1.clone(), + ..Default::default() + }; + let split_id_4 = format!("{split_id}--split-4"); + let split_metadata_4 = SplitMetadata { + split_id: split_id_4.clone(), + index_uid: index_uid_2.clone(), + ..Default::default() + }; + let split_id_5 = format!("{split_id}--split-5"); + let split_metadata_5 = SplitMetadata { + split_id: split_id_5.clone(), + index_uid: index_uid_1.clone(), + ..Default::default() + }; + let split_id_6 = format!("{split_id}--split-6"); + let split_metadata_6 = SplitMetadata { + split_id: split_id_6.clone(), + index_uid: index_uid_2.clone(), + ..Default::default() + }; + + { + let stage_splits_request = StageSplitsRequest::try_from_splits_metadata( + index_uid_1.clone(), + vec![ + split_metadata_1.clone(), + split_metadata_3.clone(), + split_metadata_5.clone(), + ], + ) + .unwrap(); + metastore.stage_splits(stage_splits_request).await.unwrap(); + + let publish_splits_request = PublishSplitsRequest { + index_uid: Some(index_uid_1.clone()), + staged_split_ids: vec![split_id_1.clone()], + ..Default::default() + }; + metastore 
+ .publish_splits(publish_splits_request) + .await + .unwrap(); + + let mark_splits_for_deletion = + MarkSplitsForDeletionRequest::new(index_uid_1.clone(), vec![split_id_3.clone()]); + metastore + .mark_splits_for_deletion(mark_splits_for_deletion) + .await + .unwrap(); + + let stage_splits_request = StageSplitsRequest::try_from_splits_metadata( + index_uid_2.clone(), + vec![ + split_metadata_2.clone(), + split_metadata_4.clone(), + split_metadata_6.clone(), + ], + ) + .unwrap(); + metastore.stage_splits(stage_splits_request).await.unwrap(); + + let publish_splits_request = PublishSplitsRequest { + index_uid: Some(index_uid_2.clone()), + staged_split_ids: vec![split_id_2.clone()], + ..Default::default() + }; + metastore + .publish_splits(publish_splits_request) + .await + .unwrap(); + + let mark_splits_for_deletion = + MarkSplitsForDeletionRequest::new(index_uid_2.clone(), vec![split_id_4.clone()]); + metastore + .mark_splits_for_deletion(mark_splits_for_deletion) + .await + .unwrap(); + } + + let expected_all = [ + &split_metadata_1, + &split_metadata_3, + &split_metadata_5, + &split_metadata_2, + &split_metadata_4, + &split_metadata_6, + ]; + + let expected_res = expected_all[1..] + .iter() + .map(|split| (&split.index_uid, &split.split_id)) + .collect::>(); + + let query = ListSplitsQuery::for_all_indexes() + .sort_by_index_uid() + .after_split(expected_all[0]); + let splits = metastore + .list_splits(ListSplitsRequest::try_from_list_splits_query(&query).unwrap()) + .await + .unwrap() + .collect_splits() + .await + .unwrap(); + // we don't use collect_split_ids because it sorts splits internally + let split_ids = splits + .iter() + .map(|split| { + ( + &split.split_metadata.index_uid, + &split.split_metadata.split_id, + ) + }) + // when running this test against a clean database, this line isn't needed.
In practice, + // any test that leaves any split behind breaks this test if we remove this filter + .filter(|(index_uid, _split_id)| { + [index_uid_1.clone(), index_uid_2.clone()].contains(index_uid) + }) + .collect::>(); + assert_eq!(split_ids, expected_res); + + cleanup_index(&mut metastore, index_uid_1.clone()).await; + cleanup_index(&mut metastore, index_uid_2.clone()).await; +} diff --git a/quickwit/quickwit-metastore/src/tests/mod.rs b/quickwit/quickwit-metastore/src/tests/mod.rs index 7699c3eb11f..e22164a701b 100644 --- a/quickwit/quickwit-metastore/src/tests/mod.rs +++ b/quickwit/quickwit-metastore/src/tests/mod.rs @@ -445,6 +445,13 @@ macro_rules! metastore_test_suite { $crate::tests::list_splits::test_metastore_list_after_split::<$metastore_type>().await; } + #[tokio::test] + #[serial_test::file_serial] + async fn test_metastore_list_splits_from_all_indexes() { + let _ = tracing_subscriber::fmt::try_init(); + $crate::tests::list_splits::test_metastore_list_splits_from_all_indexes::<$metastore_type>().await; + } + #[tokio::test] #[serial_test::file_serial] async fn test_metastore_update_splits_delete_opstamp() { diff --git a/quickwit/quickwit-search/src/root.rs b/quickwit/quickwit-search/src/root.rs index 766029b159e..7dd8dafe366 100644 --- a/quickwit/quickwit-search/src/root.rs +++ b/quickwit/quickwit-search/src/root.rs @@ -4768,11 +4768,11 @@ mod tests { list_splits_request.deserialize_list_splits_query().unwrap(); assert!( list_splits_query.index_uids - == vec![ + == Some(vec![ index_uid_1.clone(), index_uid_2.clone(), index_uid_3.clone() - ] + ]) ); let splits = vec![ MockSplitBuilder::new("index-1-split-1") @@ -4888,7 +4888,7 @@ mod tests { .return_once(move |list_splits_request| { let list_splits_query = list_splits_request.deserialize_list_splits_query().unwrap(); - assert!(list_splits_query.index_uids == vec![index_uid_1.clone(),]); + assert!(list_splits_query.index_uids == Some(vec![index_uid_1.clone()])); let splits = vec![
MockSplitBuilder::new("index-1-split-1") .with_index_uid(&index_uid_1) diff --git a/quickwit/quickwit-serve/src/index_api/rest_handler.rs b/quickwit/quickwit-serve/src/index_api/rest_handler.rs index e90469de634..832df922983 100644 --- a/quickwit/quickwit-serve/src/index_api/rest_handler.rs +++ b/quickwit/quickwit-serve/src/index_api/rest_handler.rs @@ -1132,7 +1132,7 @@ mod tests { .expect_list_splits() .returning(move |list_splits_request: ListSplitsRequest| { let list_split_query = list_splits_request.deserialize_list_splits_query().unwrap(); - if list_split_query.index_uids.contains(&index_uid) + if list_split_query.index_uids.unwrap().contains(&index_uid) && list_split_query.split_states == vec![SplitState::Published, SplitState::Staged] && list_split_query.time_range.start == Bound::Included(10) @@ -1219,7 +1219,7 @@ mod tests { .expect_list_splits() .withf(move |list_split_request| -> bool { let list_split_query = list_split_request.deserialize_list_splits_query().unwrap(); - list_split_query.index_uids.contains(&index_uid) + list_split_query.index_uids.unwrap().contains(&index_uid) }) .return_once(move |_| { let splits = vec![split_1, split_2]; @@ -1271,7 +1271,7 @@ mod tests { mock_metastore.expect_list_splits().return_once( move |list_split_request: ListSplitsRequest| { let list_split_query = list_split_request.deserialize_list_splits_query().unwrap(); - if list_split_query.index_uids.contains(&index_uid) + if list_split_query.index_uids.unwrap().contains(&index_uid) && list_split_query.split_states.is_empty() && list_split_query.time_range.is_unbounded() && list_split_query.create_timestamp.is_unbounded()