From 59a7464d4dbb7f0eab40f983440e81ca4cb506c6 Mon Sep 17 00:00:00 2001
From: Pascal Seitz
Date: Mon, 29 Apr 2024 13:20:08 +0800
Subject: [PATCH] fix jobs group_by

Same bug as https://github.com/quickwit-oss/quickwit/pull/4880, but on other
code paths. Unify the code paths behind a shared grouping helper.
---
 quickwit/quickwit-search/src/list_fields.rs   | 11 ++-
 quickwit/quickwit-search/src/list_terms.rs    | 11 ++-
 quickwit/quickwit-search/src/root.rs          | 75 ++++++++-------
 .../quickwit-search/src/search_job_placer.rs  | 91 ++++++++++++++++++-
 4 files changed, 147 insertions(+), 41 deletions(-)

diff --git a/quickwit/quickwit-search/src/list_fields.rs b/quickwit/quickwit-search/src/list_fields.rs
index 4e54c8adc48..d6030dbe8f6 100644
--- a/quickwit/quickwit-search/src/list_fields.rs
+++ b/quickwit/quickwit-search/src/list_fields.rs
@@ -38,6 +38,7 @@ use quickwit_proto::types::IndexUid;
 use quickwit_storage::Storage;
 
 use crate::leaf::open_split_bundle;
+use crate::search_job_placer::group_jobs_by_index_id;
 use crate::service::SearcherContext;
 use crate::{list_relevant_splits, resolve_index_patterns, ClusterClient, SearchError, SearchJob};
 
@@ -356,12 +357,14 @@ pub fn jobs_to_leaf_requests(
     let search_request_for_leaf = request.clone();
     let mut leaf_search_requests = Vec::new();
     // Group jobs by index uid.
-    for (index_uid, job_group) in &jobs.into_iter().group_by(|job| job.index_uid.clone()) {
-        let index_meta = index_uid_to_id.get(&index_uid).ok_or_else(|| {
+    group_jobs_by_index_id(jobs, |job_group| {
+        let index_uid = &job_group[0].index_uid;
+        let index_meta = index_uid_to_id.get(index_uid).ok_or_else(|| {
             SearchError::Internal(format!(
                 "received list fields job for an unknown index {index_uid}. it should never happen"
             ))
         })?;
+
         let leaf_search_request = LeafListFieldsRequest {
             index_id: index_meta.index_id.to_string(),
             index_uri: index_meta.index_uri.to_string(),
@@ -369,7 +372,9 @@ pub fn jobs_to_leaf_requests(
             split_offsets: job_group.into_iter().map(|job| job.offsets).collect(),
         };
         leaf_search_requests.push(leaf_search_request);
-    }
+        Ok(())
+    })?;
+
     Ok(leaf_search_requests)
 }
 
diff --git a/quickwit/quickwit-search/src/list_terms.rs b/quickwit/quickwit-search/src/list_terms.rs
index fc99bdd8fc8..5bd6d5f32b4 100644
--- a/quickwit/quickwit-search/src/list_terms.rs
+++ b/quickwit/quickwit-search/src/list_terms.rs
@@ -39,6 +39,7 @@ use tantivy::{ReloadPolicy, Term};
 use tracing::{debug, error, info, instrument};
 
 use crate::leaf::open_index_with_caches;
+use crate::search_job_placer::group_jobs_by_index_id;
 use crate::{resolve_index_patterns, ClusterClient, SearchError, SearchJob, SearcherContext};
 
 /// Performs a distributed list terms.
@@ -184,20 +185,22 @@ pub fn jobs_to_leaf_requests(
 ) -> crate::Result<Vec<LeafListTermsRequest>> {
     let search_request_for_leaf = request.clone();
     let mut leaf_search_requests = Vec::new();
-    // Group jobs by index uid.
-    for (index_uid, job_group) in &jobs.into_iter().group_by(|job| job.index_uid.clone()) {
-        let index_uri = index_uid_to_uri.get(&index_uid).ok_or_else(|| {
+    group_jobs_by_index_id(jobs, |job_group| {
+        let index_uid = &job_group[0].index_uid;
+        let index_uri = index_uid_to_uri.get(index_uid).ok_or_else(|| {
             SearchError::Internal(format!(
                 "received list fields job for an unknown index {index_uid}. it should never happen"
             ))
         })?;
+
         let leaf_search_request = LeafListTermsRequest {
             list_terms_request: Some(search_request_for_leaf.clone()),
             index_uri: index_uri.to_string(),
             split_offsets: job_group.into_iter().map(|job| job.offsets).collect(),
         };
         leaf_search_requests.push(leaf_search_request);
-    }
+        Ok(())
+    })?;
     Ok(leaf_search_requests)
 }
 
diff --git a/quickwit/quickwit-search/src/root.rs b/quickwit/quickwit-search/src/root.rs
index 0e1630271fb..9ae0e02aab3 100644
--- a/quickwit/quickwit-search/src/root.rs
+++ b/quickwit/quickwit-search/src/root.rs
@@ -54,7 +54,7 @@ use crate::cluster_client::ClusterClient;
 use crate::collector::{make_merge_collector, QuickwitAggregations};
 use crate::find_trace_ids_collector::Span;
 use crate::scroll_context::{ScrollContext, ScrollKeyAndStartOffset};
-use crate::search_job_placer::Job;
+use crate::search_job_placer::{group_by, group_jobs_by_index_id, Job};
 use crate::service::SearcherContext;
 use crate::{
     extract_split_and_footer_offsets, list_relevant_splits, SearchError, SearchJobPlacer,
@@ -1399,20 +1399,21 @@ fn compute_split_cost(_split_metadata: &SplitMetadata) -> usize {
 pub fn jobs_to_leaf_requests(
     request: &SearchRequest,
     search_indexes_metadatas: &IndexesMetasForLeafSearch,
-    mut jobs: Vec<SearchJob>,
+    jobs: Vec<SearchJob>,
 ) -> crate::Result<Vec<LeafSearchRequest>> {
     let mut search_request_for_leaf = request.clone();
     search_request_for_leaf.start_offset = 0;
     search_request_for_leaf.max_hits += request.start_offset;
     let mut leaf_search_requests = Vec::new();
     // Group jobs by index uid.
-    jobs.sort_by(|job1, job2| job1.index_uid.cmp(&job2.index_uid));
-    for (index_uid, job_group) in &jobs.into_iter().group_by(|job| job.index_uid.clone()) {
-        let search_index_meta = search_indexes_metadatas.get(&index_uid).ok_or_else(|| {
+    group_jobs_by_index_id(jobs, |job_group| {
+        let index_uid = &job_group[0].index_uid;
+        let search_index_meta = search_indexes_metadatas.get(index_uid).ok_or_else(|| {
             SearchError::Internal(format!(
-                "received search job for an unknown index {index_uid}. it should never happen"
+                "received job for an unknown index {index_uid}. it should never happen"
             ))
         })?;
+
         let leaf_search_request = LeafSearchRequest {
             search_request: Some(search_request_for_leaf.clone()),
             split_offsets: job_group.into_iter().map(|job| job.offsets).collect(),
@@ -1420,7 +1421,8 @@ pub fn jobs_to_leaf_requests(
             index_uri: search_index_meta.index_uri.to_string(),
         };
         leaf_search_requests.push(leaf_search_request);
-    }
+        Ok(())
+    })?;
     Ok(leaf_search_requests)
 }
 
@@ -1432,32 +1434,39 @@ pub fn jobs_to_fetch_docs_requests(
 ) -> crate::Result<Vec<FetchDocsRequest>> {
     let mut fetch_docs_requests = Vec::new();
     // Group jobs by index uid.
-    for (index_uid, job_group) in &jobs.into_iter().group_by(|job| job.index_uid.clone()) {
-        let index_meta = indexes_metas_for_leaf_search
-            .get(&index_uid)
-            .ok_or_else(|| {
-                SearchError::Internal(format!(
-                    "received search job for an unknown index {index_uid}"
-                ))
-            })?;
-        let fetch_docs_jobs: Vec<FetchDocsJob> = job_group.collect();
-        let partial_hits: Vec<PartialHit> = fetch_docs_jobs
-            .iter()
-            .flat_map(|fetch_doc_job| fetch_doc_job.partial_hits.iter().cloned())
-            .collect();
-        let split_offsets: Vec<SplitIdAndFooterOffsets> = fetch_docs_jobs
-            .into_iter()
-            .map(|fetch_doc_job| fetch_doc_job.into())
-            .collect();
-        let fetch_docs_req = FetchDocsRequest {
-            partial_hits,
-            split_offsets,
-            index_uri: index_meta.index_uri.to_string(),
-            snippet_request: snippet_request_opt.clone(),
-            doc_mapper: index_meta.doc_mapper_str.clone(),
-        };
-        fetch_docs_requests.push(fetch_docs_req);
-    }
+    group_by(
+        jobs,
+        |job| &job.index_uid,
+        |fetch_docs_jobs| {
+            let index_uid = &fetch_docs_jobs[0].index_uid;
+
+            let index_meta = indexes_metas_for_leaf_search
+                .get(index_uid)
+                .ok_or_else(|| {
+                    SearchError::Internal(format!(
+                        "received search job for an unknown index {index_uid}"
+                    ))
+                })?;
+            let partial_hits: Vec<PartialHit> = fetch_docs_jobs
+                .iter()
+                .flat_map(|fetch_doc_job| fetch_doc_job.partial_hits.iter().cloned())
+                .collect();
+            let split_offsets: Vec<SplitIdAndFooterOffsets> = fetch_docs_jobs
+                .into_iter()
+                .map(|fetch_doc_job| fetch_doc_job.into())
+                .collect();
+            let fetch_docs_req = FetchDocsRequest {
+                partial_hits,
+                split_offsets,
+                index_uri: index_meta.index_uri.to_string(),
+                snippet_request: snippet_request_opt.clone(),
+                doc_mapper: index_meta.doc_mapper_str.clone(),
+            };
+            fetch_docs_requests.push(fetch_docs_req);
+
+            Ok(())
+        },
+    )?;
     Ok(fetch_docs_requests)
 }
 
diff --git a/quickwit/quickwit-search/src/search_job_placer.rs b/quickwit/quickwit-search/src/search_job_placer.rs
index 3d76ce98a6f..24c6677d0ee 100644
--- a/quickwit/quickwit-search/src/search_job_placer.rs
+++ b/quickwit/quickwit-search/src/search_job_placer.rs
@@ -29,7 +29,7 @@ use quickwit_common::pubsub::EventSubscriber;
 use quickwit_common::rendezvous_hasher::{node_affinity, sort_by_rendez_vous_hash};
 use quickwit_proto::search::{ReportSplit, ReportSplitsRequest};
 
-use crate::{SearchServiceClient, SearcherPool};
+use crate::{SearchJob, SearchServiceClient, SearcherPool};
 
 /// Job.
 /// The unit in which distributed search is performed.
@@ -234,11 +234,100 @@ impl PartialEq for CandidateNodes {
 
 impl Eq for CandidateNodes {}
 
+/// Groups the jobs by index uid and passes each group of `SearchJob`s to the callback.
+pub fn group_jobs_by_index_id(
+    jobs: Vec<SearchJob>,
+    cb: impl FnMut(Vec<SearchJob>) -> crate::Result<()>,
+) -> crate::Result<()> {
+    // Group jobs by index uid.
+    group_by(jobs, |job| &job.index_uid, cb)?;
+    Ok(())
+}
+
+/// Sorts the data by the key returned by `compare_by` and invokes the callback
+/// once per group of elements that share the same key.
+///
+/// Note: the input data is consumed and sorted in the process.
+pub fn group_by<T, K: Ord, F>(
+    mut data: Vec<T>,
+    compare_by: impl Fn(&T) -> &K,
+    mut callback: F,
+) -> crate::Result<()>
+where
+    F: FnMut(Vec<T>) -> crate::Result<()>,
+{
+    // Sort in descending key order so that all elements sharing the smallest
+    // remaining key sit at the tail of the vector.
+    data.sort_by(|job1, job2| compare_by(job2).cmp(compare_by(job1)));
+    while !data.is_empty() {
+        let last_element = data.last().unwrap();
+        let count = data
+            .iter()
+            .rev()
+            .take_while(|&x| compare_by(x) == compare_by(last_element))
+            .count();
+
+        // Split the group off the tail; groups are therefore yielded in
+        // ascending key order.
+        let group = data.split_off(data.len() - count);
+        callback(group)?;
+    }
+
+    Ok(())
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
     use crate::{searcher_pool_for_test, MockSearchService, SearchJob};
 
+    #[test]
+    fn test_group_by_1() {
+        let data = vec![1, 1, 2, 2, 2, 3, 4, 4, 5, 5, 5];
+        let mut outputs: Vec<Vec<i32>> = Vec::new();
+        group_by(
+            data,
+            |el| el,
+            |group| {
+                outputs.push(group);
+                Ok(())
+            },
+        )
+        .unwrap();
+        assert_eq!(outputs.len(), 5);
+        assert_eq!(outputs[0], vec![1, 1]);
+        assert_eq!(outputs[1], vec![2, 2, 2]);
+        assert_eq!(outputs[2], vec![3]);
+        assert_eq!(outputs[3], vec![4, 4]);
+        assert_eq!(outputs[4], vec![5, 5, 5]);
+    }
+
+    #[test]
+    fn test_group_by_all_same() {
+        let data = vec![1, 1];
+        let mut outputs: Vec<Vec<i32>> = Vec::new();
+        group_by(
+            data,
+            |el| el,
+            |group| {
+                outputs.push(group);
+                Ok(())
+            },
+        )
+        .unwrap();
+        assert_eq!(outputs.len(), 1);
+        assert_eq!(outputs[0], vec![1, 1]);
+    }
+
+    #[test]
+    fn test_group_by_empty() {
+        let data = vec![];
+        let mut outputs: Vec<Vec<i32>> = Vec::new();
+        group_by(
+            data,
+            |el| el,
+            |group| {
+                outputs.push(group);
+                Ok(())
+            },
+        )
+        .unwrap();
+        assert_eq!(outputs.len(), 0);
+    }
+
     #[tokio::test]
     async fn test_search_job_placer() {
         {
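
For readers who want to see the new helper in isolation, below is a minimal, standalone sketch (not part of the patch) of the grouping strategy `group_by` uses: sort in descending key order, then repeatedly split the equal-key elements off the tail, so the callback receives one owned group per key, in ascending key order. Because the helper does its own sort, callers cannot forget to pre-sort the input, which is the class of bug the referenced PR fixed for itertools' group_by. The `Job` struct, the `main` function, and the plain `Result<(), String>` error type are illustrative stand-ins for quickwit's `SearchJob` and `crate::Result`.

// Standalone sketch -- not part of the patch. `Job` stands in for `SearchJob`
// and a plain `Result` stands in for quickwit's `crate::Result`.
#[derive(Debug)]
struct Job {
    index_uid: String,
    split_id: u64,
}

fn group_by<T, K: Ord, E>(
    mut data: Vec<T>,
    key: impl Fn(&T) -> &K,
    mut callback: impl FnMut(Vec<T>) -> Result<(), E>,
) -> Result<(), E> {
    // Sort in descending key order so the smallest remaining key is at the tail.
    data.sort_by(|left, right| key(right).cmp(key(left)));
    while !data.is_empty() {
        let last = data.last().unwrap();
        // Equal keys are contiguous at the tail; count them...
        let count = data
            .iter()
            .rev()
            .take_while(|&item| key(item) == key(last))
            .count();
        // ...and hand them to the callback as one owned group. Groups come back
        // in ascending key order, as the tests in the patch assert.
        let group = data.split_off(data.len() - count);
        callback(group)?;
    }
    Ok(())
}

fn main() -> Result<(), String> {
    let jobs = vec![
        Job { index_uid: "index-b".to_string(), split_id: 1 },
        Job { index_uid: "index-a".to_string(), split_id: 2 },
        Job { index_uid: "index-b".to_string(), split_id: 3 },
    ];
    // Mirrors `group_jobs_by_index_id`: one callback invocation per index uid.
    group_by(jobs, |job| &job.index_uid, |group| {
        // Prints the "index-a" group first, then the two "index-b" jobs together.
        println!("{:?}", group);
        Ok::<(), String>(())
    })?;
    Ok(())
}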