Skip to content

Commit

Permalink
Allowing failed splits in root search.
Browse files Browse the repository at this point in the history
This PR adds a query string parameter (`allow_partial_search_results`) in
the elasticsearch api controlling encounterring an error on a subset
of the splits, should fail a search query or if a response
(reflecting the partial set of successful splits) should be returned.

It changes the behavior of the root search gRPC call, that will not return
an error in presence of partial split errors, but instead returns the list of splits that failed.

This PR also changes the default behavior of the elasticsearch rest API: following elasticsearch behavior on failing shards, quickwit will allow split errors.

The Quickwit API default behavior, on the other hand, is unchanged.

Closes #5411
  • Loading branch information
fulmicoton committed Sep 25, 2024
1 parent 7d357fa commit 47abca3
Show file tree
Hide file tree
Showing 16 changed files with 401 additions and 72 deletions.
3 changes: 2 additions & 1 deletion docs/reference/es_compatible_api.md
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ If a parameter appears both as a query string parameter and in the JSON payload,
| `size` | `Integer` | Number of hits to return. | 10 |
| `sort` | `String` | Describes how documents should be ranked. See [Sort order](#sort-order) | (Optional) |
| `scroll` | `Duration` | Creates a scroll context for "time to live". See [Scroll](#_scroll--scroll-api). | (Optional) |
| `allow_partial_search_results` | `Boolean` | Returns a partial response if some (but not all) of the split searches were unsuccessful. | `true` |

#### Supported Request Body parameters

Expand Down Expand Up @@ -301,7 +302,7 @@ GET api/v1/_elastic/_cat/indices

Use the [cat indices API](https://www.elastic.co/guide/en/elasticsearch/reference/current/cat-indices.html) to get the following information for each index in a cluster:
* Shard count
* Document count
* Document count
* Deleted document count
* Primary store size
* Total store size
Expand Down
1 change: 1 addition & 0 deletions quickwit/quickwit-cli/src/tool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -550,6 +550,7 @@ pub async fn local_search_cli(args: LocalSearchArgs) -> anyhow::Result<()> {
format: BodyFormat::Json,
sort_by,
count_all: CountHits::CountAll,
allow_failed_splits: false,
};
let search_request =
search_request_from_api_request(vec![args.index_id], search_request_query_string)?;
Expand Down
32 changes: 24 additions & 8 deletions quickwit/quickwit-proto/protos/quickwit/search.proto
Original file line number Diff line number Diff line change
Expand Up @@ -126,14 +126,14 @@ message ListFieldsRequest {
// Optional limit query to a list of fields
// Wildcard expressions are supported.
repeated string fields = 2;

// Time filter, expressed in seconds since epoch.
// That filter is to be interpreted as the semi-open interval:
// [start_timestamp, end_timestamp).
optional int64 start_timestamp = 3;
optional int64 end_timestamp = 4;

// Control if the the request will fail if split_ids contains a split that does not exist.
// Control if the the request will fail if split_ids contains a split that does not exist.
// optional bool fail_on_missing_index = 6;
}

Expand All @@ -149,7 +149,7 @@ message LeafListFieldsRequest {
// Optional limit query to a list of fields
// Wildcard expressions are supported.
repeated string fields = 4;

}

message ListFieldsResponse {
Expand Down Expand Up @@ -299,6 +299,17 @@ message SearchResponse {

// Scroll Id (only set if scroll_secs was set in the request)
optional string scroll_id = 6;

// Returns the list of splits for which search failed.
// For the moment, the cause is unknown.
//
// It is up to the caller to decide whether to interpret
// this as an overall failure or to present the partial results
// to the end user.
repeated SplitSearchError failed_splits = 7;

// Total number of successful splits searched.
uint64 num_successful_splits = 8;
}

message SearchPlanResponse {
Expand Down Expand Up @@ -340,7 +351,7 @@ message LeafSearchRequest {
message LeafRequestRef {
// The ordinal of the doc_mapper in `LeafSearchRequest.doc_mappers`
uint32 doc_mapper_ord = 1;

// The ordinal of the index uri in LeafSearchRequest.index_uris
uint32 index_uri_ord = 2;

Expand Down Expand Up @@ -453,10 +464,16 @@ message LeafSearchResponse {
// The list of splits that failed. LeafSearchResponse can be an aggregation of results, so there may be multiple.
repeated SplitSearchError failed_splits = 3;

// Total number of splits the leaf(s) were in charge of.
// num_attempted_splits = num_successful_splits + num_failed_splits.
// Total number of attempt to search into splits.
// We do have:
// `num_splits_requested == num_successful_splits + num_failed_splits.len()`
// But we do not necessarily have:
// `num_splits_requested = num_attempted_splits because of retries.`
uint64 num_attempted_splits = 4;

// Total number of successful splits searched.
uint64 num_successful_splits = 7;

// Deprecated json serialized intermediate aggregation_result.
reserved 5;

Expand Down Expand Up @@ -550,8 +567,7 @@ message LeafListTermsResponse {
// The list of splits that failed. LeafSearchResponse can be an aggregation of results, so there may be multiple.
repeated SplitSearchError failed_splits = 3;

// Total number of splits the leaf(s) were in charge of.
// num_attempted_splits = num_successful_splits + num_failed_splits.
// Total number of single split search attempted.
uint64 num_attempted_splits = 4;
}

Expand Down
24 changes: 20 additions & 4 deletions quickwit/quickwit-proto/src/codegen/quickwit/quickwit.search.rs

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

92 changes: 60 additions & 32 deletions quickwit/quickwit-search/src/cluster_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,21 +94,36 @@ impl ClusterClient {
) -> crate::Result<LeafSearchResponse> {
let mut response_res = client.leaf_search(request.clone()).await;
let retry_policy = LeafSearchRetryPolicy {};
if let Some(retry_request) = retry_policy.retry_request(request, &response_res) {
assert!(!retry_request.leaf_requests.is_empty());
client = retry_client(
&self.search_job_placer,
client.grpc_addr(),
&retry_request.leaf_requests[0].split_offsets[0].split_id,
)
.await?;
debug!(
"Leaf search response error: `{:?}`. Retry once to execute {:?} with {:?}",
response_res, retry_request, client
// We retry only once.
let Some(retry_request) = retry_policy.retry_request(request, &response_res) else {
return response_res;
};
let Some(first_split) = retry_request
.leaf_requests
.iter()
.flat_map(|leaf_req| leaf_req.split_offsets.iter())
.next()
else {
warn!(
"the retry request did not contain any split to retry. this should never happen, \
please report"
);
let retry_result = client.leaf_search(retry_request).await;
response_res = merge_leaf_search_results(response_res, retry_result);
}
return response_res;
};
// There could be more than one split in the retry request. We pick a single client
// arbitrarily only considering the affinity of the first split.
client = retry_client(
&self.search_job_placer,
client.grpc_addr(),
&first_split.split_id,
)
.await?;
debug!(
"Leaf search response error: `{:?}`. Retry once to execute {:?} with {:?}",
response_res, retry_request, client
);
let retry_result = client.leaf_search(retry_request).await;
response_res = merge_original_with_retry_leaf_search_results(response_res, retry_result);
response_res
}

Expand Down Expand Up @@ -274,16 +289,24 @@ fn merge_intermediate_aggregation(left: &[u8], right: &[u8]) -> crate::Result<Ve
Ok(serialized)
}

fn merge_leaf_search_response(
mut left_response: LeafSearchResponse,
right_response: LeafSearchResponse,
/// Merge two leaf search response.
///
/// # Quirk
///
/// This is implemented for a retries.
/// For instance, the set of attempted splits of right is supposed to be the set of failed
/// list of the left one, so that the list of the overal failed splits is the list of splits on the
/// `right_response`.
fn merge_original_with_retry_leaf_search_response(
mut original_response: LeafSearchResponse,
retry_response: LeafSearchResponse,
) -> crate::Result<LeafSearchResponse> {
left_response
original_response
.partial_hits
.extend(right_response.partial_hits);
.extend(retry_response.partial_hits);
let intermediate_aggregation_result: Option<Vec<u8>> = match (
left_response.intermediate_aggregation_result,
right_response.intermediate_aggregation_result,
original_response.intermediate_aggregation_result,
retry_response.intermediate_aggregation_result,
) {
(Some(left_agg_bytes), Some(right_agg_bytes)) => {
let intermediate_aggregation_bytes: Vec<u8> =
Expand All @@ -296,22 +319,24 @@ fn merge_leaf_search_response(
};
Ok(LeafSearchResponse {
intermediate_aggregation_result,
num_hits: left_response.num_hits + right_response.num_hits,
num_attempted_splits: left_response.num_attempted_splits
+ right_response.num_attempted_splits,
failed_splits: right_response.failed_splits,
partial_hits: left_response.partial_hits,
num_hits: original_response.num_hits + retry_response.num_hits,
num_attempted_splits: original_response.num_attempted_splits
+ retry_response.num_attempted_splits,
failed_splits: retry_response.failed_splits,
partial_hits: original_response.partial_hits,
num_successful_splits: original_response.num_successful_splits
+ retry_response.num_successful_splits,
})
}

// Merge initial leaf search results with results obtained from a retry.
fn merge_leaf_search_results(
fn merge_original_with_retry_leaf_search_results(
left_search_response_result: crate::Result<LeafSearchResponse>,
right_search_response_result: crate::Result<LeafSearchResponse>,
) -> crate::Result<LeafSearchResponse> {
match (left_search_response_result, right_search_response_result) {
(Ok(left_response), Ok(right_response)) => {
merge_leaf_search_response(left_response, right_response)
merge_original_with_retry_leaf_search_response(left_response, right_response)
}
(Ok(single_valid_response), Err(_)) => Ok(single_valid_response),
(Err(_), Ok(single_valid_response)) => Ok(single_valid_response),
Expand Down Expand Up @@ -626,8 +651,11 @@ mod tests {
num_attempted_splits: 1,
..Default::default()
};
let merged_leaf_search_response =
merge_leaf_search_results(Ok(leaf_response), Ok(leaf_response_retry)).unwrap();
let merged_leaf_search_response = merge_original_with_retry_leaf_search_results(
Ok(leaf_response),
Ok(leaf_response_retry),
)
.unwrap();
assert_eq!(merged_leaf_search_response.num_attempted_splits, 2);
assert_eq!(merged_leaf_search_response.num_hits, 2);
assert_eq!(merged_leaf_search_response.partial_hits.len(), 2);
Expand All @@ -649,7 +677,7 @@ mod tests {
num_attempted_splits: 1,
..Default::default()
};
let merged_result = merge_leaf_search_results(
let merged_result = merge_original_with_retry_leaf_search_results(
Err(SearchError::Internal("error".to_string())),
Ok(leaf_response),
)
Expand All @@ -663,7 +691,7 @@ mod tests {

#[test]
fn test_merge_leaf_search_retry_error_on_error() -> anyhow::Result<()> {
let merge_error = merge_leaf_search_results(
let merge_error = merge_original_with_retry_leaf_search_results(
Err(SearchError::Internal("error".to_string())),
Err(SearchError::Internal("retry error".to_string())),
)
Expand Down
Loading

0 comments on commit 47abca3

Please sign in to comment.