From 26d983dd1b77669b45f1c5459768e031c757dcd7 Mon Sep 17 00:00:00 2001 From: Paul Masurel Date: Fri, 27 Sep 2024 22:14:19 +0900 Subject: [PATCH] Allowing failed splits in root search. (#5440) * Allowing failed splits in root search. This PR adds a query string parameter (`allow_partial_search_results`) in the elasticsearch api controlling whether encountering an error on a subset of the splits should fail the search query, or whether a response (reflecting the partial set of successful splits) should be returned. It changes the behavior of the root search gRPC call, which will not return an error in the presence of partial split errors, but instead returns the list of splits that failed. This PR also changes the default behavior of the elasticsearch rest API: following elasticsearch behavior on failing shards, quickwit will allow split errors. The Quickwit API default behavior, on the other hand, is unchanged. Closes #5411 * Apply suggestions from code review Co-authored-by: trinity-1686a * Changing the moment when aggregation fast fields are checked. The logic is now aligned with what is done for queries: we check the schema at the root level. If the aggregation does not make sense for the current index docmapping, the query is rejected. 
--------- Co-authored-by: trinity-1686a --- docs/reference/es_compatible_api.md | 3 +- quickwit/quickwit-cli/src/tool.rs | 1 + .../protos/quickwit/search.proto | 32 +++- .../src/codegen/quickwit/quickwit.search.rs | 24 ++- .../quickwit-search/src/cluster_client.rs | 92 +++++---- quickwit/quickwit-search/src/collector.rs | 22 ++- quickwit/quickwit-search/src/error.rs | 19 ++ quickwit/quickwit-search/src/leaf.rs | 1 + quickwit/quickwit-search/src/leaf_cache.rs | 6 +- quickwit/quickwit-search/src/root.rs | 178 ++++++++++++++++-- .../quickwit-search/src/scroll_context.rs | 4 +- quickwit/quickwit-search/src/service.rs | 2 + quickwit/quickwit-search/src/tests.rs | 4 +- .../model/search_query_params.rs | 5 + .../src/elasticsearch_api/rest_handler.rs | 112 +++++++++-- .../src/jaeger_api/rest_handler.rs | 4 + .../src/search_api/rest_handler.rs | 20 +- .../es_compatibility/0012-scroll-api.yaml | 14 ++ 18 files changed, 466 insertions(+), 77 deletions(-) diff --git a/docs/reference/es_compatible_api.md b/docs/reference/es_compatible_api.md index 99b4a08c2aa..f1fffa587a1 100644 --- a/docs/reference/es_compatible_api.md +++ b/docs/reference/es_compatible_api.md @@ -135,6 +135,7 @@ If a parameter appears both as a query string parameter and in the JSON payload, | `size` | `Integer` | Number of hits to return. | 10 | | `sort` | `String` | Describes how documents should be ranked. See [Sort order](#sort-order) | (Optional) | | `scroll` | `Duration` | Creates a scroll context for "time to live". See [Scroll](#_scroll--scroll-api). | (Optional) | +| `allow_partial_search_results` | `Boolean` | Returns a partial response if some (but not all) of the split searches were unsuccessful. 
| `true` | #### Supported Request Body parameters @@ -301,7 +302,7 @@ GET api/v1/_elastic/_cat/indices Use the [cat indices API](https://www.elastic.co/guide/en/elasticsearch/reference/current/cat-indices.html) to get the following information for each index in a cluster: * Shard count -* Document count +* Document count * Deleted document count * Primary store size * Total store size diff --git a/quickwit/quickwit-cli/src/tool.rs b/quickwit/quickwit-cli/src/tool.rs index c7ab1911205..db22ff1743e 100644 --- a/quickwit/quickwit-cli/src/tool.rs +++ b/quickwit/quickwit-cli/src/tool.rs @@ -550,6 +550,7 @@ pub async fn local_search_cli(args: LocalSearchArgs) -> anyhow::Result<()> { format: BodyFormat::Json, sort_by, count_all: CountHits::CountAll, + allow_failed_splits: false, }; let search_request = search_request_from_api_request(vec![args.index_id], search_request_query_string)?; diff --git a/quickwit/quickwit-proto/protos/quickwit/search.proto b/quickwit/quickwit-proto/protos/quickwit/search.proto index 50ad8aec3c1..60671239ecc 100644 --- a/quickwit/quickwit-proto/protos/quickwit/search.proto +++ b/quickwit/quickwit-proto/protos/quickwit/search.proto @@ -126,14 +126,14 @@ message ListFieldsRequest { // Optional limit query to a list of fields // Wildcard expressions are supported. repeated string fields = 2; - + // Time filter, expressed in seconds since epoch. // That filter is to be interpreted as the semi-open interval: // [start_timestamp, end_timestamp). optional int64 start_timestamp = 3; optional int64 end_timestamp = 4; - // Control if the the request will fail if split_ids contains a split that does not exist. + // Control if the the request will fail if split_ids contains a split that does not exist. // optional bool fail_on_missing_index = 6; } @@ -149,7 +149,7 @@ message LeafListFieldsRequest { // Optional limit query to a list of fields // Wildcard expressions are supported. 
repeated string fields = 4; - + } message ListFieldsResponse { @@ -299,6 +299,17 @@ message SearchResponse { // Scroll Id (only set if scroll_secs was set in the request) optional string scroll_id = 6; + + // Returns the list of splits for which search failed. + // For the moment, the cause is unknown. + // + // It is up to the caller to decide whether to interpret + // this as an overall failure or to present the partial results + // to the end user. + repeated SplitSearchError failed_splits = 7; + + // Total number of successful splits searched. + uint64 num_successful_splits = 8; } message SearchPlanResponse { @@ -340,7 +351,7 @@ message LeafSearchRequest { message LeafRequestRef { // The ordinal of the doc_mapper in `LeafSearchRequest.doc_mappers` uint32 doc_mapper_ord = 1; - + // The ordinal of the index uri in LeafSearchRequest.index_uris uint32 index_uri_ord = 2; @@ -453,10 +464,16 @@ message LeafSearchResponse { // The list of splits that failed. LeafSearchResponse can be an aggregation of results, so there may be multiple. repeated SplitSearchError failed_splits = 3; - // Total number of splits the leaf(s) were in charge of. - // num_attempted_splits = num_successful_splits + num_failed_splits. + // Total number of attempt to search into splits. + // We do have: + // `num_splits_requested == num_successful_splits + num_failed_splits.len()` + // But we do not necessarily have: + // `num_splits_requested = num_attempted_splits because of retries.` uint64 num_attempted_splits = 4; + // Total number of successful splits searched. + uint64 num_successful_splits = 7; + // Deprecated json serialized intermediate aggregation_result. reserved 5; @@ -550,8 +567,7 @@ message LeafListTermsResponse { // The list of splits that failed. LeafSearchResponse can be an aggregation of results, so there may be multiple. repeated SplitSearchError failed_splits = 3; - // Total number of splits the leaf(s) were in charge of. 
- // num_attempted_splits = num_successful_splits + num_failed_splits. + // Total number of single split search attempted. uint64 num_attempted_splits = 4; } diff --git a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.search.rs b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.search.rs index ed0219d0a7f..189019162f8 100644 --- a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.search.rs +++ b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.search.rs @@ -230,6 +230,17 @@ pub struct SearchResponse { /// Scroll Id (only set if scroll_secs was set in the request) #[prost(string, optional, tag = "6")] pub scroll_id: ::core::option::Option<::prost::alloc::string::String>, + /// Returns the list of splits for which search failed. + /// For the moment, the cause is unknown. + /// + /// It is up to the caller to decide whether to interpret + /// this as an overall failure or to present the partial results + /// to the end user. + #[prost(message, repeated, tag = "7")] + pub failed_splits: ::prost::alloc::vec::Vec, + /// Total number of successful splits searched. + #[prost(uint64, tag = "8")] + pub num_successful_splits: u64, } #[derive(Serialize, Deserialize, utoipa::ToSchema)] #[allow(clippy::derive_partial_eq_without_eq)] @@ -431,10 +442,16 @@ pub struct LeafSearchResponse { /// The list of splits that failed. LeafSearchResponse can be an aggregation of results, so there may be multiple. #[prost(message, repeated, tag = "3")] pub failed_splits: ::prost::alloc::vec::Vec, - /// Total number of splits the leaf(s) were in charge of. - /// num_attempted_splits = num_successful_splits + num_failed_splits. + /// Total number of attempt to search into splits. 
+ /// We do have: + /// `num_splits_requested == num_successful_splits + num_failed_splits.len()` + /// But we do not necessarily have: + /// `num_splits_requested = num_attempted_splits because of retries.` #[prost(uint64, tag = "4")] pub num_attempted_splits: u64, + /// Total number of successful splits searched. + #[prost(uint64, tag = "7")] + pub num_successful_splits: u64, /// postcard serialized intermediate aggregation_result. #[prost(bytes = "vec", optional, tag = "6")] pub intermediate_aggregation_result: ::core::option::Option< @@ -551,8 +568,7 @@ pub struct LeafListTermsResponse { /// The list of splits that failed. LeafSearchResponse can be an aggregation of results, so there may be multiple. #[prost(message, repeated, tag = "3")] pub failed_splits: ::prost::alloc::vec::Vec, - /// Total number of splits the leaf(s) were in charge of. - /// num_attempted_splits = num_successful_splits + num_failed_splits. + /// Total number of single split search attempted. #[prost(uint64, tag = "4")] pub num_attempted_splits: u64, } diff --git a/quickwit/quickwit-search/src/cluster_client.rs b/quickwit/quickwit-search/src/cluster_client.rs index 25d53ca7554..d32ad92327c 100644 --- a/quickwit/quickwit-search/src/cluster_client.rs +++ b/quickwit/quickwit-search/src/cluster_client.rs @@ -94,21 +94,36 @@ impl ClusterClient { ) -> crate::Result { let mut response_res = client.leaf_search(request.clone()).await; let retry_policy = LeafSearchRetryPolicy {}; - if let Some(retry_request) = retry_policy.retry_request(request, &response_res) { - assert!(!retry_request.leaf_requests.is_empty()); - client = retry_client( - &self.search_job_placer, - client.grpc_addr(), - &retry_request.leaf_requests[0].split_offsets[0].split_id, - ) - .await?; - debug!( - "Leaf search response error: `{:?}`. Retry once to execute {:?} with {:?}", - response_res, retry_request, client + // We retry only once. 
+ let Some(retry_request) = retry_policy.retry_request(request, &response_res) else { + return response_res; + }; + let Some(first_split) = retry_request + .leaf_requests + .iter() + .flat_map(|leaf_req| leaf_req.split_offsets.iter()) + .next() + else { + warn!( + "the retry request did not contain any split to retry. this should never happen, \ + please report" ); - let retry_result = client.leaf_search(retry_request).await; - response_res = merge_leaf_search_results(response_res, retry_result); - } + return response_res; + }; + // There could be more than one split in the retry request. We pick a single client + // arbitrarily only considering the affinity of the first split. + client = retry_client( + &self.search_job_placer, + client.grpc_addr(), + &first_split.split_id, + ) + .await?; + debug!( + "Leaf search response error: `{:?}`. Retry once to execute {:?} with {:?}", + response_res, retry_request, client + ); + let retry_result = client.leaf_search(retry_request).await; + response_res = merge_original_with_retry_leaf_search_results(response_res, retry_result); response_res } @@ -274,16 +289,24 @@ fn merge_intermediate_aggregation(left: &[u8], right: &[u8]) -> crate::Result crate::Result { - left_response + original_response .partial_hits - .extend(right_response.partial_hits); + .extend(retry_response.partial_hits); let intermediate_aggregation_result: Option> = match ( - left_response.intermediate_aggregation_result, - right_response.intermediate_aggregation_result, + original_response.intermediate_aggregation_result, + retry_response.intermediate_aggregation_result, ) { (Some(left_agg_bytes), Some(right_agg_bytes)) => { let intermediate_aggregation_bytes: Vec = @@ -296,22 +319,24 @@ fn merge_leaf_search_response( }; Ok(LeafSearchResponse { intermediate_aggregation_result, - num_hits: left_response.num_hits + right_response.num_hits, - num_attempted_splits: left_response.num_attempted_splits - + right_response.num_attempted_splits, - failed_splits: 
right_response.failed_splits, - partial_hits: left_response.partial_hits, + num_hits: original_response.num_hits + retry_response.num_hits, + num_attempted_splits: original_response.num_attempted_splits + + retry_response.num_attempted_splits, + failed_splits: retry_response.failed_splits, + partial_hits: original_response.partial_hits, + num_successful_splits: original_response.num_successful_splits + + retry_response.num_successful_splits, }) } // Merge initial leaf search results with results obtained from a retry. -fn merge_leaf_search_results( +fn merge_original_with_retry_leaf_search_results( left_search_response_result: crate::Result, right_search_response_result: crate::Result, ) -> crate::Result { match (left_search_response_result, right_search_response_result) { (Ok(left_response), Ok(right_response)) => { - merge_leaf_search_response(left_response, right_response) + merge_original_with_retry_leaf_search_response(left_response, right_response) } (Ok(single_valid_response), Err(_)) => Ok(single_valid_response), (Err(_), Ok(single_valid_response)) => Ok(single_valid_response), @@ -626,8 +651,11 @@ mod tests { num_attempted_splits: 1, ..Default::default() }; - let merged_leaf_search_response = - merge_leaf_search_results(Ok(leaf_response), Ok(leaf_response_retry)).unwrap(); + let merged_leaf_search_response = merge_original_with_retry_leaf_search_results( + Ok(leaf_response), + Ok(leaf_response_retry), + ) + .unwrap(); assert_eq!(merged_leaf_search_response.num_attempted_splits, 2); assert_eq!(merged_leaf_search_response.num_hits, 2); assert_eq!(merged_leaf_search_response.partial_hits.len(), 2); @@ -649,7 +677,7 @@ mod tests { num_attempted_splits: 1, ..Default::default() }; - let merged_result = merge_leaf_search_results( + let merged_result = merge_original_with_retry_leaf_search_results( Err(SearchError::Internal("error".to_string())), Ok(leaf_response), ) @@ -663,7 +691,7 @@ mod tests { #[test] fn test_merge_leaf_search_retry_error_on_error() -> 
anyhow::Result<()> { - let merge_error = merge_leaf_search_results( + let merge_error = merge_original_with_retry_leaf_search_results( Err(SearchError::Internal("error".to_string())), Err(SearchError::Internal("retry error".to_string())), ) diff --git a/quickwit/quickwit-search/src/collector.rs b/quickwit/quickwit-search/src/collector.rs index afeb2e0a4b9..221bcdb1392 100644 --- a/quickwit/quickwit-search/src/collector.rs +++ b/quickwit/quickwit-search/src/collector.rs @@ -592,6 +592,7 @@ impl SegmentCollector for QuickwitSegmentCollector { partial_hits, failed_splits: Vec::new(), num_attempted_splits: 1, + num_successful_splits: 1, }) } } @@ -608,7 +609,8 @@ pub enum QuickwitAggregations { } impl QuickwitAggregations { - fn fast_field_names(&self) -> HashSet { + /// Returns the list of fast fields that should be loaded for the aggregation. + pub fn fast_field_names(&self) -> HashSet { match self { QuickwitAggregations::FindTraceIdsAggregation(collector) => { collector.fast_field_names() @@ -927,6 +929,10 @@ fn merge_leaf_responses( .iter() .map(|leaf_response| leaf_response.num_attempted_splits) .sum(); + let num_successful_splits = leaf_responses + .iter() + .map(|leaf_response| leaf_response.num_successful_splits) + .sum::(); let num_hits: u64 = leaf_responses .iter() .map(|leaf_response| leaf_response.num_hits) @@ -952,6 +958,7 @@ fn merge_leaf_responses( partial_hits: top_k_partial_hits, failed_splits, num_attempted_splits, + num_successful_splits, }) } @@ -1173,6 +1180,7 @@ pub(crate) struct IncrementalCollector { num_hits: u64, failed_splits: Vec, num_attempted_splits: u64, + num_successful_splits: u64, start_offset: usize, } @@ -1193,6 +1201,7 @@ impl IncrementalCollector { num_hits: 0, failed_splits: Vec::new(), num_attempted_splits: 0, + num_successful_splits: 0, } } @@ -1204,12 +1213,14 @@ impl IncrementalCollector { failed_splits, num_attempted_splits, intermediate_aggregation_result, + num_successful_splits, } = leaf_response; self.num_hits += 
num_hits; self.top_k_hits.add_entries(partial_hits.into_iter()); self.failed_splits.extend(failed_splits); self.num_attempted_splits += num_attempted_splits; + self.num_successful_splits += num_successful_splits; if let Some(intermediate_aggregation_result) = intermediate_aggregation_result { self.incremental_aggregation .add(intermediate_aggregation_result)?; @@ -1252,6 +1263,7 @@ impl IncrementalCollector { partial_hits, failed_splits: self.failed_splits, num_attempted_splits: self.num_attempted_splits, + num_successful_splits: self.num_successful_splits, intermediate_aggregation_result, }) } @@ -1814,6 +1826,7 @@ mod tests { }], failed_splits: Vec::new(), num_attempted_splits: 3, + num_successful_splits: 3, intermediate_aggregation_result: None, }], ); @@ -1831,6 +1844,7 @@ mod tests { }], failed_splits: Vec::new(), num_attempted_splits: 3, + num_successful_splits: 3, intermediate_aggregation_result: None } ); @@ -1868,6 +1882,7 @@ mod tests { ], failed_splits: Vec::new(), num_attempted_splits: 3, + num_successful_splits: 3, intermediate_aggregation_result: None, }, LeafSearchResponse { @@ -1885,6 +1900,7 @@ mod tests { retryable_error: true, }], num_attempted_splits: 2, + num_successful_splits: 1, intermediate_aggregation_result: None, }, ], @@ -1916,6 +1932,7 @@ mod tests { retryable_error: true, }], num_attempted_splits: 5, + num_successful_splits: 4, intermediate_aggregation_result: None } ); @@ -1954,6 +1971,7 @@ mod tests { ], failed_splits: Vec::new(), num_attempted_splits: 3, + num_successful_splits: 3, intermediate_aggregation_result: None, }, LeafSearchResponse { @@ -1971,6 +1989,7 @@ mod tests { retryable_error: true, }], num_attempted_splits: 2, + num_successful_splits: 1, intermediate_aggregation_result: None, }, ], @@ -2002,6 +2021,7 @@ mod tests { retryable_error: true, }], num_attempted_splits: 5, + num_successful_splits: 4, intermediate_aggregation_result: None } ); diff --git a/quickwit/quickwit-search/src/error.rs 
b/quickwit/quickwit-search/src/error.rs index 17f44f49e1e..76671545e71 100644 --- a/quickwit/quickwit-search/src/error.rs +++ b/quickwit/quickwit-search/src/error.rs @@ -17,10 +17,12 @@ // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see . +use itertools::Itertools; use quickwit_common::rate_limited_error; use quickwit_doc_mapper::QueryParserError; use quickwit_proto::error::grpc_error_to_grpc_status; use quickwit_proto::metastore::{EntityKind, MetastoreError}; +use quickwit_proto::search::SplitSearchError; use quickwit_proto::{tonic, GrpcServiceError, ServiceError, ServiceErrorCode}; use quickwit_storage::StorageResolverError; use serde::{Deserialize, Serialize}; @@ -53,6 +55,23 @@ pub enum SearchError { Unavailable(String), } +impl SearchError { + /// Creates an internal `SearchError` from a list of split search errors. + pub fn from_split_errors(failed_splits: &[SplitSearchError]) -> Option { + let first_failing_split = failed_splits.first()?; + let failed_splits = failed_splits + .iter() + .map(|failed_split| &failed_split.split_id) + .join(", "); + let error_msg = format!( + "search failed for the following splits: {failed_splits:}. 
For instance, split {} \ + failed with the following error message: {}", + first_failing_split.split_id, first_failing_split.error, + ); + Some(SearchError::Internal(error_msg)) + } +} + impl ServiceError for SearchError { fn error_code(&self) -> ServiceErrorCode { match self { diff --git a/quickwit/quickwit-search/src/leaf.rs b/quickwit/quickwit-search/src/leaf.rs index b73c7b5637b..22e4b5551d1 100644 --- a/quickwit/quickwit-search/src/leaf.rs +++ b/quickwit/quickwit-search/src/leaf.rs @@ -338,6 +338,7 @@ fn get_leaf_resp_from_count(count: u64) -> LeafSearchResponse { partial_hits: Vec::new(), failed_splits: Vec::new(), num_attempted_splits: 1, + num_successful_splits: 1, intermediate_aggregation_result: None, } } diff --git a/quickwit/quickwit-search/src/leaf_cache.rs b/quickwit/quickwit-search/src/leaf_cache.rs index 86c5496240e..491f66f3aee 100644 --- a/quickwit/quickwit-search/src/leaf_cache.rs +++ b/quickwit/quickwit-search/src/leaf_cache.rs @@ -242,7 +242,8 @@ mod tests { let result = LeafSearchResponse { failed_splits: Vec::new(), intermediate_aggregation_result: None, - num_attempted_splits: 0, + num_attempted_splits: 1, + num_successful_splits: 1, num_hits: 1234, partial_hits: vec![PartialHit { doc_id: 1, @@ -331,7 +332,8 @@ mod tests { let result = LeafSearchResponse { failed_splits: Vec::new(), intermediate_aggregation_result: None, - num_attempted_splits: 0, + num_attempted_splits: 1, + num_successful_splits: 1, num_hits: 1234, partial_hits: vec![PartialHit { doc_id: 1, diff --git a/quickwit/quickwit-search/src/root.rs b/quickwit/quickwit-search/src/root.rs index 63caa3e7dc1..3565f9f68f8 100644 --- a/quickwit/quickwit-search/src/root.rs +++ b/quickwit/quickwit-search/src/root.rs @@ -47,9 +47,9 @@ use serde::{Deserialize, Serialize}; use tantivy::aggregation::agg_result::AggregationResults; use tantivy::aggregation::intermediate_agg_result::IntermediateAggregationResults; use tantivy::collector::Collector; -use tantivy::schema::{FieldEntry, FieldType, 
Schema}; +use tantivy::schema::{Field, FieldEntry, FieldType, Schema}; use tantivy::TantivyError; -use tracing::{debug, error, info, info_span, instrument}; +use tracing::{debug, info, info_span, instrument}; use crate::cluster_client::ClusterClient; use crate::collector::{make_merge_collector, QuickwitAggregations}; @@ -473,6 +473,27 @@ fn validate_sort_by_field_type( Ok(()) } +fn check_is_fast_field( + schema: &Schema, + fast_field_name: &str, + dynamic_fast_field: Option, +) -> crate::Result<()> { + let Some((field, _path)): Option<(Field, &str)> = + schema.find_field_with_default(fast_field_name, dynamic_fast_field) + else { + return Err(SearchError::InvalidArgument(format!( + "Field \"{fast_field_name}\" does not exist" + ))); + }; + let field_entry: &FieldEntry = schema.get_field_entry(field); + if !field_entry.is_fast() { + return Err(SearchError::InvalidArgument(format!( + "Field \"{fast_field_name}\" is not configured as a fast field" + ))); + } + Ok(()) +} + fn validate_request( schema: &Schema, timestamp_field_name: &Option<&str>, @@ -491,11 +512,18 @@ fn validate_request( validate_requested_snippet_fields(schema, &search_request.snippet_fields)?; if let Some(agg) = search_request.aggregation_request.as_ref() { - let _aggs: QuickwitAggregations = serde_json::from_str(agg).map_err(|_err| { + let aggs: QuickwitAggregations = serde_json::from_str(agg).map_err(|_err| { let err = serde_json::from_str::(agg) .unwrap_err(); SearchError::InvalidAggregationRequest(err.to_string()) })?; + + // ensure that the required fast fields are indeed configured as fast fields. 
+ let fast_field_names = aggs.fast_field_names(); + let dynamic_field = schema.get_field(DYNAMIC_FIELD_NAME).ok(); + for fast_field_name in &fast_field_names { + check_is_fast_field(schema, fast_field_name, dynamic_field)?; + } }; if search_request.start_offset > 10_000 { @@ -575,6 +603,8 @@ async fn search_partial_hits_phase_with_scroll( max_hits_per_page: max_hits, cached_partial_hits_start_offset: search_request.start_offset, cached_partial_hits, + failed_splits: leaf_search_resp.failed_splits.clone(), + num_successful_splits: leaf_search_resp.num_successful_splits, }; let scroll_key_and_start_offset: ScrollKeyAndStartOffset = ScrollKeyAndStartOffset::new_with_start_offset( @@ -650,11 +680,14 @@ pub fn get_count_from_metadata(split_metadatas: &[SplitMetadata]) -> Vec> = + let leaf_search_results: Vec> = leaf_search_responses.into_iter().map(Ok).collect_vec(); let span = info_span!("merge_fruits"); let leaf_search_response = crate::search_thread_pool() .run_cpu_intensive(move || { let _span_guard = span.enter(); - merge_collector.merge_fruits(leaf_search_responses) + merge_collector.merge_fruits(leaf_search_results) }) .await .context("failed to merge leaf search responses")? @@ -711,9 +744,7 @@ pub(crate) async fn search_partial_hits_phase( "Merged leaf search response." 
); if !leaf_search_response.failed_splits.is_empty() { - error!(failed_splits = ?leaf_search_response.failed_splits, "leaf search response contains at least one failed split"); - let errors: String = leaf_search_response.failed_splits.iter().join(", "); - return Err(SearchError::Internal(errors)); + quickwit_common::rate_limited_error!(limit_per_min=6, failed_splits = ?leaf_search_response.failed_splits, "leaf search response contains at least one failed split"); } Ok(leaf_search_response) } @@ -931,6 +962,8 @@ async fn root_search_aux( scroll_id: scroll_key_and_start_offset_opt .as_ref() .map(ToString::to_string), + failed_splits: first_phase_result.failed_splits, + num_successful_splits: first_phase_result.num_successful_splits, }) } @@ -3487,7 +3520,7 @@ mod tests { } #[tokio::test] - async fn test_root_search_single_split_retry_single_node_fails() -> anyhow::Result<()> { + async fn test_root_search_single_split_retry_single_node_fails() { let search_request = quickwit_proto::search::SearchRequest { index_id_patterns: vec!["test-index".to_string()], query_ast: qast_json_helper("test", &["body"]), @@ -3544,9 +3577,9 @@ mod tests { MetastoreServiceClient::from_mock(mock_metastore), &cluster_client, ) - .await; - assert!(search_response.is_err()); - Ok(()) + .await + .unwrap(); + assert_eq!(search_response.failed_splits.len(), 1); } #[tokio::test] @@ -4766,12 +4799,17 @@ mod tests { .map(|split_offset| mock_partial_hit(&split_offset.split_id, 3, 1)) .collect_vec(); partial_hits.extend_from_slice(&partial_hits2); + let num_attempted_splits: u64 = leaf_search_req + .leaf_requests + .iter() + .map(|leaf_req| leaf_req.split_offsets.len() as u64) + .sum::(); Ok(quickwit_proto::search::LeafSearchResponse { num_hits: leaf_search_req.leaf_requests[0].split_offsets.len() as u64 + leaf_search_req.leaf_requests[1].split_offsets.len() as u64, partial_hits, failed_splits: Vec::new(), - num_attempted_splits: 1, + num_attempted_splits, ..Default::default() }) }, @@ -4813,4 
+4851,118 @@ mod tests { ); Ok(()) } + + #[tokio::test] + async fn test_root_search_split_failures() -> anyhow::Result<()> { + let search_request = quickwit_proto::search::SearchRequest { + index_id_patterns: vec!["test-index-1".to_string()], + query_ast: qast_json_helper("test", &["body"]), + max_hits: 10, + ..Default::default() + }; + let mut mock_metastore = MockMetastoreService::new(); + let index_metadata_1 = IndexMetadata::for_test("test-index-1", "ram:///test-index-1"); + let index_uid_1 = index_metadata_1.index_uid.clone(); + mock_metastore.expect_list_indexes_metadata().return_once( + move |_list_indexes_metadata_request: ListIndexesMetadataRequest| { + Ok(ListIndexesMetadataResponse::for_test(vec![ + index_metadata_1, + ])) + }, + ); + mock_metastore + .expect_list_splits() + .return_once(move |list_splits_request| { + let list_splits_query = + list_splits_request.deserialize_list_splits_query().unwrap(); + assert!(list_splits_query.index_uids == vec![index_uid_1.clone(),]); + let splits = vec![ + MockSplitBuilder::new("index-1-split-1") + .with_index_uid(&index_uid_1) + .build(), + MockSplitBuilder::new("index-1-split-2") + .with_index_uid(&index_uid_1) + .build(), + ]; + let splits_response = ListSplitsResponse::try_from_splits(splits).unwrap(); + Ok(ServiceStream::from(vec![Ok(splits_response)])) + }); + let mut mock_search_service_1 = MockSearchService::new(); + mock_search_service_1 + .expect_leaf_search() + .withf( + |leaf_search_req: &quickwit_proto::search::LeafSearchRequest| { + leaf_search_req.leaf_requests.len() == 1 + && leaf_search_req.leaf_requests[0].split_offsets.len() == 2 + }, + ) + .times(1) + .returning( + |_leaf_search_req: quickwit_proto::search::LeafSearchRequest| { + let partial_hits = vec![mock_partial_hit("index-1-split-1", 0u64, 1u32)]; + Ok(quickwit_proto::search::LeafSearchResponse { + num_hits: 1, + partial_hits, + failed_splits: vec![{ + SplitSearchError { + error: "some error".to_string(), + split_id: 
"index-1-split-1".to_string(), + retryable_error: true, + } + }], + num_attempted_splits: 3, + ..Default::default() + }) + }, + ); + mock_search_service_1 + .expect_leaf_search() + .withf( + |leaf_search_req: &quickwit_proto::search::LeafSearchRequest| { + leaf_search_req.leaf_requests.len() == 1 + && leaf_search_req.leaf_requests[0].split_offsets.len() == 1 + }, + ) + .times(1) + .returning( + |_leaf_search_req: quickwit_proto::search::LeafSearchRequest| { + Ok(quickwit_proto::search::LeafSearchResponse { + num_hits: 0, + partial_hits: Vec::new(), + failed_splits: vec![{ + SplitSearchError { + error: "some error".to_string(), + split_id: "index-1-split-1".to_string(), + retryable_error: true, + } + }], + num_attempted_splits: 1, + ..Default::default() + }) + }, + ); + mock_search_service_1 + .expect_fetch_docs() + .times(1) + .returning(|fetch_docs_req| { + Ok(quickwit_proto::search::FetchDocsResponse { + hits: get_doc_for_fetch_req(fetch_docs_req), + }) + }); + let searcher_pool = searcher_pool_for_test([("127.0.0.1:1001", mock_search_service_1)]); + let search_job_placer = SearchJobPlacer::new(searcher_pool); + let cluster_client = ClusterClient::new(search_job_placer.clone()); + let search_response = root_search( + &SearcherContext::for_test(), + search_request, + MetastoreServiceClient::from_mock(mock_metastore), + &cluster_client, + ) + .await + .unwrap(); + assert_eq!(search_response.num_hits, 1); + assert_eq!(search_response.hits.len(), 1); + assert_eq!(search_response.failed_splits.len(), 1); + Ok(()) + } } diff --git a/quickwit/quickwit-search/src/scroll_context.rs b/quickwit/quickwit-search/src/scroll_context.rs index bb21cf6db9b..c62e80084e4 100644 --- a/quickwit/quickwit-search/src/scroll_context.rs +++ b/quickwit/quickwit-search/src/scroll_context.rs @@ -28,7 +28,7 @@ use anyhow::Context; use base64::prelude::BASE64_STANDARD; use base64::Engine; use quickwit_metastore::SplitMetadata; -use quickwit_proto::search::{LeafSearchResponse, PartialHit, 
SearchRequest}; +use quickwit_proto::search::{LeafSearchResponse, PartialHit, SearchRequest, SplitSearchError}; use quickwit_proto::types::IndexUid; use serde::{Deserialize, Serialize}; use tokio::sync::RwLock; @@ -57,6 +57,8 @@ pub(crate) struct ScrollContext { pub max_hits_per_page: u64, pub cached_partial_hits_start_offset: u64, pub cached_partial_hits: Vec, + pub failed_splits: Vec, + pub num_successful_splits: u64, } impl ScrollContext { diff --git a/quickwit/quickwit-search/src/service.rs b/quickwit/quickwit-search/src/service.rs index 2a57a7ab65b..65516a99e76 100644 --- a/quickwit/quickwit-search/src/service.rs +++ b/quickwit/quickwit-search/src/service.rs @@ -436,6 +436,8 @@ pub(crate) async fn scroll( scroll_id: Some(next_scroll_id.to_string()), errors: Vec::new(), aggregation: None, + failed_splits: scroll_context.failed_splits, + num_successful_splits: scroll_context.num_successful_splits, }) } /// [`SearcherContext`] provides a common set of variables diff --git a/quickwit/quickwit-search/src/tests.rs b/quickwit/quickwit-search/src/tests.rs index 7f313558b50..ea255e9688d 100644 --- a/quickwit/quickwit-search/src/tests.rs +++ b/quickwit/quickwit-search/src/tests.rs @@ -1461,10 +1461,10 @@ async fn test_single_node_aggregation_missing_fast_field() { ) .await .unwrap_err(); - let SearchError::Internal(error_msg) = single_node_error else { + let SearchError::InvalidArgument(error_msg) = single_node_error else { panic!(); }; - assert!(error_msg.contains("Field \"color\" is not configured as fast field")); + assert!(error_msg.contains("Field \"color\" is not configured as a fast field")); test_sandbox.assert_quit().await; } diff --git a/quickwit/quickwit-serve/src/elasticsearch_api/model/search_query_params.rs b/quickwit/quickwit-serve/src/elasticsearch_api/model/search_query_params.rs index 70409754353..903dfd4ed9c 100644 --- a/quickwit/quickwit-serve/src/elasticsearch_api/model/search_query_params.rs +++ 
b/quickwit/quickwit-serve/src/elasticsearch_api/model/search_query_params.rs @@ -287,6 +287,11 @@ impl SearchQueryParams { })?; Ok(Some(duration)) } + + pub fn allow_partial_search_results(&self) -> bool { + // By default, elastic search allows partial results. + self.allow_partial_search_results.unwrap_or(true) + } } #[doc = "Whether to expand wildcard expression to concrete indices that are open, closed or both."] diff --git a/quickwit/quickwit-serve/src/elasticsearch_api/rest_handler.rs b/quickwit/quickwit-serve/src/elasticsearch_api/rest_handler.rs index e5caa8703bc..433dec8f487 100644 --- a/quickwit/quickwit-serve/src/elasticsearch_api/rest_handler.rs +++ b/quickwit/quickwit-serve/src/elasticsearch_api/rest_handler.rs @@ -24,7 +24,7 @@ use std::time::{Duration, Instant}; use bytes::Bytes; use elasticsearch_dsl::search::{Hit as ElasticHit, SearchResponse as ElasticsearchResponse}; -use elasticsearch_dsl::{HitsMetadata, Source, TotalHits, TotalHitsRelation}; +use elasticsearch_dsl::{HitsMetadata, ShardStatistics, Source, TotalHits, TotalHitsRelation}; use futures_util::StreamExt; use hyper::StatusCode; use itertools::Itertools; @@ -441,9 +441,16 @@ async fn es_compat_index_search( search_body: SearchBody, search_service: Arc, ) -> Result { + if search_params.scroll.is_some() && !search_params.allow_partial_search_results() { + return Err(ElasticsearchError::from(SearchError::InvalidArgument( + "Quickwit only supports scroll API with allow_partial_search_results set to true" + .to_string(), + ))); + } let _source_excludes = search_params._source_excludes.clone(); let _source_includes = search_params._source_includes.clone(); let start_instant = Instant::now(); + let allow_partial_search_results = search_params.allow_partial_search_results(); let (search_request, append_shard_doc) = build_request_for_es_api(index_id_patterns, search_params, search_body)?; let search_response: SearchResponse = search_service.root_search(search_request).await?; @@ -453,7 +460,8 @@ 
async fn es_compat_index_search( append_shard_doc, _source_excludes, _source_includes, - ); + allow_partial_search_results, + )?; search_response_rest.took = elapsed.as_millis() as u32; Ok(search_response_rest) } @@ -791,6 +799,7 @@ async fn es_compat_index_multi_search( build_request_for_es_api(index_ids_patterns, search_query_params, search_body)?; search_requests.push(es_request); } + // TODO: forced to do weird referencing to work around https://github.com/rust-lang/rust/issues/100905 // otherwise append_shard_doc is captured by ref, and we get lifetime issues let futures = search_requests @@ -804,12 +813,14 @@ async fn es_compat_index_multi_search( let search_response: SearchResponse = search_service.clone().root_search(search_request).await?; let elapsed = start_instant.elapsed(); - let mut search_response_rest: ElasticsearchResponse = convert_to_es_search_response( - search_response, - append_shard_doc, - _source_excludes, - _source_includes, - ); + let mut search_response_rest: ElasticsearchResponse = + convert_to_es_search_response( + search_response, + append_shard_doc, + _source_excludes, + _source_includes, + true, //< allow_partial_results. Set to true to match ES's behavior. + )?; search_response_rest.took = elapsed.as_millis() as u32; Ok::<_, ElasticsearchError>(search_response_rest) } @@ -852,8 +863,13 @@ async fn es_scroll( }; let search_response: SearchResponse = search_service.scroll(scroll_request).await?; // TODO append_shard_doc depends on the initial request, but we don't have access to it + + // Ideally, we would have wanted to reuse the setting from the initial search request. + // However, passing that parameter is cumbersome, so we cut some corners and forbid the + // use of scroll requests in combination with allow_partial_results set to false. 
+ let allow_failed_splits = true; let mut search_response_rest: ElasticsearchResponse = - convert_to_es_search_response(search_response, false, None, None); + convert_to_es_search_response(search_response, false, None, None, allow_failed_splits)?; search_response_rest.took = start_instant.elapsed().as_millis() as u32; Ok(search_response_rest) } @@ -918,7 +934,13 @@ fn convert_to_es_search_response( append_shard_doc: bool, _source_excludes: Option>, _source_includes: Option>, -) -> ElasticsearchResponse { + allow_partial_results: bool, +) -> Result { + if !allow_partial_results || resp.num_successful_splits == 0 { + if let Some(search_error) = SearchError::from_split_errors(&resp.failed_splits) { + return Err(ElasticsearchError::from(search_error)); + } + } let hits: Vec = resp .hits .into_iter() @@ -929,7 +951,10 @@ fn convert_to_es_search_response( } else { None }; - ElasticsearchResponse { + let num_failed_splits = resp.failed_splits.len() as u32; + let num_successful_splits = resp.num_successful_splits as u32; + let num_total_splits = num_successful_splits + num_failed_splits; + Ok(ElasticsearchResponse { timed_out: false, hits: HitsMetadata { total: Some(TotalHits { @@ -941,8 +966,16 @@ fn convert_to_es_search_response( }, aggregations, scroll_id: resp.scroll_id, + // There is no concept of shards here, but use this to convey split search failures. + shards: ShardStatistics { + total: num_total_splits, + successful: num_successful_splits, + skipped: 0u32, + failed: num_failed_splits, + failures: Vec::new(), + }, ..Default::default() - } + }) } pub(crate) fn str_lines(body: &str) -> impl Iterator { @@ -954,6 +987,7 @@ pub(crate) fn str_lines(body: &str) -> impl Iterator { #[cfg(test)] mod tests { use hyper::StatusCode; + use quickwit_proto::search::SplitSearchError; use super::{partial_hit_from_search_after_param, *}; @@ -1133,4 +1167,58 @@ mod tests { assert_eq!(fields, expected); } + + // We test the behavior of allow partial search results. 
+ #[test] + fn test_convert_to_es_search_response_allow_partial() { + let split_error = SplitSearchError { + error: "some-error".to_string(), + split_id: "some-split-id".to_string(), + retryable_error: true, + }; + { + let search_response = SearchResponse { + num_successful_splits: 1, + failed_splits: vec![split_error.clone()], + ..Default::default() + }; + convert_to_es_search_response(search_response, false, None, None, false).unwrap_err(); + } + { + let search_response = SearchResponse { + num_successful_splits: 1, + failed_splits: vec![split_error.clone()], + ..Default::default() + }; + // if we allow partial search results, this should not fail, but we report the presence + // of failed splits in the failed shard count of the response. + let es_search_resp = + convert_to_es_search_response(search_response, false, None, None, true).unwrap(); + assert_eq!(es_search_resp.shards.failed, 1); + } + { + let search_response = SearchResponse { + failed_splits: vec![split_error.clone()], + ..Default::default() + }; + // Even if we allow partial search results, with a failure and no success, we have a + // failure. + convert_to_es_search_response(search_response, false, None, None, true).unwrap_err(); + } + { + // Not having any splits (no failure + no success) is not considered a failure. 
+ for allow_partial in [true, false] { + let search_response = SearchResponse::default(); + let es_search_resp = convert_to_es_search_response( + search_response, + false, + None, + None, + allow_partial, + ) + .unwrap(); + assert_eq!(es_search_resp.shards.failed, 0); + } + } + } } diff --git a/quickwit/quickwit-serve/src/jaeger_api/rest_handler.rs b/quickwit/quickwit-serve/src/jaeger_api/rest_handler.rs index 1525c4d2b8b..15d76413f47 100644 --- a/quickwit/quickwit-serve/src/jaeger_api/rest_handler.rs +++ b/quickwit/quickwit-serve/src/jaeger_api/rest_handler.rs @@ -463,6 +463,8 @@ mod tests { errors: Vec::new(), aggregation: None, scroll_id: None, + failed_splits: Vec::new(), + num_successful_splits: 1, }) }); let mock_search_service = Arc::new(mock_search_service); @@ -494,6 +496,8 @@ mod tests { errors: Vec::new(), aggregation: None, scroll_id: None, + failed_splits: Vec::new(), + num_successful_splits: 1, }) }); let mock_search_service = Arc::new(mock_search_service); diff --git a/quickwit/quickwit-serve/src/search_api/rest_handler.rs b/quickwit/quickwit-serve/src/search_api/rest_handler.rs index cf9b5d40c84..42a9a0ff44b 100644 --- a/quickwit/quickwit-serve/src/search_api/rest_handler.rs +++ b/quickwit/quickwit-serve/src/search_api/rest_handler.rs @@ -239,6 +239,10 @@ pub struct SearchRequestQueryString { #[serde(with = "count_hits_from_bool")] #[serde(default = "count_hits_from_bool::default")] pub count_all: CountHits, + #[param(value_type = bool)] + #[schema(value_type = bool)] + #[serde(default)] + pub allow_failed_splits: bool, } mod count_hits_from_bool { @@ -302,8 +306,22 @@ async fn search_endpoint( search_request: SearchRequestQueryString, search_service: &dyn SearchService, ) -> Result { + let allow_failed_splits = search_request.allow_failed_splits; let search_request = search_request_from_api_request(index_id_patterns, search_request)?; - let search_response = search_service.root_search(search_request).await?; + let search_response = + search_service 
+ .root_search(search_request) + .await + .and_then(|search_response| { + if !allow_failed_splits || search_response.num_successful_splits == 0 { + if let Some(search_error) = + SearchError::from_split_errors(&search_response.failed_splits[..]) + { + return Err(search_error); + } + } + Ok(search_response) + })?; let search_response_rest = SearchResponseRest::try_from(search_response)?; Ok(search_response_rest) } diff --git a/quickwit/rest-api-tests/scenarii/es_compatibility/0012-scroll-api.yaml b/quickwit/rest-api-tests/scenarii/es_compatibility/0012-scroll-api.yaml index 58e425f07e5..0f6b845b7a0 100644 --- a/quickwit/rest-api-tests/scenarii/es_compatibility/0012-scroll-api.yaml +++ b/quickwit/rest-api-tests/scenarii/es_compatibility/0012-scroll-api.yaml @@ -1,3 +1,17 @@ +--- +engines: ["quickwit"] +params: + size: 1 + scroll: 30m + allow_partial_search_results: "false" +json: + query: + match_all: {} +status_code: 400 +expected: + error: + reason: "Invalid argument: Quickwit only supports scroll API with allow_partial_search_results set to true" +--- params: size: 1 scroll: 30m