
fix jobs group_by #4922

Merged
merged 1 commit on Apr 29, 2024
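For context: this change replaces itertools' `group_by` with a custom, sort-based grouping helper. `Itertools::group_by` only groups *consecutive* elements with equal keys, so any call site that forgot to sort first (as in `list_fields.rs` and `list_terms.rs` below) could emit several leaf requests for the same index. A minimal sketch of that pitfall (not part of this PR, and assuming an itertools version that still names the adaptor `group_by`; newer releases rename it `chunk_by`):

    use itertools::Itertools;

    fn main() {
        // Jobs for the same index are not adjacent here.
        let job_index_ids = vec!["index-a", "index-b", "index-a"];
        let grouped = job_index_ids.into_iter().group_by(|&index_id| index_id);
        let mut groups: Vec<(&str, Vec<&str>)> = Vec::new();
        for (index_id, group) in &grouped {
            groups.push((index_id, group.collect()));
        }
        // "index-a" shows up twice: one group per consecutive run, not per index.
        assert_eq!(
            groups,
            vec![
                ("index-a", vec!["index-a"]),
                ("index-b", vec!["index-b"]),
                ("index-a", vec!["index-a"]),
            ]
        );
    }

The helper introduced in this PR sorts internally before grouping, so every index yields exactly one group regardless of input order.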
11 changes: 8 additions & 3 deletions quickwit/quickwit-search/src/list_fields.rs
@@ -38,6 +38,7 @@ use quickwit_proto::types::IndexUid;
 use quickwit_storage::Storage;

 use crate::leaf::open_split_bundle;
+use crate::search_job_placer::group_jobs_by_index_id;
 use crate::service::SearcherContext;
 use crate::{list_relevant_splits, resolve_index_patterns, ClusterClient, SearchError, SearchJob};
@@ -356,20 +357,24 @@ pub fn jobs_to_leaf_requests(
     let search_request_for_leaf = request.clone();
     let mut leaf_search_requests = Vec::new();
     // Group jobs by index uid.
-    for (index_uid, job_group) in &jobs.into_iter().group_by(|job| job.index_uid.clone()) {
-        let index_meta = index_uid_to_id.get(&index_uid).ok_or_else(|| {
+    group_jobs_by_index_id(jobs, |job_group| {
+        let index_uid = &job_group[0].index_uid;
+        let index_meta = index_uid_to_id.get(index_uid).ok_or_else(|| {
             SearchError::Internal(format!(
                 "received list fields job for an unknown index {index_uid}. it should never happen"
             ))
         })?;

         let leaf_search_request = LeafListFieldsRequest {
             index_id: index_meta.index_id.to_string(),
             index_uri: index_meta.index_uri.to_string(),
             fields: search_request_for_leaf.fields.clone(),
             split_offsets: job_group.into_iter().map(|job| job.offsets).collect(),
         };
         leaf_search_requests.push(leaf_search_request);
-    }
+        Ok(())
+    })?;

     Ok(leaf_search_requests)
 }
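A note on the `job_group[0]` access introduced here (and repeated in `list_terms.rs` and `root.rs`): the helper only invokes its callback with non-empty groups, since each group is the run of elements equal to the current last element, so the indexing cannot panic. An illustrative check, reusing the `group_by` function added in `search_job_placer.rs` further down:

    // Illustrative only: every group handed to the callback is non-empty,
    // which is what makes `job_group[0]` safe at the call sites.
    let mut seen: Vec<Vec<i32>> = Vec::new();
    group_by(vec![3, 1, 3], |el| el, |group| {
        assert!(!group.is_empty()); // holds for every emitted group
        seen.push(group);
        Ok(())
    })
    .unwrap();
    assert_eq!(seen, vec![vec![1], vec![3, 3]]); // groups arrive in ascending key order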

11 changes: 7 additions & 4 deletions quickwit/quickwit-search/src/list_terms.rs
@@ -39,6 +39,7 @@ use tantivy::{ReloadPolicy, Term};
 use tracing::{debug, error, info, instrument};

 use crate::leaf::open_index_with_caches;
+use crate::search_job_placer::group_jobs_by_index_id;
 use crate::{resolve_index_patterns, ClusterClient, SearchError, SearchJob, SearcherContext};

 /// Performs a distributed list terms.
@@ -184,20 +185,22 @@ pub fn jobs_to_leaf_requests(
 ) -> crate::Result<Vec<LeafListTermsRequest>> {
     let search_request_for_leaf = request.clone();
     let mut leaf_search_requests = Vec::new();
-    // Group jobs by index uid.
-    for (index_uid, job_group) in &jobs.into_iter().group_by(|job| job.index_uid.clone()) {
-        let index_uri = index_uid_to_uri.get(&index_uid).ok_or_else(|| {
+    group_jobs_by_index_id(jobs, |job_group| {
+        let index_uid = &job_group[0].index_uid;
+        let index_uri = index_uid_to_uri.get(index_uid).ok_or_else(|| {
             SearchError::Internal(format!(
                 "received list terms job for an unknown index {index_uid}. it should never happen"
             ))
         })?;

         let leaf_search_request = LeafListTermsRequest {
             list_terms_request: Some(search_request_for_leaf.clone()),
             index_uri: index_uri.to_string(),
             split_offsets: job_group.into_iter().map(|job| job.offsets).collect(),
         };
         leaf_search_requests.push(leaf_search_request);
-    }
+        Ok(())
+    })?;
     Ok(leaf_search_requests)
 }

75 changes: 42 additions & 33 deletions quickwit/quickwit-search/src/root.rs
@@ -54,7 +54,7 @@ use crate::cluster_client::ClusterClient;
 use crate::collector::{make_merge_collector, QuickwitAggregations};
 use crate::find_trace_ids_collector::Span;
 use crate::scroll_context::{ScrollContext, ScrollKeyAndStartOffset};
-use crate::search_job_placer::Job;
+use crate::search_job_placer::{group_by, group_jobs_by_index_id, Job};
 use crate::service::SearcherContext;
 use crate::{
     extract_split_and_footer_offsets, list_relevant_splits, SearchError, SearchJobPlacer,
@@ -1399,28 +1399,30 @@ fn compute_split_cost(_split_metadata: &SplitMetadata) -> usize {
 pub fn jobs_to_leaf_requests(
     request: &SearchRequest,
     search_indexes_metadatas: &IndexesMetasForLeafSearch,
-    mut jobs: Vec<SearchJob>,
+    jobs: Vec<SearchJob>,
 ) -> crate::Result<Vec<LeafSearchRequest>> {
     let mut search_request_for_leaf = request.clone();
     search_request_for_leaf.start_offset = 0;
     search_request_for_leaf.max_hits += request.start_offset;
     let mut leaf_search_requests = Vec::new();
     // Group jobs by index uid.
-    jobs.sort_by(|job1, job2| job1.index_uid.cmp(&job2.index_uid));
-    for (index_uid, job_group) in &jobs.into_iter().group_by(|job| job.index_uid.clone()) {
-        let search_index_meta = search_indexes_metadatas.get(&index_uid).ok_or_else(|| {
+    group_jobs_by_index_id(jobs, |job_group| {
+        let index_uid = &job_group[0].index_uid;
+        let search_index_meta = search_indexes_metadatas.get(index_uid).ok_or_else(|| {
             SearchError::Internal(format!(
-                "received search job for an unknown index {index_uid}. it should never happen"
+                "received job for an unknown index {index_uid}. it should never happen"
             ))
         })?;

         let leaf_search_request = LeafSearchRequest {
             search_request: Some(search_request_for_leaf.clone()),
             split_offsets: job_group.into_iter().map(|job| job.offsets).collect(),
             doc_mapper: search_index_meta.doc_mapper_str.clone(),
             index_uri: search_index_meta.index_uri.to_string(),
         };
         leaf_search_requests.push(leaf_search_request);
-    }
+        Ok(())
+    })?;
     Ok(leaf_search_requests)
 }
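Note that the explicit pre-sort and the `mut jobs` parameter disappear here: the new helper sorts internally, so callers no longer need to remember to sort by index uid before grouping. That is precisely the step the itertools-based code in `list_fields.rs` and `list_terms.rs` was missing.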

@@ -1432,32 +1434,39 @@ pub fn jobs_to_fetch_docs_requests(
 ) -> crate::Result<Vec<FetchDocsRequest>> {
     let mut fetch_docs_requests = Vec::new();
     // Group jobs by index uid.
-    for (index_uid, job_group) in &jobs.into_iter().group_by(|job| job.index_uid.clone()) {
-        let index_meta = indexes_metas_for_leaf_search
-            .get(&index_uid)
-            .ok_or_else(|| {
-                SearchError::Internal(format!(
-                    "received search job for an unknown index {index_uid}"
-                ))
-            })?;
-        let fetch_docs_jobs: Vec<FetchDocsJob> = job_group.collect();
-        let partial_hits: Vec<PartialHit> = fetch_docs_jobs
-            .iter()
-            .flat_map(|fetch_doc_job| fetch_doc_job.partial_hits.iter().cloned())
-            .collect();
-        let split_offsets: Vec<SplitIdAndFooterOffsets> = fetch_docs_jobs
-            .into_iter()
-            .map(|fetch_doc_job| fetch_doc_job.into())
-            .collect();
-        let fetch_docs_req = FetchDocsRequest {
-            partial_hits,
-            split_offsets,
-            index_uri: index_meta.index_uri.to_string(),
-            snippet_request: snippet_request_opt.clone(),
-            doc_mapper: index_meta.doc_mapper_str.clone(),
-        };
-        fetch_docs_requests.push(fetch_docs_req);
-    }
+    group_by(
+        jobs,
+        |job| &job.index_uid,
+        |fetch_docs_jobs| {
+            let index_uid = &fetch_docs_jobs[0].index_uid;
+
+            let index_meta = indexes_metas_for_leaf_search
+                .get(index_uid)
+                .ok_or_else(|| {
+                    SearchError::Internal(format!(
+                        "received search job for an unknown index {index_uid}"
+                    ))
+                })?;
+            let partial_hits: Vec<PartialHit> = fetch_docs_jobs
+                .iter()
+                .flat_map(|fetch_doc_job| fetch_doc_job.partial_hits.iter().cloned())
+                .collect();
+            let split_offsets: Vec<SplitIdAndFooterOffsets> = fetch_docs_jobs
+                .into_iter()
+                .map(|fetch_doc_job| fetch_doc_job.into())
+                .collect();
+            let fetch_docs_req = FetchDocsRequest {
+                partial_hits,
+                split_offsets,
+                index_uri: index_meta.index_uri.to_string(),
+                snippet_request: snippet_request_opt.clone(),
+                doc_mapper: index_meta.doc_mapper_str.clone(),
+            };
+            fetch_docs_requests.push(fetch_docs_req);
+
+            Ok(())
+        },
+    )?;
     Ok(fetch_docs_requests)
 }

91 changes: 90 additions & 1 deletion quickwit/quickwit-search/src/search_job_placer.rs
@@ -29,7 +29,7 @@ use quickwit_common::pubsub::EventSubscriber;
 use quickwit_common::rendezvous_hasher::{node_affinity, sort_by_rendez_vous_hash};
 use quickwit_proto::search::{ReportSplit, ReportSplitsRequest};

-use crate::{SearchServiceClient, SearcherPool};
+use crate::{SearchJob, SearchServiceClient, SearcherPool};

 /// Job.
 /// The unit in which distributed search is performed.
@@ -234,11 +234,100 @@ impl PartialEq for CandidateNodes {

 impl Eq for CandidateNodes {}

+/// Groups jobs by index id and invokes `cb` with the `SearchJob`s of each index.
+pub fn group_jobs_by_index_id(
+    jobs: Vec<SearchJob>,
+    cb: impl FnMut(Vec<SearchJob>) -> crate::Result<()>,
+) -> crate::Result<()> {
+    // Group jobs by index uid.
+    group_by(jobs, |job| &job.index_uid, cb)?;
+    Ok(())
+}
+
+/// Invokes `callback` with the input data grouped by the key returned from `compare_by`.
+///
+/// Note: the data will be sorted in the process.
+pub fn group_by<T, K: Ord, F>(
+    mut data: Vec<T>,
+    compare_by: impl Fn(&T) -> &K,
+    mut callback: F,
+) -> crate::Result<()>
+where
+    F: FnMut(Vec<T>) -> crate::Result<()>,
+{
+    data.sort_by(|job1, job2| compare_by(job2).cmp(compare_by(job1)));
+    while !data.is_empty() {
+        let last_element = data.last().unwrap();
+        let count = data
+            .iter()
+            .rev()
+            .take_while(|&x| compare_by(x) == compare_by(last_element))
+            .count();
+
+        let group = data.split_off(data.len() - count);
+        callback(group)?;
+    }
+
+    Ok(())
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
     use crate::{searcher_pool_for_test, MockSearchService, SearchJob};

+    #[test]
+    fn test_group_by_1() {
+        let data = vec![1, 1, 2, 2, 2, 3, 4, 4, 5, 5, 5];
+        let mut outputs: Vec<Vec<i32>> = Vec::new();
+        group_by(
+            data,
+            |el| el,
+            |group| {
+                outputs.push(group);
+                Ok(())
+            },
+        )
+        .unwrap();
+        assert_eq!(outputs.len(), 5);
+        assert_eq!(outputs[0], vec![1, 1]);
+        assert_eq!(outputs[1], vec![2, 2, 2]);
+        assert_eq!(outputs[2], vec![3]);
+        assert_eq!(outputs[3], vec![4, 4]);
+        assert_eq!(outputs[4], vec![5, 5, 5]);
+    }
+
+    #[test]
+    fn test_group_by_all_same() {
+        let data = vec![1, 1];
+        let mut outputs: Vec<Vec<i32>> = Vec::new();
+        group_by(
+            data,
+            |el| el,
+            |group| {
+                outputs.push(group);
+                Ok(())
+            },
+        )
+        .unwrap();
+        assert_eq!(outputs.len(), 1);
+        assert_eq!(outputs[0], vec![1, 1]);
+    }
+
+    #[test]
+    fn test_group_by_empty() {
+        let data = vec![];
+        let mut outputs: Vec<Vec<i32>> = Vec::new();
+        group_by(
+            data,
+            |el| el,
+            |group| {
+                outputs.push(group);
+                Ok(())
+            },
+        )
+        .unwrap();
+        assert_eq!(outputs.len(), 0);
+    }
+
     #[tokio::test]
     async fn test_search_job_placer() {
         {