Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allowing failed splits in root search. #5440

Merged
merged 3 commits into from
Sep 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion docs/reference/es_compatible_api.md
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ If a parameter appears both as a query string parameter and in the JSON payload,
| `size` | `Integer` | Number of hits to return. | 10 |
| `sort` | `String` | Describes how documents should be ranked. See [Sort order](#sort-order) | (Optional) |
| `scroll` | `Duration` | Creates a scroll context for "time to live". See [Scroll](#_scroll--scroll-api). | (Optional) |
| `allow_partial_search_results` | `Boolean` | Returns a partial response if some (but not all) of the split searches were unsuccessful. | `true` |

#### Supported Request Body parameters

Expand Down Expand Up @@ -301,7 +302,7 @@ GET api/v1/_elastic/_cat/indices

Use the [cat indices API](https://www.elastic.co/guide/en/elasticsearch/reference/current/cat-indices.html) to get the following information for each index in a cluster:
* Shard count
* Document count
* Document count
* Deleted document count
* Primary store size
* Total store size
Expand Down
1 change: 1 addition & 0 deletions quickwit/quickwit-cli/src/tool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -550,6 +550,7 @@ pub async fn local_search_cli(args: LocalSearchArgs) -> anyhow::Result<()> {
format: BodyFormat::Json,
sort_by,
count_all: CountHits::CountAll,
allow_failed_splits: false,
};
let search_request =
search_request_from_api_request(vec![args.index_id], search_request_query_string)?;
Expand Down
32 changes: 24 additions & 8 deletions quickwit/quickwit-proto/protos/quickwit/search.proto
Original file line number Diff line number Diff line change
Expand Up @@ -126,14 +126,14 @@ message ListFieldsRequest {
// Optional limit query to a list of fields
// Wildcard expressions are supported.
repeated string fields = 2;

// Time filter, expressed in seconds since epoch.
// That filter is to be interpreted as the semi-open interval:
// [start_timestamp, end_timestamp).
optional int64 start_timestamp = 3;
optional int64 end_timestamp = 4;

// Control if the the request will fail if split_ids contains a split that does not exist.
// Control if the the request will fail if split_ids contains a split that does not exist.
// optional bool fail_on_missing_index = 6;
}

Expand All @@ -149,7 +149,7 @@ message LeafListFieldsRequest {
// Optional limit query to a list of fields
// Wildcard expressions are supported.
repeated string fields = 4;

}

message ListFieldsResponse {
Expand Down Expand Up @@ -299,6 +299,17 @@ message SearchResponse {

// Scroll Id (only set if scroll_secs was set in the request)
optional string scroll_id = 6;

// Returns the list of splits for which search failed.
// For the moment, the cause is unknown.
//
// It is up to the caller to decide whether to interpret
// this as an overall failure or to present the partial results
// to the end user.
repeated SplitSearchError failed_splits = 7;

// Total number of successful splits searched.
uint64 num_successful_splits = 8;
}

message SearchPlanResponse {
Expand Down Expand Up @@ -340,7 +351,7 @@ message LeafSearchRequest {
message LeafRequestRef {
// The ordinal of the doc_mapper in `LeafSearchRequest.doc_mappers`
uint32 doc_mapper_ord = 1;

// The ordinal of the index uri in LeafSearchRequest.index_uris
uint32 index_uri_ord = 2;

Expand Down Expand Up @@ -453,10 +464,16 @@ message LeafSearchResponse {
// The list of splits that failed. LeafSearchResponse can be an aggregation of results, so there may be multiple.
repeated SplitSearchError failed_splits = 3;

// Total number of splits the leaf(s) were in charge of.
// num_attempted_splits = num_successful_splits + num_failed_splits.
// Total number of attempt to search into splits.
// We do have:
// `num_splits_requested == num_successful_splits + num_failed_splits.len()`
// But we do not necessarily have:
// `num_splits_requested = num_attempted_splits because of retries.`
uint64 num_attempted_splits = 4;

// Total number of successful splits searched.
uint64 num_successful_splits = 7;

// Deprecated json serialized intermediate aggregation_result.
reserved 5;

Expand Down Expand Up @@ -550,8 +567,7 @@ message LeafListTermsResponse {
// The list of splits that failed. LeafSearchResponse can be an aggregation of results, so there may be multiple.
repeated SplitSearchError failed_splits = 3;

// Total number of splits the leaf(s) were in charge of.
// num_attempted_splits = num_successful_splits + num_failed_splits.
// Total number of single split search attempted.
uint64 num_attempted_splits = 4;
}

Expand Down
24 changes: 20 additions & 4 deletions quickwit/quickwit-proto/src/codegen/quickwit/quickwit.search.rs

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

92 changes: 60 additions & 32 deletions quickwit/quickwit-search/src/cluster_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,21 +94,36 @@ impl ClusterClient {
) -> crate::Result<LeafSearchResponse> {
let mut response_res = client.leaf_search(request.clone()).await;
let retry_policy = LeafSearchRetryPolicy {};
if let Some(retry_request) = retry_policy.retry_request(request, &response_res) {
assert!(!retry_request.leaf_requests.is_empty());
client = retry_client(
&self.search_job_placer,
client.grpc_addr(),
&retry_request.leaf_requests[0].split_offsets[0].split_id,
)
.await?;
debug!(
"Leaf search response error: `{:?}`. Retry once to execute {:?} with {:?}",
response_res, retry_request, client
// We retry only once.
let Some(retry_request) = retry_policy.retry_request(request, &response_res) else {
return response_res;
};
let Some(first_split) = retry_request
.leaf_requests
.iter()
.flat_map(|leaf_req| leaf_req.split_offsets.iter())
.next()
else {
warn!(
"the retry request did not contain any split to retry. this should never happen, \
please report"
);
let retry_result = client.leaf_search(retry_request).await;
response_res = merge_leaf_search_results(response_res, retry_result);
}
return response_res;
};
// There could be more than one split in the retry request. We pick a single client
// arbitrarily only considering the affinity of the first split.
client = retry_client(
&self.search_job_placer,
client.grpc_addr(),
&first_split.split_id,
)
.await?;
debug!(
"Leaf search response error: `{:?}`. Retry once to execute {:?} with {:?}",
response_res, retry_request, client
);
let retry_result = client.leaf_search(retry_request).await;
response_res = merge_original_with_retry_leaf_search_results(response_res, retry_result);
response_res
}

Expand Down Expand Up @@ -274,16 +289,24 @@ fn merge_intermediate_aggregation(left: &[u8], right: &[u8]) -> crate::Result<Ve
Ok(serialized)
}

fn merge_leaf_search_response(
mut left_response: LeafSearchResponse,
right_response: LeafSearchResponse,
/// Merge two leaf search response.
///
/// # Quirk
///
/// This is implemented for a retries.
/// For instance, the set of attempted splits of right is supposed to be the set of failed
/// list of the left one, so that the list of the overal failed splits is the list of splits on the
/// `right_response`.
fn merge_original_with_retry_leaf_search_response(
mut original_response: LeafSearchResponse,
retry_response: LeafSearchResponse,
) -> crate::Result<LeafSearchResponse> {
left_response
original_response
.partial_hits
.extend(right_response.partial_hits);
.extend(retry_response.partial_hits);
let intermediate_aggregation_result: Option<Vec<u8>> = match (
left_response.intermediate_aggregation_result,
right_response.intermediate_aggregation_result,
original_response.intermediate_aggregation_result,
retry_response.intermediate_aggregation_result,
) {
(Some(left_agg_bytes), Some(right_agg_bytes)) => {
let intermediate_aggregation_bytes: Vec<u8> =
Expand All @@ -296,22 +319,24 @@ fn merge_leaf_search_response(
};
Ok(LeafSearchResponse {
intermediate_aggregation_result,
num_hits: left_response.num_hits + right_response.num_hits,
num_attempted_splits: left_response.num_attempted_splits
+ right_response.num_attempted_splits,
failed_splits: right_response.failed_splits,
partial_hits: left_response.partial_hits,
num_hits: original_response.num_hits + retry_response.num_hits,
num_attempted_splits: original_response.num_attempted_splits
+ retry_response.num_attempted_splits,
failed_splits: retry_response.failed_splits,
partial_hits: original_response.partial_hits,
num_successful_splits: original_response.num_successful_splits
+ retry_response.num_successful_splits,
})
}

// Merge initial leaf search results with results obtained from a retry.
fn merge_leaf_search_results(
fn merge_original_with_retry_leaf_search_results(
left_search_response_result: crate::Result<LeafSearchResponse>,
right_search_response_result: crate::Result<LeafSearchResponse>,
) -> crate::Result<LeafSearchResponse> {
match (left_search_response_result, right_search_response_result) {
(Ok(left_response), Ok(right_response)) => {
merge_leaf_search_response(left_response, right_response)
merge_original_with_retry_leaf_search_response(left_response, right_response)
}
(Ok(single_valid_response), Err(_)) => Ok(single_valid_response),
(Err(_), Ok(single_valid_response)) => Ok(single_valid_response),
Expand Down Expand Up @@ -626,8 +651,11 @@ mod tests {
num_attempted_splits: 1,
..Default::default()
};
let merged_leaf_search_response =
merge_leaf_search_results(Ok(leaf_response), Ok(leaf_response_retry)).unwrap();
let merged_leaf_search_response = merge_original_with_retry_leaf_search_results(
Ok(leaf_response),
Ok(leaf_response_retry),
)
.unwrap();
assert_eq!(merged_leaf_search_response.num_attempted_splits, 2);
assert_eq!(merged_leaf_search_response.num_hits, 2);
assert_eq!(merged_leaf_search_response.partial_hits.len(), 2);
Expand All @@ -649,7 +677,7 @@ mod tests {
num_attempted_splits: 1,
..Default::default()
};
let merged_result = merge_leaf_search_results(
let merged_result = merge_original_with_retry_leaf_search_results(
Err(SearchError::Internal("error".to_string())),
Ok(leaf_response),
)
Expand All @@ -663,7 +691,7 @@ mod tests {

#[test]
fn test_merge_leaf_search_retry_error_on_error() -> anyhow::Result<()> {
let merge_error = merge_leaf_search_results(
let merge_error = merge_original_with_retry_leaf_search_results(
Err(SearchError::Internal("error".to_string())),
Err(SearchError::Internal("retry error".to_string())),
)
Expand Down
Loading
Loading