Improve search job placer logging
guilload committed Oct 22, 2024
1 parent 82b0102 commit 93d3915
Showing 1 changed file with 33 additions and 22 deletions.
55 changes: 33 additions & 22 deletions quickwit/quickwit-search/src/search_job_placer.rs
@@ -28,7 +28,7 @@ use async_trait::async_trait;
 use quickwit_common::pubsub::EventSubscriber;
 use quickwit_common::rendezvous_hasher::{node_affinity, sort_by_rendez_vous_hash};
 use quickwit_proto::search::{ReportSplit, ReportSplitsRequest};
-use tracing::warn;
+use tracing::{info, warn};
 
 use crate::{SearchJob, SearchServiceClient, SearcherPool, SEARCH_METRICS};
 
@@ -80,7 +80,7 @@ impl EventSubscriber<ReportSplitsRequest> for SearchJobPlacer {
                 .max_by_key(|node_addr| node_affinity(*node_addr, &report_split.split_id))
                 // This actually never happens thanks to the if-condition at the
                 // top of this function.
-                .expect("`nodes` should not be empty.");
+                .expect("`nodes` should not be empty");
             splits_per_node
                 .entry(*node_addr)
                 .or_default()
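
A note on the rendezvous hashing used in this hunk: `node_affinity` scores every (node, split) pair, and each reported split is routed to the node with the highest score, so every searcher that sees the same membership picks the same winner without coordination. The sketch below illustrates the idea in isolation; the node addresses and the hash used for scoring are illustrative stand-ins, not Quickwit's actual `node_affinity` implementation.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Illustrative affinity score: hash the (node, key) pair together.
fn affinity(node: &str, key: &str) -> u64 {
    let mut hasher = DefaultHasher::new();
    node.hash(&mut hasher);
    key.hash(&mut hasher);
    hasher.finish()
}

fn main() {
    let nodes = ["10.0.0.1:7281", "10.0.0.2:7281", "10.0.0.3:7281"];
    let split_id = "split-42";
    // Every process that sees the same node list computes the same winner,
    // so reports for a given split id converge on a single searcher.
    let winner = nodes
        .iter()
        .max_by_key(|node| affinity(node, split_id))
        .expect("`nodes` should not be empty");
    println!("{split_id} -> {winner}");
}
```

Because the winner depends only on the (node, split) hash, adding or removing a node only reshuffles the splits whose previous winner was that node.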
@@ -150,39 +150,50 @@ impl SearchJobPlacer {
         mut jobs: Vec<J>,
         excluded_addrs: &HashSet<SocketAddr>,
     ) -> anyhow::Result<impl Iterator<Item = (SearchServiceClient, Vec<J>)>> {
-        let num_nodes = self.searcher_pool.len();
+        let mut all_nodes = self.searcher_pool.pairs();
 
-        let mut candidate_nodes: Vec<CandidateNodes> = self
-            .searcher_pool
-            .pairs()
+        if all_nodes.is_empty() {
+            bail!(
+                "failed to assign search jobs: there are no available searcher nodes in the \
+                 cluster"
+            );
+        }
+        if !excluded_addrs.is_empty() && excluded_addrs.len() < all_nodes.len() {
+            all_nodes.retain(|(grpc_addr, _)| !excluded_addrs.contains(grpc_addr));
+
+            if all_nodes.len() == 0 {
+                bail!(
+                    "failed to assign search jobs: there are no searcher nodes candidates for \
+                     these jobs"
+                );
+            }
+            info!(
+                "excluded {} nodes from search job placement, {} remaining",
+                excluded_addrs.len(),
+                all_nodes.len()
+            );
+        }
+        let mut candidate_nodes: Vec<_> = all_nodes
             .into_iter()
-            .filter(|(grpc_addr, _)| {
-                excluded_addrs.is_empty()
-                    || excluded_addrs.len() == num_nodes
-                    || !excluded_addrs.contains(grpc_addr)
-            })
-            .map(|(grpc_addr, client)| CandidateNodes {
+            .map(|(grpc_addr, client)| CandidateNode {
                 grpc_addr,
                 client,
                 load: 0,
             })
             .collect();
 
-        if candidate_nodes.is_empty() {
-            bail!(
-                "failed to assign search jobs. there are no available searcher nodes in the pool"
-            );
-        }
         jobs.sort_unstable_by(Job::compare_cost);
 
+        let num_nodes = candidate_nodes.len();
+
         let mut job_assignments: HashMap<SocketAddr, (SearchServiceClient, Vec<J>)> =
             HashMap::with_capacity(num_nodes);
 
         let total_load: usize = jobs.iter().map(|job| job.cost()).sum();
 
         // allow around 5% disparity. Round up so we never end up in a case where
         // target_load * num_nodes < total_load
-        // some of our tests needs 2 splits to be put on 2 different searchers. It makes sens for
+        // some of our tests needs 2 splits to be put on 2 different searchers. It makes sense for
         // these tests to keep doing so (testing root merge). Either we can make the allowed
         // difference stricter, find the right split names ("split6" instead of "split2" works).
         // or modify mock_split_meta() so that not all splits have the same job cost
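
A quick worked example of the "around 5% disparity" comment above. The formula itself sits in the unexpanded part of the diff, so the helper below is a plausible reading of the comment, not the code's literal expression.

```rust
// Hypothetical reading of the comment: each node gets its even share of the
// total job cost plus roughly 5% headroom, rounded up so that
// target_load * num_nodes can never drop below total_load.
fn target_load(total_load: usize, num_nodes: usize) -> usize {
    let with_headroom = total_load + total_load / 20; // ~5% disparity allowance
    (with_headroom + num_nodes - 1) / num_nodes // ceiling division
}

fn main() {
    // Ten jobs of cost 10 spread across 3 searchers: the even share is ~33.3,
    // adding 5% and rounding up gives 35, and 35 * 3 = 105 >= 100.
    assert_eq!(target_load(100, 3), 35);
    println!("target load per node: {}", target_load(100, 3));
}
```

Rounding up is what guarantees `target_load * num_nodes >= total_load`, so the placer never runs out of capacity before every job is assigned.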
Expand Down Expand Up @@ -239,25 +250,25 @@ impl SearchJobPlacer {
}

#[derive(Debug, Clone)]
struct CandidateNodes {
struct CandidateNode {
pub grpc_addr: SocketAddr,
pub client: SearchServiceClient,
pub load: usize,
}

impl Hash for CandidateNodes {
impl Hash for CandidateNode {
fn hash<H: Hasher>(&self, state: &mut H) {
self.grpc_addr.hash(state);
}
}

impl PartialEq for CandidateNodes {
impl PartialEq for CandidateNode {
fn eq(&self, other: &Self) -> bool {
self.grpc_addr == other.grpc_addr
}
}

impl Eq for CandidateNodes {}
impl Eq for CandidateNode {}

/// Groups jobs by index id and returns a list of `SearchJob` per index
pub fn group_jobs_by_index_id(
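
The manual `Hash`, `PartialEq`, and `Eq` impls above key a candidate's identity on its gRPC address alone, so two snapshots of the same searcher with different `load` values compare equal. The standalone sketch below demonstrates that behavior with a simplified stand-in struct; the `client` field is dropped because a `SearchServiceClient` cannot be constructed outside Quickwit, and the `HashSet` is only a convenient way to exercise the equality semantics.

```rust
use std::collections::HashSet;
use std::hash::{Hash, Hasher};
use std::net::SocketAddr;

// Simplified stand-in for the diff's `CandidateNode`: identity is the address only.
#[derive(Debug, Clone)]
struct Candidate {
    grpc_addr: SocketAddr,
    load: usize,
}

impl Hash for Candidate {
    fn hash<H: Hasher>(&self, state: &mut H) {
        self.grpc_addr.hash(state);
    }
}

impl PartialEq for Candidate {
    fn eq(&self, other: &Self) -> bool {
        self.grpc_addr == other.grpc_addr
    }
}

impl Eq for Candidate {}

fn main() {
    let addr: SocketAddr = "127.0.0.1:7281".parse().unwrap();
    let mut seen = HashSet::new();
    seen.insert(Candidate { grpc_addr: addr, load: 0 });
    // The same searcher reported with a different load is still the same candidate.
    assert!(seen.contains(&Candidate { grpc_addr: addr, load: 100 }));
    println!("tracked candidates: {}", seen.len());
}
```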