diff --git a/quickwit/quickwit-search/src/search_job_placer.rs b/quickwit/quickwit-search/src/search_job_placer.rs index 9b4965eaa89..9914ffdd177 100644 --- a/quickwit/quickwit-search/src/search_job_placer.rs +++ b/quickwit/quickwit-search/src/search_job_placer.rs @@ -28,7 +28,7 @@ use async_trait::async_trait; use quickwit_common::pubsub::EventSubscriber; use quickwit_common::rendezvous_hasher::{node_affinity, sort_by_rendez_vous_hash}; use quickwit_proto::search::{ReportSplit, ReportSplitsRequest}; -use tracing::warn; +use tracing::{info, warn}; use crate::{SearchJob, SearchServiceClient, SearcherPool, SEARCH_METRICS}; @@ -80,7 +80,7 @@ impl EventSubscriber for SearchJobPlacer { .max_by_key(|node_addr| node_affinity(*node_addr, &report_split.split_id)) // This actually never happens thanks to the if-condition at the // top of this function. - .expect("`nodes` should not be empty."); + .expect("`nodes` should not be empty"); splits_per_node .entry(*node_addr) .or_default() @@ -150,31 +150,43 @@ impl SearchJobPlacer { mut jobs: Vec, excluded_addrs: &HashSet, ) -> anyhow::Result)>> { - let num_nodes = self.searcher_pool.len(); + let mut all_nodes = self.searcher_pool.pairs(); - let mut candidate_nodes: Vec = self - .searcher_pool - .pairs() + if all_nodes.is_empty() { + bail!( + "failed to assign search jobs: there are no available searcher nodes in the \ + cluster" + ); + } + if !excluded_addrs.is_empty() && excluded_addrs.len() < all_nodes.len() { + all_nodes.retain(|(grpc_addr, _)| !excluded_addrs.contains(grpc_addr)); + + // This should never happen, but... belt and suspenders policy. 
+ if all_nodes.is_empty() { + bail!( + "failed to assign search jobs: there are no searcher nodes candidates for \ + these jobs" + ); + } + info!( + "excluded {} nodes from search job placement, {} remaining", + excluded_addrs.len(), + all_nodes.len() + ); + } + let mut candidate_nodes: Vec<_> = all_nodes .into_iter() - .filter(|(grpc_addr, _)| { - excluded_addrs.is_empty() - || excluded_addrs.len() == num_nodes - || !excluded_addrs.contains(grpc_addr) - }) - .map(|(grpc_addr, client)| CandidateNodes { + .map(|(grpc_addr, client)| CandidateNode { grpc_addr, client, load: 0, }) .collect(); - if candidate_nodes.is_empty() { - bail!( - "failed to assign search jobs. there are no available searcher nodes in the pool" - ); - } jobs.sort_unstable_by(Job::compare_cost); + let num_nodes = candidate_nodes.len(); + let mut job_assignments: HashMap)> = HashMap::with_capacity(num_nodes); @@ -182,7 +194,7 @@ impl SearchJobPlacer { // allow around 5% disparity. Round up so we never end up in a case where // target_load * num_nodes < total_load - // some of our tests needs 2 splits to be put on 2 different searchers. It makes sens for + // some of our tests need 2 splits to be put on 2 different searchers. It makes sense for // these tests to keep doing so (testing root merge). Either we can make the allowed // difference stricter, find the right split names ("split6" instead of "split2" works). 
// or modify mock_split_meta() so that not all splits have the same job cost @@ -239,25 +251,25 @@ impl SearchJobPlacer { } #[derive(Debug, Clone)] -struct CandidateNodes { +struct CandidateNode { pub grpc_addr: SocketAddr, pub client: SearchServiceClient, pub load: usize, } -impl Hash for CandidateNodes { +impl Hash for CandidateNode { fn hash(&self, state: &mut H) { self.grpc_addr.hash(state); } } -impl PartialEq for CandidateNodes { +impl PartialEq for CandidateNode { fn eq(&self, other: &Self) -> bool { self.grpc_addr == other.grpc_addr } } -impl Eq for CandidateNodes {} +impl Eq for CandidateNode {} /// Groups jobs by index id and returns a list of `SearchJob` per index pub fn group_jobs_by_index_id(