diff --git a/docs/reference/es_compatible_api.md b/docs/reference/es_compatible_api.md index 99b4a08c2aa..f1fffa587a1 100644 --- a/docs/reference/es_compatible_api.md +++ b/docs/reference/es_compatible_api.md @@ -135,6 +135,7 @@ If a parameter appears both as a query string parameter and in the JSON payload, | `size` | `Integer` | Number of hits to return. | 10 | | `sort` | `String` | Describes how documents should be ranked. See [Sort order](#sort-order) | (Optional) | | `scroll` | `Duration` | Creates a scroll context for "time to live". See [Scroll](#_scroll--scroll-api). | (Optional) | +| `allow_partial_search_results` | `Boolean` | Returns a partial response if some (but not all) of the split searches were unsuccessful. | `true` | #### Supported Request Body parameters @@ -301,7 +302,7 @@ GET api/v1/_elastic/_cat/indices Use the [cat indices API](https://www.elastic.co/guide/en/elasticsearch/reference/current/cat-indices.html) to get the following information for each index in a cluster: * Shard count -* Document count +* Document count * Deleted document count * Primary store size * Total store size diff --git a/quickwit/quickwit-cli/src/tool.rs b/quickwit/quickwit-cli/src/tool.rs index c7ab1911205..db22ff1743e 100644 --- a/quickwit/quickwit-cli/src/tool.rs +++ b/quickwit/quickwit-cli/src/tool.rs @@ -550,6 +550,7 @@ pub async fn local_search_cli(args: LocalSearchArgs) -> anyhow::Result<()> { format: BodyFormat::Json, sort_by, count_all: CountHits::CountAll, + allow_failed_splits: false, }; let search_request = search_request_from_api_request(vec![args.index_id], search_request_query_string)?; diff --git a/quickwit/quickwit-proto/protos/quickwit/search.proto b/quickwit/quickwit-proto/protos/quickwit/search.proto index 50ad8aec3c1..60671239ecc 100644 --- a/quickwit/quickwit-proto/protos/quickwit/search.proto +++ b/quickwit/quickwit-proto/protos/quickwit/search.proto @@ -126,14 +126,14 @@ message ListFieldsRequest { // Optional limit query to a list of fields // Wildcard expressions are supported. repeated string fields = 2; - + // Time filter, expressed in seconds since epoch. // That filter is to be interpreted as the semi-open interval: // [start_timestamp, end_timestamp). optional int64 start_timestamp = 3; optional int64 end_timestamp = 4; - // Control if the the request will fail if split_ids contains a split that does not exist. + // Control if the the request will fail if split_ids contains a split that does not exist. // optional bool fail_on_missing_index = 6; } @@ -149,7 +149,7 @@ message LeafListFieldsRequest { // Optional limit query to a list of fields // Wildcard expressions are supported. repeated string fields = 4; - + } message ListFieldsResponse { @@ -299,6 +299,17 @@ message SearchResponse { // Scroll Id (only set if scroll_secs was set in the request) optional string scroll_id = 6; + + // Returns the list of splits for which search failed. + // For the moment, the cause is unknown. + // + // It is up to the caller to decide whether to interpret + // this as an overall failure or to present the partial results + // to the end user. + repeated SplitSearchError failed_splits = 7; + + // Total number of successful splits searched. + uint64 num_successful_splits = 8; } message SearchPlanResponse { @@ -340,7 +351,7 @@ message LeafSearchRequest { message LeafRequestRef { // The ordinal of the doc_mapper in `LeafSearchRequest.doc_mappers` uint32 doc_mapper_ord = 1; - + // The ordinal of the index uri in LeafSearchRequest.index_uris uint32 index_uri_ord = 2; @@ -453,10 +464,16 @@ message LeafSearchResponse { // The list of splits that failed. LeafSearchResponse can be an aggregation of results, so there may be multiple. repeated SplitSearchError failed_splits = 3; - // Total number of splits the leaf(s) were in charge of. - // num_attempted_splits = num_successful_splits + num_failed_splits. + // Total number of attempt to search into splits. + // We do have: + // `num_splits_requested == num_successful_splits + num_failed_splits.len()` + // But we do not necessarily have: + // `num_splits_requested = num_attempted_splits because of retries.` uint64 num_attempted_splits = 4; + // Total number of successful splits searched. + uint64 num_successful_splits = 7; + // Deprecated json serialized intermediate aggregation_result. reserved 5; @@ -550,8 +567,7 @@ message LeafListTermsResponse { // The list of splits that failed. LeafSearchResponse can be an aggregation of results, so there may be multiple. repeated SplitSearchError failed_splits = 3; - // Total number of splits the leaf(s) were in charge of. - // num_attempted_splits = num_successful_splits + num_failed_splits. + // Total number of single split search attempted. uint64 num_attempted_splits = 4; } diff --git a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.search.rs b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.search.rs index ed0219d0a7f..189019162f8 100644 --- a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.search.rs +++ b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.search.rs @@ -230,6 +230,17 @@ pub struct SearchResponse { /// Scroll Id (only set if scroll_secs was set in the request) #[prost(string, optional, tag = "6")] pub scroll_id: ::core::option::Option<::prost::alloc::string::String>, + /// Returns the list of splits for which search failed. + /// For the moment, the cause is unknown. + /// + /// It is up to the caller to decide whether to interpret + /// this as an overall failure or to present the partial results + /// to the end user. + #[prost(message, repeated, tag = "7")] + pub failed_splits: ::prost::alloc::vec::Vec, + /// Total number of successful splits searched. + #[prost(uint64, tag = "8")] + pub num_successful_splits: u64, } #[derive(Serialize, Deserialize, utoipa::ToSchema)] #[allow(clippy::derive_partial_eq_without_eq)] @@ -431,10 +442,16 @@ pub struct LeafSearchResponse { /// The list of splits that failed. LeafSearchResponse can be an aggregation of results, so there may be multiple. #[prost(message, repeated, tag = "3")] pub failed_splits: ::prost::alloc::vec::Vec, - /// Total number of splits the leaf(s) were in charge of. - /// num_attempted_splits = num_successful_splits + num_failed_splits. + /// Total number of attempt to search into splits. + /// We do have: + /// `num_splits_requested == num_successful_splits + num_failed_splits.len()` + /// But we do not necessarily have: + /// `num_splits_requested = num_attempted_splits because of retries.` #[prost(uint64, tag = "4")] pub num_attempted_splits: u64, + /// Total number of successful splits searched. + #[prost(uint64, tag = "7")] + pub num_successful_splits: u64, /// postcard serialized intermediate aggregation_result. #[prost(bytes = "vec", optional, tag = "6")] pub intermediate_aggregation_result: ::core::option::Option< @@ -551,8 +568,7 @@ pub struct LeafListTermsResponse { /// The list of splits that failed. LeafSearchResponse can be an aggregation of results, so there may be multiple. #[prost(message, repeated, tag = "3")] pub failed_splits: ::prost::alloc::vec::Vec, - /// Total number of splits the leaf(s) were in charge of. - /// num_attempted_splits = num_successful_splits + num_failed_splits. + /// Total number of single split search attempted. #[prost(uint64, tag = "4")] pub num_attempted_splits: u64, } diff --git a/quickwit/quickwit-search/src/cluster_client.rs b/quickwit/quickwit-search/src/cluster_client.rs index 25d53ca7554..d32ad92327c 100644 --- a/quickwit/quickwit-search/src/cluster_client.rs +++ b/quickwit/quickwit-search/src/cluster_client.rs @@ -94,21 +94,36 @@ impl ClusterClient { ) -> crate::Result { let mut response_res = client.leaf_search(request.clone()).await; let retry_policy = LeafSearchRetryPolicy {}; - if let Some(retry_request) = retry_policy.retry_request(request, &response_res) { - assert!(!retry_request.leaf_requests.is_empty()); - client = retry_client( - &self.search_job_placer, - client.grpc_addr(), - &retry_request.leaf_requests[0].split_offsets[0].split_id, - ) - .await?; - debug!( - "Leaf search response error: `{:?}`. Retry once to execute {:?} with {:?}", - response_res, retry_request, client + // We retry only once. + let Some(retry_request) = retry_policy.retry_request(request, &response_res) else { + return response_res; + }; + let Some(first_split) = retry_request + .leaf_requests + .iter() + .flat_map(|leaf_req| leaf_req.split_offsets.iter()) + .next() + else { + warn!( + "the retry request did not contain any split to retry. this should never happen, \ + please report" ); - let retry_result = client.leaf_search(retry_request).await; - response_res = merge_leaf_search_results(response_res, retry_result); - } + return response_res; + }; + // There could be more than one split in the retry request. We pick a single client + // arbitrarily only considering the affinity of the first split. + client = retry_client( + &self.search_job_placer, + client.grpc_addr(), + &first_split.split_id, + ) + .await?; + debug!( + "Leaf search response error: `{:?}`. Retry once to execute {:?} with {:?}", + response_res, retry_request, client + ); + let retry_result = client.leaf_search(retry_request).await; + response_res = merge_original_with_retry_leaf_search_results(response_res, retry_result); response_res } @@ -274,16 +289,24 @@ fn merge_intermediate_aggregation(left: &[u8], right: &[u8]) -> crate::Result crate::Result { - left_response + original_response .partial_hits - .extend(right_response.partial_hits); + .extend(retry_response.partial_hits); let intermediate_aggregation_result: Option> = match ( - left_response.intermediate_aggregation_result, - right_response.intermediate_aggregation_result, + original_response.intermediate_aggregation_result, + retry_response.intermediate_aggregation_result, ) { (Some(left_agg_bytes), Some(right_agg_bytes)) => { let intermediate_aggregation_bytes: Vec = @@ -296,22 +319,24 @@ fn merge_leaf_search_response( }; Ok(LeafSearchResponse { intermediate_aggregation_result, - num_hits: left_response.num_hits + right_response.num_hits, - num_attempted_splits: left_response.num_attempted_splits - + right_response.num_attempted_splits, - failed_splits: right_response.failed_splits, - partial_hits: left_response.partial_hits, + num_hits: original_response.num_hits + retry_response.num_hits, + num_attempted_splits: original_response.num_attempted_splits + + retry_response.num_attempted_splits, + failed_splits: retry_response.failed_splits, + partial_hits: original_response.partial_hits, + num_successful_splits: original_response.num_successful_splits + + retry_response.num_successful_splits, }) } // Merge initial leaf search results with results obtained from a retry. -fn merge_leaf_search_results( +fn merge_original_with_retry_leaf_search_results( left_search_response_result: crate::Result, right_search_response_result: crate::Result, ) -> crate::Result { match (left_search_response_result, right_search_response_result) { (Ok(left_response), Ok(right_response)) => { - merge_leaf_search_response(left_response, right_response) + merge_original_with_retry_leaf_search_response(left_response, right_response) } (Ok(single_valid_response), Err(_)) => Ok(single_valid_response), (Err(_), Ok(single_valid_response)) => Ok(single_valid_response), @@ -626,8 +651,11 @@ mod tests { num_attempted_splits: 1, ..Default::default() }; - let merged_leaf_search_response = - merge_leaf_search_results(Ok(leaf_response), Ok(leaf_response_retry)).unwrap(); + let merged_leaf_search_response = merge_original_with_retry_leaf_search_results( + Ok(leaf_response), + Ok(leaf_response_retry), + ) + .unwrap(); assert_eq!(merged_leaf_search_response.num_attempted_splits, 2); assert_eq!(merged_leaf_search_response.num_hits, 2); assert_eq!(merged_leaf_search_response.partial_hits.len(), 2); @@ -649,7 +677,7 @@ mod tests { num_attempted_splits: 1, ..Default::default() }; - let merged_result = merge_leaf_search_results( + let merged_result = merge_original_with_retry_leaf_search_results( Err(SearchError::Internal("error".to_string())), Ok(leaf_response), ) @@ -663,7 +691,7 @@ mod tests { #[test] fn test_merge_leaf_search_retry_error_on_error() -> anyhow::Result<()> { - let merge_error = merge_leaf_search_results( + let merge_error = merge_original_with_retry_leaf_search_results( Err(SearchError::Internal("error".to_string())), Err(SearchError::Internal("retry error".to_string())), ) diff --git a/quickwit/quickwit-search/src/collector.rs b/quickwit/quickwit-search/src/collector.rs index afeb2e0a4b9..221bcdb1392 100644 --- a/quickwit/quickwit-search/src/collector.rs +++ b/quickwit/quickwit-search/src/collector.rs @@ -592,6 +592,7 @@ impl SegmentCollector for QuickwitSegmentCollector { partial_hits, failed_splits: Vec::new(), num_attempted_splits: 1, + num_successful_splits: 1, }) } } @@ -608,7 +609,8 @@ pub enum QuickwitAggregations { } impl QuickwitAggregations { - fn fast_field_names(&self) -> HashSet { + /// Returns the list of fast fields that should be loaded for the aggregation. + pub fn fast_field_names(&self) -> HashSet { match self { QuickwitAggregations::FindTraceIdsAggregation(collector) => { collector.fast_field_names() @@ -927,6 +929,10 @@ fn merge_leaf_responses( .iter() .map(|leaf_response| leaf_response.num_attempted_splits) .sum(); + let num_successful_splits = leaf_responses + .iter() + .map(|leaf_response| leaf_response.num_successful_splits) + .sum::(); let num_hits: u64 = leaf_responses .iter() .map(|leaf_response| leaf_response.num_hits) @@ -952,6 +958,7 @@ fn merge_leaf_responses( partial_hits: top_k_partial_hits, failed_splits, num_attempted_splits, + num_successful_splits, }) } @@ -1173,6 +1180,7 @@ pub(crate) struct IncrementalCollector { num_hits: u64, failed_splits: Vec, num_attempted_splits: u64, + num_successful_splits: u64, start_offset: usize, } @@ -1193,6 +1201,7 @@ impl IncrementalCollector { num_hits: 0, failed_splits: Vec::new(), num_attempted_splits: 0, + num_successful_splits: 0, } } @@ -1204,12 +1213,14 @@ impl IncrementalCollector { failed_splits, num_attempted_splits, intermediate_aggregation_result, + num_successful_splits, } = leaf_response; self.num_hits += num_hits; self.top_k_hits.add_entries(partial_hits.into_iter()); self.failed_splits.extend(failed_splits); self.num_attempted_splits += num_attempted_splits; + self.num_successful_splits += num_successful_splits; if let Some(intermediate_aggregation_result) = intermediate_aggregation_result { self.incremental_aggregation .add(intermediate_aggregation_result)?; @@ -1252,6 +1263,7 @@ impl IncrementalCollector { partial_hits, failed_splits: self.failed_splits, num_attempted_splits: self.num_attempted_splits, + num_successful_splits: self.num_successful_splits, intermediate_aggregation_result, }) } @@ -1814,6 +1826,7 @@ mod tests { }], failed_splits: Vec::new(), num_attempted_splits: 3, + num_successful_splits: 3, intermediate_aggregation_result: None, }], ); @@ -1831,6 +1844,7 @@ mod tests { }], failed_splits: Vec::new(), num_attempted_splits: 3, + num_successful_splits: 3, intermediate_aggregation_result: None } ); @@ -1868,6 +1882,7 @@ mod tests { ], failed_splits: Vec::new(), num_attempted_splits: 3, + num_successful_splits: 3, intermediate_aggregation_result: None, }, LeafSearchResponse { @@ -1885,6 +1900,7 @@ mod tests { retryable_error: true, }], num_attempted_splits: 2, + num_successful_splits: 1, intermediate_aggregation_result: None, }, ], @@ -1916,6 +1932,7 @@ mod tests { retryable_error: true, }], num_attempted_splits: 5, + num_successful_splits: 4, intermediate_aggregation_result: None } ); @@ -1954,6 +1971,7 @@ mod tests { ], failed_splits: Vec::new(), num_attempted_splits: 3, + num_successful_splits: 3, intermediate_aggregation_result: None, }, LeafSearchResponse { @@ -1971,6 +1989,7 @@ mod tests { retryable_error: true, }], num_attempted_splits: 2, + num_successful_splits: 1, intermediate_aggregation_result: None, }, ], @@ -2002,6 +2021,7 @@ mod tests { retryable_error: true, }], num_attempted_splits: 5, + num_successful_splits: 4, intermediate_aggregation_result: None } ); diff --git a/quickwit/quickwit-search/src/error.rs b/quickwit/quickwit-search/src/error.rs index 17f44f49e1e..76671545e71 100644 --- a/quickwit/quickwit-search/src/error.rs +++ b/quickwit/quickwit-search/src/error.rs @@ -17,10 +17,12 @@ // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see . +use itertools::Itertools; use quickwit_common::rate_limited_error; use quickwit_doc_mapper::QueryParserError; use quickwit_proto::error::grpc_error_to_grpc_status; use quickwit_proto::metastore::{EntityKind, MetastoreError}; +use quickwit_proto::search::SplitSearchError; use quickwit_proto::{tonic, GrpcServiceError, ServiceError, ServiceErrorCode}; use quickwit_storage::StorageResolverError; use serde::{Deserialize, Serialize}; @@ -53,6 +55,23 @@ pub enum SearchError { Unavailable(String), } +impl SearchError { + /// Creates an internal `SearchError` from a list of split search errors. + pub fn from_split_errors(failed_splits: &[SplitSearchError]) -> Option { + let first_failing_split = failed_splits.first()?; + let failed_splits = failed_splits + .iter() + .map(|failed_split| &failed_split.split_id) + .join(", "); + let error_msg = format!( + "search failed for the following splits: {failed_splits:}. For instance, split {} \ + failed with the following error message: {}", + first_failing_split.split_id, first_failing_split.error, + ); + Some(SearchError::Internal(error_msg)) + } +} + impl ServiceError for SearchError { fn error_code(&self) -> ServiceErrorCode { match self { diff --git a/quickwit/quickwit-search/src/leaf.rs b/quickwit/quickwit-search/src/leaf.rs index b73c7b5637b..22e4b5551d1 100644 --- a/quickwit/quickwit-search/src/leaf.rs +++ b/quickwit/quickwit-search/src/leaf.rs @@ -338,6 +338,7 @@ fn get_leaf_resp_from_count(count: u64) -> LeafSearchResponse { partial_hits: Vec::new(), failed_splits: Vec::new(), num_attempted_splits: 1, + num_successful_splits: 1, intermediate_aggregation_result: None, } } diff --git a/quickwit/quickwit-search/src/leaf_cache.rs b/quickwit/quickwit-search/src/leaf_cache.rs index 86c5496240e..491f66f3aee 100644 --- a/quickwit/quickwit-search/src/leaf_cache.rs +++ b/quickwit/quickwit-search/src/leaf_cache.rs @@ -242,7 +242,8 @@ mod tests { let result = LeafSearchResponse { failed_splits: Vec::new(), intermediate_aggregation_result: None, - num_attempted_splits: 0, + num_attempted_splits: 1, + num_successful_splits: 1, num_hits: 1234, partial_hits: vec![PartialHit { doc_id: 1, @@ -331,7 +332,8 @@ mod tests { let result = LeafSearchResponse { failed_splits: Vec::new(), intermediate_aggregation_result: None, - num_attempted_splits: 0, + num_attempted_splits: 1, + num_successful_splits: 1, num_hits: 1234, partial_hits: vec![PartialHit { doc_id: 1, diff --git a/quickwit/quickwit-search/src/root.rs b/quickwit/quickwit-search/src/root.rs index 63caa3e7dc1..3565f9f68f8 100644 --- a/quickwit/quickwit-search/src/root.rs +++ b/quickwit/quickwit-search/src/root.rs @@ -47,9 +47,9 @@ use serde::{Deserialize, Serialize}; use tantivy::aggregation::agg_result::AggregationResults; use tantivy::aggregation::intermediate_agg_result::IntermediateAggregationResults; use tantivy::collector::Collector; -use tantivy::schema::{FieldEntry, FieldType, Schema}; +use tantivy::schema::{Field, FieldEntry, FieldType, Schema}; use tantivy::TantivyError; -use tracing::{debug, error, info, info_span, instrument}; +use tracing::{debug, info, info_span, instrument}; use crate::cluster_client::ClusterClient; use crate::collector::{make_merge_collector, QuickwitAggregations}; @@ -473,6 +473,27 @@ fn validate_sort_by_field_type( Ok(()) } +fn check_is_fast_field( + schema: &Schema, + fast_field_name: &str, + dynamic_fast_field: Option, +) -> crate::Result<()> { + let Some((field, _path)): Option<(Field, &str)> = + schema.find_field_with_default(fast_field_name, dynamic_fast_field) + else { + return Err(SearchError::InvalidArgument(format!( + "Field \"{fast_field_name}\" does not exist" + ))); + }; + let field_entry: &FieldEntry = schema.get_field_entry(field); + if !field_entry.is_fast() { + return Err(SearchError::InvalidArgument(format!( + "Field \"{fast_field_name}\" is not configured as a fast field" + ))); + } + Ok(()) +} + fn validate_request( schema: &Schema, timestamp_field_name: &Option<&str>, @@ -491,11 +512,18 @@ fn validate_request( validate_requested_snippet_fields(schema, &search_request.snippet_fields)?; if let Some(agg) = search_request.aggregation_request.as_ref() { - let _aggs: QuickwitAggregations = serde_json::from_str(agg).map_err(|_err| { + let aggs: QuickwitAggregations = serde_json::from_str(agg).map_err(|_err| { let err = serde_json::from_str::(agg) .unwrap_err(); SearchError::InvalidAggregationRequest(err.to_string()) })?; + + // ensure that the required fast fields are indeed configured as fast fields. + let fast_field_names = aggs.fast_field_names(); + let dynamic_field = schema.get_field(DYNAMIC_FIELD_NAME).ok(); + for fast_field_name in &fast_field_names { + check_is_fast_field(schema, fast_field_name, dynamic_field)?; + } }; if search_request.start_offset > 10_000 { @@ -575,6 +603,8 @@ async fn search_partial_hits_phase_with_scroll( max_hits_per_page: max_hits, cached_partial_hits_start_offset: search_request.start_offset, cached_partial_hits, + failed_splits: leaf_search_resp.failed_splits.clone(), + num_successful_splits: leaf_search_resp.num_successful_splits, }; let scroll_key_and_start_offset: ScrollKeyAndStartOffset = ScrollKeyAndStartOffset::new_with_start_offset( @@ -650,11 +680,14 @@ pub fn get_count_from_metadata(split_metadatas: &[SplitMetadata]) -> Vec> = + let leaf_search_results: Vec> = leaf_search_responses.into_iter().map(Ok).collect_vec(); let span = info_span!("merge_fruits"); let leaf_search_response = crate::search_thread_pool() .run_cpu_intensive(move || { let _span_guard = span.enter(); - merge_collector.merge_fruits(leaf_search_responses) + merge_collector.merge_fruits(leaf_search_results) }) .await .context("failed to merge leaf search responses")? @@ -711,9 +744,7 @@ pub(crate) async fn search_partial_hits_phase( "Merged leaf search response." ); if !leaf_search_response.failed_splits.is_empty() { - error!(failed_splits = ?leaf_search_response.failed_splits, "leaf search response contains at least one failed split"); - let errors: String = leaf_search_response.failed_splits.iter().join(", "); - return Err(SearchError::Internal(errors)); + quickwit_common::rate_limited_error!(limit_per_min=6, failed_splits = ?leaf_search_response.failed_splits, "leaf search response contains at least one failed split"); } Ok(leaf_search_response) } @@ -931,6 +962,8 @@ async fn root_search_aux( scroll_id: scroll_key_and_start_offset_opt .as_ref() .map(ToString::to_string), + failed_splits: first_phase_result.failed_splits, + num_successful_splits: first_phase_result.num_successful_splits, }) } @@ -3487,7 +3520,7 @@ mod tests { } #[tokio::test] - async fn test_root_search_single_split_retry_single_node_fails() -> anyhow::Result<()> { + async fn test_root_search_single_split_retry_single_node_fails() { let search_request = quickwit_proto::search::SearchRequest { index_id_patterns: vec!["test-index".to_string()], query_ast: qast_json_helper("test", &["body"]), @@ -3544,9 +3577,9 @@ mod tests { MetastoreServiceClient::from_mock(mock_metastore), &cluster_client, ) - .await; - assert!(search_response.is_err()); - Ok(()) + .await + .unwrap(); + assert_eq!(search_response.failed_splits.len(), 1); } #[tokio::test] @@ -4766,12 +4799,17 @@ mod tests { .map(|split_offset| mock_partial_hit(&split_offset.split_id, 3, 1)) .collect_vec(); partial_hits.extend_from_slice(&partial_hits2); + let num_attempted_splits: u64 = leaf_search_req + .leaf_requests + .iter() + .map(|leaf_req| leaf_req.split_offsets.len() as u64) + .sum::(); Ok(quickwit_proto::search::LeafSearchResponse { num_hits: leaf_search_req.leaf_requests[0].split_offsets.len() as u64 + leaf_search_req.leaf_requests[1].split_offsets.len() as u64, partial_hits, failed_splits: Vec::new(), - num_attempted_splits: 1, + num_attempted_splits, ..Default::default() }) }, @@ -4813,4 +4851,118 @@ mod tests { ); Ok(()) } + + #[tokio::test] + async fn test_root_search_split_failures() -> anyhow::Result<()> { + let search_request = quickwit_proto::search::SearchRequest { + index_id_patterns: vec!["test-index-1".to_string()], + query_ast: qast_json_helper("test", &["body"]), + max_hits: 10, + ..Default::default() + }; + let mut mock_metastore = MockMetastoreService::new(); + let index_metadata_1 = IndexMetadata::for_test("test-index-1", "ram:///test-index-1"); + let index_uid_1 = index_metadata_1.index_uid.clone(); + mock_metastore.expect_list_indexes_metadata().return_once( + move |_list_indexes_metadata_request: ListIndexesMetadataRequest| { + Ok(ListIndexesMetadataResponse::for_test(vec![ + index_metadata_1, + ])) + }, + ); + mock_metastore + .expect_list_splits() + .return_once(move |list_splits_request| { + let list_splits_query = + list_splits_request.deserialize_list_splits_query().unwrap(); + assert!(list_splits_query.index_uids == vec![index_uid_1.clone(),]); + let splits = vec![ + MockSplitBuilder::new("index-1-split-1") + .with_index_uid(&index_uid_1) + .build(), + MockSplitBuilder::new("index-1-split-2") + .with_index_uid(&index_uid_1) + .build(), + ]; + let splits_response = ListSplitsResponse::try_from_splits(splits).unwrap(); + Ok(ServiceStream::from(vec![Ok(splits_response)])) + }); + let mut mock_search_service_1 = MockSearchService::new(); + mock_search_service_1 + .expect_leaf_search() + .withf( + |leaf_search_req: &quickwit_proto::search::LeafSearchRequest| { + leaf_search_req.leaf_requests.len() == 1 + && leaf_search_req.leaf_requests[0].split_offsets.len() == 2 + }, + ) + .times(1) + .returning( + |_leaf_search_req: quickwit_proto::search::LeafSearchRequest| { + let partial_hits = vec![mock_partial_hit("index-1-split-1", 0u64, 1u32)]; + Ok(quickwit_proto::search::LeafSearchResponse { + num_hits: 1, + partial_hits, + failed_splits: vec![{ + SplitSearchError { + error: "some error".to_string(), + split_id: "index-1-split-1".to_string(), + retryable_error: true, + } + }], + num_attempted_splits: 3, + ..Default::default() + }) + }, + ); + mock_search_service_1 + .expect_leaf_search() + .withf( + |leaf_search_req: &quickwit_proto::search::LeafSearchRequest| { + leaf_search_req.leaf_requests.len() == 1 + && leaf_search_req.leaf_requests[0].split_offsets.len() == 1 + }, + ) + .times(1) + .returning( + |_leaf_search_req: quickwit_proto::search::LeafSearchRequest| { + Ok(quickwit_proto::search::LeafSearchResponse { + num_hits: 0, + partial_hits: Vec::new(), + failed_splits: vec![{ + SplitSearchError { + error: "some error".to_string(), + split_id: "index-1-split-1".to_string(), + retryable_error: true, + } + }], + num_attempted_splits: 1, + ..Default::default() + }) + }, + ); + mock_search_service_1 + .expect_fetch_docs() + .times(1) + .returning(|fetch_docs_req| { + Ok(quickwit_proto::search::FetchDocsResponse { + hits: get_doc_for_fetch_req(fetch_docs_req), + }) + }); + let searcher_pool = searcher_pool_for_test([("127.0.0.1:1001", mock_search_service_1)]); + let search_job_placer = SearchJobPlacer::new(searcher_pool); + let cluster_client = ClusterClient::new(search_job_placer.clone()); + let search_response = root_search( + &SearcherContext::for_test(), + search_request, + MetastoreServiceClient::from_mock(mock_metastore), + &cluster_client, + ) + .await + .unwrap(); + assert_eq!(search_response.num_hits, 1); + assert_eq!(search_response.hits.len(), 1); + assert_eq!(search_response.failed_splits.len(), 1); + Ok(()) + } } diff --git a/quickwit/quickwit-search/src/scroll_context.rs b/quickwit/quickwit-search/src/scroll_context.rs index bb21cf6db9b..c62e80084e4 100644 --- a/quickwit/quickwit-search/src/scroll_context.rs +++ b/quickwit/quickwit-search/src/scroll_context.rs @@ -28,7 +28,7 @@ use anyhow::Context; use base64::prelude::BASE64_STANDARD; use base64::Engine; use quickwit_metastore::SplitMetadata; -use quickwit_proto::search::{LeafSearchResponse, PartialHit, SearchRequest}; +use quickwit_proto::search::{LeafSearchResponse, PartialHit, SearchRequest, SplitSearchError}; use quickwit_proto::types::IndexUid; use serde::{Deserialize, Serialize}; use tokio::sync::RwLock; @@ -57,6 +57,8 @@ pub(crate) struct ScrollContext { pub max_hits_per_page: u64, pub cached_partial_hits_start_offset: u64, pub cached_partial_hits: Vec, + pub failed_splits: Vec, + pub num_successful_splits: u64, } impl ScrollContext { diff --git a/quickwit/quickwit-search/src/service.rs b/quickwit/quickwit-search/src/service.rs index 2a57a7ab65b..65516a99e76 100644 --- a/quickwit/quickwit-search/src/service.rs +++ b/quickwit/quickwit-search/src/service.rs @@ -436,6 +436,8 @@ pub(crate) async fn scroll( scroll_id: Some(next_scroll_id.to_string()), errors: Vec::new(), aggregation: None, + failed_splits: scroll_context.failed_splits, + num_successful_splits: scroll_context.num_successful_splits, }) } /// [`SearcherContext`] provides a common set of variables diff --git a/quickwit/quickwit-search/src/tests.rs b/quickwit/quickwit-search/src/tests.rs index 7f313558b50..ea255e9688d 100644 --- a/quickwit/quickwit-search/src/tests.rs +++ b/quickwit/quickwit-search/src/tests.rs @@ -1461,10 +1461,10 @@ async fn test_single_node_aggregation_missing_fast_field() { ) .await .unwrap_err(); - let SearchError::Internal(error_msg) = single_node_error else { + let SearchError::InvalidArgument(error_msg) = single_node_error else { panic!(); }; - assert!(error_msg.contains("Field \"color\" is not configured as fast field")); + assert!(error_msg.contains("Field \"color\" is not configured as a fast field")); test_sandbox.assert_quit().await; } diff --git a/quickwit/quickwit-serve/src/elasticsearch_api/model/search_query_params.rs b/quickwit/quickwit-serve/src/elasticsearch_api/model/search_query_params.rs index 70409754353..903dfd4ed9c 100644 --- a/quickwit/quickwit-serve/src/elasticsearch_api/model/search_query_params.rs +++ b/quickwit/quickwit-serve/src/elasticsearch_api/model/search_query_params.rs @@ -287,6 +287,11 @@ impl SearchQueryParams { })?; Ok(Some(duration)) } + + pub fn allow_partial_search_results(&self) -> bool { + // By default, elastic search allows partial results. + self.allow_partial_search_results.unwrap_or(true) + } } #[doc = "Whether to expand wildcard expression to concrete indices that are open, closed or both."] diff --git a/quickwit/quickwit-serve/src/elasticsearch_api/rest_handler.rs b/quickwit/quickwit-serve/src/elasticsearch_api/rest_handler.rs index e5caa8703bc..433dec8f487 100644 --- a/quickwit/quickwit-serve/src/elasticsearch_api/rest_handler.rs +++ b/quickwit/quickwit-serve/src/elasticsearch_api/rest_handler.rs @@ -24,7 +24,7 @@ use std::time::{Duration, Instant}; use bytes::Bytes; use elasticsearch_dsl::search::{Hit as ElasticHit, SearchResponse as ElasticsearchResponse}; -use elasticsearch_dsl::{HitsMetadata, Source, TotalHits, TotalHitsRelation}; +use elasticsearch_dsl::{HitsMetadata, ShardStatistics, Source, TotalHits, TotalHitsRelation}; use futures_util::StreamExt; use hyper::StatusCode; use itertools::Itertools; @@ -441,9 +441,16 @@ async fn es_compat_index_search( search_body: SearchBody, search_service: Arc, ) -> Result { + if search_params.scroll.is_some() && !search_params.allow_partial_search_results() { + return Err(ElasticsearchError::from(SearchError::InvalidArgument( + "Quickwit only supports scroll API with allow_partial_search_results set to true" + .to_string(), + ))); + } let _source_excludes = search_params._source_excludes.clone(); let _source_includes = search_params._source_includes.clone(); let start_instant = Instant::now(); + let allow_partial_search_results = search_params.allow_partial_search_results(); let (search_request, append_shard_doc) = build_request_for_es_api(index_id_patterns, search_params, search_body)?; let search_response: SearchResponse = search_service.root_search(search_request).await?; @@ -453,7 +460,8 @@ async fn es_compat_index_search( append_shard_doc, _source_excludes, _source_includes, - ); + allow_partial_search_results, + )?; search_response_rest.took = elapsed.as_millis() as u32; Ok(search_response_rest) } @@ -791,6 +799,7 @@ async fn es_compat_index_multi_search( build_request_for_es_api(index_ids_patterns, search_query_params, search_body)?; search_requests.push(es_request); } + // TODO: forced to do weird referencing to work around https://github.com/rust-lang/rust/issues/100905 // otherwise append_shard_doc is captured by ref, and we get lifetime issues let futures = search_requests @@ -804,12 +813,14 @@ async fn es_compat_index_multi_search( let search_response: SearchResponse = search_service.clone().root_search(search_request).await?; let elapsed = start_instant.elapsed(); - let mut search_response_rest: ElasticsearchResponse = convert_to_es_search_response( - search_response, - append_shard_doc, - _source_excludes, - _source_includes, - ); + let mut search_response_rest: ElasticsearchResponse = + convert_to_es_search_response( + search_response, + append_shard_doc, + _source_excludes, + _source_includes, + true, //< allow_partial_results. Set to to true to match ES's behavior. + )?; search_response_rest.took = elapsed.as_millis() as u32; Ok::<_, ElasticsearchError>(search_response_rest) } @@ -852,8 +863,13 @@ async fn es_scroll( }; let search_response: SearchResponse = search_service.scroll(scroll_request).await?; // TODO append_shard_doc depends on the initial request, but we don't have access to it + + // Ideally, we would have wanted to reuse the setting from the initial search request. + // However, passing that parameter is cumbersome, so we cut some corner and forbid the + // use of scroll requests in combination with allow_partial_results set to false. + let allow_failed_splits = true; let mut search_response_rest: ElasticsearchResponse = - convert_to_es_search_response(search_response, false, None, None); + convert_to_es_search_response(search_response, false, None, None, allow_failed_splits)?; search_response_rest.took = start_instant.elapsed().as_millis() as u32; Ok(search_response_rest) } @@ -918,7 +934,13 @@ fn convert_to_es_search_response( append_shard_doc: bool, _source_excludes: Option>, _source_includes: Option>, -) -> ElasticsearchResponse { + allow_partial_results: bool, +) -> Result { + if !allow_partial_results || resp.num_successful_splits == 0 { + if let Some(search_error) = SearchError::from_split_errors(&resp.failed_splits) { + return Err(ElasticsearchError::from(search_error)); + } + } let hits: Vec = resp .hits .into_iter() @@ -929,7 +951,10 @@ fn convert_to_es_search_response( } else { None }; - ElasticsearchResponse { + let num_failed_splits = resp.failed_splits.len() as u32; + let num_successful_splits = resp.num_successful_splits as u32; + let num_total_splits = num_successful_splits + num_failed_splits; + Ok(ElasticsearchResponse { timed_out: false, hits: HitsMetadata { total: Some(TotalHits { @@ -941,8 +966,16 @@ fn convert_to_es_search_response( }, aggregations, scroll_id: resp.scroll_id, + // There is not concept of shards here, but use this to convey split search failures. + shards: ShardStatistics { + total: num_total_splits, + successful: num_successful_splits, + skipped: 0u32, + failed: num_failed_splits, + failures: Vec::new(), + }, ..Default::default() - } + }) } pub(crate) fn str_lines(body: &str) -> impl Iterator { @@ -954,6 +987,7 @@ pub(crate) fn str_lines(body: &str) -> impl Iterator { #[cfg(test)] mod tests { use hyper::StatusCode; + use quickwit_proto::search::SplitSearchError; use super::{partial_hit_from_search_after_param, *}; @@ -1133,4 +1167,58 @@ mod tests { assert_eq!(fields, expected); } + + // We test that the behavior of allow partial search results. + #[test] + fn test_convert_to_es_search_response_allow_partial() { + let split_error = SplitSearchError { + error: "some-error".to_string(), + split_id: "some-split-id".to_string(), + retryable_error: true, + }; + { + let search_response = SearchResponse { + num_successful_splits: 1, + failed_splits: vec![split_error.clone()], + ..Default::default() + }; + convert_to_es_search_response(search_response, false, None, None, false).unwrap_err(); + } + { + let search_response = SearchResponse { + num_successful_splits: 1, + failed_splits: vec![split_error.clone()], + ..Default::default() + }; + // if we allow partial search results, this should not fail, but we report the presence + // of failed splits in the fail shard response. + let es_search_resp = + convert_to_es_search_response(search_response, false, None, None, true).unwrap(); + assert_eq!(es_search_resp.shards.failed, 1); + } + { + let search_response = SearchResponse { + failed_splits: vec![split_error.clone()], + ..Default::default() + }; + // Event if we allow partial search results, with a fail and no success, we have a + // failure. + convert_to_es_search_response(search_response, false, None, None, true).unwrap_err(); + } + { + // Not having any splits (no failure + no success) is not considered a failure. + for allow_partial in [true, false] { + let search_response = SearchResponse::default(); + let es_search_resp = convert_to_es_search_response( + search_response, + false, + None, + None, + allow_partial, + ) + .unwrap(); + assert_eq!(es_search_resp.shards.failed, 0); + } + } + } } diff --git a/quickwit/quickwit-serve/src/jaeger_api/rest_handler.rs b/quickwit/quickwit-serve/src/jaeger_api/rest_handler.rs index 1525c4d2b8b..15d76413f47 100644 --- a/quickwit/quickwit-serve/src/jaeger_api/rest_handler.rs +++ b/quickwit/quickwit-serve/src/jaeger_api/rest_handler.rs @@ -463,6 +463,8 @@ mod tests { errors: Vec::new(), aggregation: None, scroll_id: None, + failed_splits: Vec::new(), + num_successful_splits: 1, }) }); let mock_search_service = Arc::new(mock_search_service); @@ -494,6 +496,8 @@ mod tests { errors: Vec::new(), aggregation: None, scroll_id: None, + failed_splits: Vec::new(), + num_successful_splits: 1, }) }); let mock_search_service = Arc::new(mock_search_service); diff --git a/quickwit/quickwit-serve/src/search_api/rest_handler.rs b/quickwit/quickwit-serve/src/search_api/rest_handler.rs index cf9b5d40c84..42a9a0ff44b 100644 --- a/quickwit/quickwit-serve/src/search_api/rest_handler.rs +++ b/quickwit/quickwit-serve/src/search_api/rest_handler.rs @@ -239,6 +239,10 @@ pub struct SearchRequestQueryString { #[serde(with = "count_hits_from_bool")] #[serde(default = "count_hits_from_bool::default")] pub count_all: CountHits, + #[param(value_type = bool)] + #[schema(value_type = bool)] + #[serde(default)] + pub allow_failed_splits: bool, } mod count_hits_from_bool { @@ -302,8 +306,22 @@ async fn search_endpoint( search_request: SearchRequestQueryString, search_service: &dyn SearchService, ) -> Result { + let allow_failed_splits = search_request.allow_failed_splits; let search_request = search_request_from_api_request(index_id_patterns, search_request)?; - let search_response = search_service.root_search(search_request).await?; + let search_response = + search_service + .root_search(search_request) + .await + .and_then(|search_response| { + if !allow_failed_splits || search_response.num_successful_splits == 0 { + if let Some(search_error) = + SearchError::from_split_errors(&search_response.failed_splits[..]) + { + return Err(search_error); + } + } + Ok(search_response) + })?; let search_response_rest = SearchResponseRest::try_from(search_response)?; Ok(search_response_rest) } diff --git a/quickwit/rest-api-tests/scenarii/es_compatibility/0012-scroll-api.yaml b/quickwit/rest-api-tests/scenarii/es_compatibility/0012-scroll-api.yaml index 58e425f07e5..0f6b845b7a0 100644 --- a/quickwit/rest-api-tests/scenarii/es_compatibility/0012-scroll-api.yaml +++ b/quickwit/rest-api-tests/scenarii/es_compatibility/0012-scroll-api.yaml @@ -1,3 +1,17 @@ +--- +engines: ["quickwit"] +params: + size: 1 + scroll: 30m + allow_partial_search_results: "false" +json: + query: + match_all: {} +status_code: 400 +expected: + error: + reason: "Invalid argument: Quickwit only supports scroll API with allow_partial_search_results set to true" +--- params: size: 1 scroll: 30m