From 93586b24d7407e4519930c5be30918b449aa6580 Mon Sep 17 00:00:00 2001
From: Pascal Seitz
Date: Wed, 12 Apr 2023 21:33:49 +0800
Subject: [PATCH] switch from json to ciborium

switch to ciborium for better data structure support when sending
intermediate results between nodes
---
 quickwit/Cargo.lock                             |  1 +
 quickwit/Cargo.toml                             |  1 +
 .../quickwit-opentelemetry/src/otlp/mod.rs      | 35 ++++++------
 .../protos/quickwit/search_api.proto            |  4 +-
 quickwit/quickwit-proto/src/quickwit.rs         |  6 +--
 quickwit/quickwit-search/Cargo.toml             |  1 +
 .../quickwit-search/src/cluster_client.rs       | 10 ++--
 quickwit/quickwit-search/src/collector.rs       | 53 +++++++++++++------
 quickwit/quickwit-search/src/error.rs           | 12 +++++
 quickwit/quickwit-search/src/leaf.rs            |  1 +
 quickwit/quickwit-search/src/lib.rs             | 41 +++++---------
 quickwit/quickwit-search/src/root.rs            | 39 +++++++++-----
 12 files changed, 122 insertions(+), 82 deletions(-)

diff --git a/quickwit/Cargo.lock b/quickwit/Cargo.lock
index 8c180909c46..0ee2be4391d 100644
--- a/quickwit/Cargo.lock
+++ b/quickwit/Cargo.lock
@@ -4846,6 +4846,7 @@ dependencies = [
  "async-trait",
  "bytes",
  "chitchat",
+ "ciborium",
  "fnv",
  "futures",
  "http",
diff --git a/quickwit/Cargo.toml b/quickwit/Cargo.toml
index 9228c0f4629..6aae49e2242 100644
--- a/quickwit/Cargo.toml
+++ b/quickwit/Cargo.toml
@@ -50,6 +50,7 @@ backoff = { version = "0.4", features = ["tokio"] }
 base64 = "0.21"
 byte-unit = { version = "4", default-features = false, features = ["serde"] }
 bytes = "1"
+ciborium = "0.2.0"
 chitchat = { git = "https://github.com/quickwit-oss/chitchat", rev = "4973853" }
 chrono = "0.4.23"
 clap = { version = "=3.1", features = ["env"] }
diff --git a/quickwit/quickwit-opentelemetry/src/otlp/mod.rs b/quickwit/quickwit-opentelemetry/src/otlp/mod.rs
index 0edd8598cdc..eb849020ff4 100644
--- a/quickwit/quickwit-opentelemetry/src/otlp/mod.rs
+++ b/quickwit/quickwit-opentelemetry/src/otlp/mod.rs
@@ -27,8 +27,7 @@ use quickwit_proto::opentelemetry::proto::common::v1::any_value::Value as OtlpVa
 use quickwit_proto::opentelemetry::proto::common::v1::{
     AnyValue as OtlpAnyValue, KeyValue as OtlpKeyValue,
 };
-use serde;
-use serde::{de, Deserialize, Deserializer, Serialize, Serializer};
+use serde::{self, de, Deserialize, Deserializer, Serialize, Serializer};
 use serde_json::{Number as JsonNumber, Value as JsonValue};
 use tracing::warn;
 
@@ -45,6 +44,22 @@ pub use trace::{
 
 #[derive(Debug, Clone, Copy, Eq, PartialEq, Ord, PartialOrd, Hash)]
 pub struct TraceId([u8; 16]);
 
+impl Serialize for TraceId {
+    fn serialize<S: Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
+        let b64trace_id = BASE64_STANDARD.encode(self.0);
+        serializer.serialize_str(&b64trace_id)
+    }
+}
+
+impl<'de> Deserialize<'de> for TraceId {
+    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
+    where D: Deserializer<'de> {
+        String::deserialize(deserializer)?
+            .parse()
+            .map_err(de::Error::custom)
+    }
+}
+
 impl TraceId {
     pub const BASE64_LENGTH: usize = 24;
 
@@ -65,22 +80,6 @@ impl TraceId {
     }
 }
 
-impl Serialize for TraceId {
-    fn serialize<S: Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
-        let b64trace_id = BASE64_STANDARD.encode(self.0);
-        serializer.serialize_str(&b64trace_id)
-    }
-}
-
-impl<'de> Deserialize<'de> for TraceId {
-    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
-    where D: Deserializer<'de> {
-        String::deserialize(deserializer)?
-            .parse()
-            .map_err(de::Error::custom)
-    }
-}
-
 #[derive(Debug, thiserror::Error)]
 pub enum TryFromTraceIdError {
     #[error("Trace ID must be 16 bytes long, got {0}.")]
diff --git a/quickwit/quickwit-proto/protos/quickwit/search_api.proto b/quickwit/quickwit-proto/protos/quickwit/search_api.proto
index b416d18fff1..89f14e55e4b 100644
--- a/quickwit/quickwit-proto/protos/quickwit/search_api.proto
+++ b/quickwit/quickwit-proto/protos/quickwit/search_api.proto
@@ -240,8 +240,8 @@ message LeafSearchResponse {
   // num_attempted_splits = num_successful_splits + num_failed_splits.
   uint64 num_attempted_splits = 4;
 
-  // json serialized intermediate aggregation_result.
-  optional string intermediate_aggregation_result = 5;
+  // ciborium serialized intermediate aggregation_result.
+  optional bytes intermediate_aggregation_result = 5;
 }
diff --git a/quickwit/quickwit-proto/src/quickwit.rs b/quickwit/quickwit-proto/src/quickwit.rs
index 0286d1235a5..b171712167a 100644
--- a/quickwit/quickwit-proto/src/quickwit.rs
+++ b/quickwit/quickwit-proto/src/quickwit.rs
@@ -199,10 +199,10 @@ pub struct LeafSearchResponse {
     /// num_attempted_splits = num_successful_splits + num_failed_splits.
     #[prost(uint64, tag = "4")]
     pub num_attempted_splits: u64,
-    /// json serialized intermediate aggregation_result.
-    #[prost(string, optional, tag = "5")]
+    /// ciborium serialized intermediate aggregation_result.
+    #[prost(bytes = "vec", optional, tag = "5")]
     pub intermediate_aggregation_result: ::core::option::Option<
-        ::prost::alloc::string::String,
+        ::prost::alloc::vec::Vec<u8>,
     >,
 }
 #[derive(Serialize, Deserialize, utoipa::ToSchema)]
diff --git a/quickwit/quickwit-search/Cargo.toml b/quickwit/quickwit-search/Cargo.toml
index 6868df98c19..54b31913b35 100644
--- a/quickwit/quickwit-search/Cargo.toml
+++ b/quickwit/quickwit-search/Cargo.toml
@@ -13,6 +13,7 @@ documentation = "https://quickwit.io/docs/"
 anyhow = { workspace = true }
 async-trait = { workspace = true }
 bytes = { workspace = true }
+ciborium = { workspace = true }
 fnv = { workspace = true }
 futures = { workspace = true }
 http = { workspace = true }
diff --git a/quickwit/quickwit-search/src/cluster_client.rs b/quickwit/quickwit-search/src/cluster_client.rs
index e82d2da72f7..47ede50b873 100644
--- a/quickwit/quickwit-search/src/cluster_client.rs
+++ b/quickwit/quickwit-search/src/cluster_client.rs
@@ -172,11 +172,13 @@ fn merge_leaf_search_results(
     if let Some(res2_str) = retry_response.intermediate_aggregation_result.as_ref() {
         let mut res1: IntermediateAggregationResults =
-            serde_json::from_str(&res1_str)?;
-        let res2: IntermediateAggregationResults = serde_json::from_str(res2_str)?;
+            ciborium::de::from_reader(&mut res1_str.as_slice())?;
+        let res2: IntermediateAggregationResults =
+            ciborium::de::from_reader(&mut res2_str.as_slice())?;
         res1.merge_fruits(res2)?;
-        let res = serde_json::to_string(&res1)?;
-        Ok(res)
+        let mut serialized = Vec::new();
+        ciborium::ser::into_writer(&res1, &mut serialized)?;
+        Ok(serialized)
     } else {
         Ok(res1_str)
     }
diff --git a/quickwit/quickwit-search/src/collector.rs b/quickwit/quickwit-search/src/collector.rs
index 15cce0e24a5..91c69163c28 100644
--- a/quickwit/quickwit-search/src/collector.rs
+++ b/quickwit/quickwit-search/src/collector.rs
@@ -31,7 +31,7 @@ use tantivy::aggregation::{AggregationLimits, AggregationSegmentCollector};
 use tantivy::collector::{Collector, SegmentCollector};
 use tantivy::columnar::ColumnType;
 use tantivy::fastfield::Column;
-use tantivy::{DocId, Score, SegmentOrdinal, SegmentReader};
+use tantivy::{DocId, Score, SegmentOrdinal, SegmentReader, TantivyError};
 
 use crate::filters::{create_timestamp_filter_builder, TimestampFilter, TimestampFilterBuilder};
 use crate::find_trace_ids_collector::{FindTraceIdsCollector, FindTraceIdsSegmentCollector};
@@ -267,15 +267,18 @@ impl SegmentCollector for QuickwitSegmentCollector {
             .collect();
 
         let intermediate_aggregation_result = match self.aggregation {
-            Some(AggregationSegmentCollectors::FindTraceIdsSegmentCollector(collector)) => Some(
-                serde_json::to_string(&collector.harvest())
-                    .expect("Collector fruit should be JSON serializable."),
-            ),
+            Some(AggregationSegmentCollectors::FindTraceIdsSegmentCollector(collector)) => {
+                let fruit = collector.harvest();
+                let mut serialized = Vec::new();
+                ciborium::ser::into_writer(&fruit, &mut serialized)
+                    .expect("Collector fruit should be serializable.");
+                Some(serialized)
+            }
             Some(AggregationSegmentCollectors::TantivyAggregationSegmentCollector(collector)) => {
-                Some(
-                    serde_json::to_string(&collector.harvest()?)
-                        .expect("Collector fruit should be JSON serializable."),
-                )
+                let mut serialized = Vec::new();
+                ciborium::ser::into_writer(&collector.harvest()?, &mut serialized)
+                    .expect("Collector fruit should be serializable.");
+                Some(serialized)
             }
             None => None,
         };
@@ -438,15 +441,23 @@ impl Collector for QuickwitCollector {
     }
 }
 
+fn map_de_error(err: ciborium::de::Error<std::io::Error>) -> TantivyError {
+    TantivyError::InternalError(format!("Merge Result Deserialization Error: {}", err))
+}
+
+fn map_ser_error(err: ciborium::ser::Error<std::io::Error>) -> TantivyError {
+    TantivyError::InternalError(format!("Merge Result Serialization Error: {}", err))
+}
+
 /// Merges a set of Leaf Results.
 fn merge_leaf_responses(
     aggregations_opt: &Option<QuickwitAggregations>,
-    leaf_responses: Vec<LeafSearchResponse>,
+    mut leaf_responses: Vec<LeafSearchResponse>,
     max_hits: usize,
 ) -> tantivy::Result<LeafSearchResponse> {
     // Optimization: No merging needed if there is only one result.
     if leaf_responses.len() == 1 {
-        return Ok(leaf_responses.into_iter().next().unwrap_or_default()); //< default is actually never called
+        return Ok(leaf_responses.pop().unwrap());
     }
     let merged_intermediate_aggregation_result = match aggregations_opt {
         Some(QuickwitAggregations::FindTraceIdsAggregation(collector)) => {
@@ -457,13 +468,18 @@ fn merge_leaf_responses(
                 .filter_map(|leaf_response| {
                     leaf_response.intermediate_aggregation_result.as_ref().map(
                         |intermediate_aggregation_result| {
-                            serde_json::from_str(intermediate_aggregation_result)
+                            ciborium::de::from_reader(
+                                &mut intermediate_aggregation_result.as_slice(),
+                            )
+                            .map_err(map_de_error)
                         },
                     )
                 })
                 .collect::<Result<_, _>>()?;
             let merged_fruit = collector.merge_fruits(fruits)?;
-            Some(serde_json::to_string(&merged_fruit)?)
+            let mut serialized = Vec::new();
+            ciborium::ser::into_writer(&merged_fruit, &mut serialized).map_err(map_ser_error)?;
+            Some(serialized)
         }
         Some(QuickwitAggregations::TantivyAggregations(_)) => {
             let fruits: Vec<IntermediateAggregationResults> = leaf_responses
                 .iter()
                 .filter_map(|leaf_response| {
                     leaf_response.intermediate_aggregation_result.as_ref().map(
                         |intermediate_aggregation_result| {
-                            serde_json::from_str(intermediate_aggregation_result)
+                            ciborium::de::from_reader(
+                                &mut intermediate_aggregation_result.as_slice(),
+                            )
+                            .map_err(map_de_error)
                         },
                     )
                 })
                 .collect::<Result<_, _>>()?;
             let mut fruit_iter = fruits.into_iter();
             if let Some(first_fruit) = fruit_iter.next() {
                 let mut merged_fruit = first_fruit;
                 for fruit in fruit_iter {
                     merged_fruit.merge_fruits(fruit)?;
                 }
-                Some(serde_json::to_string(&merged_fruit)?)
+                let mut serialized = Vec::new();
+                ciborium::ser::into_writer(&merged_fruit, &mut serialized)
+                    .map_err(map_ser_error)?;
+
+                Some(serialized)
             } else {
                 None
             }
diff --git a/quickwit/quickwit-search/src/error.rs b/quickwit/quickwit-search/src/error.rs
index 5d229850ffd..2bbd53a8a10 100644
--- a/quickwit/quickwit-search/src/error.rs
+++ b/quickwit/quickwit-search/src/error.rs
@@ -75,6 +75,18 @@ impl From for SearchError {
     }
 }
 
+impl From<ciborium::de::Error<std::io::Error>> for SearchError {
+    fn from(error: ciborium::de::Error<std::io::Error>) -> Self {
+        SearchError::InternalError(format!("Deserialize error: {error}"))
+    }
+}
+
+impl From<ciborium::ser::Error<std::io::Error>> for SearchError {
+    fn from(error: ciborium::ser::Error<std::io::Error>) -> Self {
+        SearchError::InternalError(format!("Serialize error: {error}"))
+    }
+}
+
 impl From<serde_json::Error> for SearchError {
     fn from(serde_error: serde_json::Error) -> Self {
         SearchError::InternalError(format!("Serde error: {serde_error}"))
diff --git a/quickwit/quickwit-search/src/leaf.rs b/quickwit/quickwit-search/src/leaf.rs
index ee1ea1d3c6d..9ce075af2cc 100644
--- a/quickwit/quickwit-search/src/leaf.rs
+++ b/quickwit/quickwit-search/src/leaf.rs
@@ -347,6 +347,7 @@ async fn leaf_search_single_split(
         .map_err(|_| {
             crate::SearchError::InternalError(format!("Leaf search panicked. split={split_id}"))
         })??;
+
     Ok(leaf_search_response)
 }
 
diff --git a/quickwit/quickwit-search/src/lib.rs b/quickwit/quickwit-search/src/lib.rs
index 27000d165a8..b6980296c4b 100644
--- a/quickwit/quickwit-search/src/lib.rs
+++ b/quickwit/quickwit-search/src/lib.rs
@@ -46,9 +46,8 @@ mod tests;
 pub use collector::QuickwitAggregations;
 use metrics::SEARCH_METRICS;
 use quickwit_doc_mapper::DocMapper;
-use root::validate_request;
+use root::{finalize_aggregation, validate_request};
 use service::SearcherContext;
-use tantivy::aggregation::AggregationLimits;
 use tantivy::query::Query as TantivyQuery;
 use tantivy::schema::NamedFieldDocument;
 
@@ -66,8 +65,6 @@ use quickwit_doc_mapper::tag_pruning::extract_tags_from_query;
 use quickwit_metastore::{ListSplitsQuery, Metastore, SplitMetadata, SplitState};
 use quickwit_proto::{Hit, PartialHit, SearchRequest, SearchResponse, SplitIdAndFooterOffsets};
 use quickwit_storage::StorageUriResolver;
-use tantivy::aggregation::agg_result::AggregationResults;
-use tantivy::aggregation::intermediate_agg_result::IntermediateAggregationResults;
 use tantivy::DocAddress;
 
 pub use crate::client::{create_search_service_client, SearchServiceClient};
@@ -193,6 +190,7 @@ pub async fn single_node_search(
     // Validates the query by effectively building it against the current schema.
     doc_mapper.query(doc_mapper.schema(), search_request)?;
     let searcher_context = Arc::new(SearcherContext::new(SearcherConfig::default()));
+
     let leaf_search_response = leaf_search(
         searcher_context.clone(),
         search_request,
@@ -229,30 +227,17 @@ pub async fn single_node_search(
         })
         .collect();
     let elapsed = start_instant.elapsed();
-    let aggregation = if let Some(intermediate_aggregation_result) =
-        leaf_search_response.intermediate_aggregation_result
-    {
-        let aggregations: QuickwitAggregations =
-            serde_json::from_str(search_request.aggregation_request.as_ref().expect(
-                "Aggregation should be present since we are processing an intermediate \
-                 aggregation result.",
-            ))?;
-        match aggregations {
-            QuickwitAggregations::FindTraceIdsAggregation(_) => {
-                // There is nothing to merge here because there is only one leaf response.
-                Some(intermediate_aggregation_result)
-            }
-            QuickwitAggregations::TantivyAggregations(aggregations) => {
-                let res: IntermediateAggregationResults =
-                    serde_json::from_str(&intermediate_aggregation_result)?;
-                let res: AggregationResults =
-                    res.into_final_result(aggregations, &AggregationLimits::default())?;
-                Some(serde_json::to_string(&res)?)
-            }
-        }
-    } else {
-        None
-    };
+
+    let aggregations: Option<QuickwitAggregations> = search_request
+        .aggregation_request
+        .as_ref()
+        .map(|agg| serde_json::from_str(agg))
+        .transpose()?;
+
+    let aggregation = finalize_aggregation(
+        leaf_search_response.intermediate_aggregation_result,
+        aggregations,
+    )?;
 
     Ok(SearchResponse {
         aggregation,
         num_hits: leaf_search_response.num_hits,
diff --git a/quickwit/quickwit-search/src/root.rs b/quickwit/quickwit-search/src/root.rs
index 40ad039e6ef..dc922d18a30 100644
--- a/quickwit/quickwit-search/src/root.rs
+++ b/quickwit/quickwit-search/src/root.rs
@@ -40,6 +40,7 @@ use tracing::{debug, error, info_span, instrument};
 
 use crate::cluster_client::ClusterClient;
 use crate::collector::{make_merge_collector, QuickwitAggregations};
+use crate::find_trace_ids_collector::Span;
 use crate::search_job_placer::Job;
 use crate::service::SearcherContext;
 use crate::{
@@ -298,8 +299,25 @@ pub async fn root_search(
 
     let elapsed = start_instant.elapsed();
 
-    let aggregation = if let Some(intermediate_aggregation_result) =
-        leaf_search_response.intermediate_aggregation_result
+    let aggregation = finalize_aggregation(
+        leaf_search_response.intermediate_aggregation_result,
+        aggregations,
+    )?;
+
+    Ok(SearchResponse {
+        aggregation,
+        num_hits: leaf_search_response.num_hits,
+        hits,
+        elapsed_time_micros: elapsed.as_micros() as u64,
+        errors: Vec::new(),
+    })
+}
+
+pub fn finalize_aggregation(
+    intermediate_aggregation_result: Option<Vec<u8>>,
+    aggregations: Option<QuickwitAggregations>,
+) -> crate::Result<Option<String>> {
+    let aggregation = if let Some(intermediate_aggregation_result) =
+        intermediate_aggregation_result
     {
         match aggregations.expect(
             "Aggregation should be present since we are processing an intermediate aggregation \
             result.",
        ) {
             QuickwitAggregations::FindTraceIdsAggregation(_) => {
                 // The merge collector has already merged the intermediate results.
-                Some(intermediate_aggregation_result)
+                let aggs: Vec<Span> =
+                    ciborium::de::from_reader(&mut intermediate_aggregation_result.as_slice())
+                        .unwrap();
+                Some(serde_json::to_string(&aggs)?)
             }
             QuickwitAggregations::TantivyAggregations(aggregations) => {
                 let res: IntermediateAggregationResults =
-                    serde_json::from_str(&intermediate_aggregation_result)?;
+                    ciborium::de::from_reader(&mut intermediate_aggregation_result.as_slice())
+                        .unwrap();
                 let res: AggregationResults =
                     res.into_final_result(aggregations, &AggregationLimits::default())?;
                 Some(serde_json::to_string(&res)?)
             }
         }
     } else {
         None
     };
-
-    Ok(SearchResponse {
-        aggregation,
-        num_hits: leaf_search_response.num_hits,
-        hits,
-        elapsed_time_micros: elapsed.as_micros() as u64,
-        errors: Vec::new(),
-    })
+    Ok(aggregation)
 }
 
 /// Performs a distributed list terms.
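
Illustrative note (not part of the patch): the change above replaces the JSON string in
LeafSearchResponse.intermediate_aggregation_result with CBOR bytes written via
ciborium::ser::into_writer and read back via ciborium::de::from_reader, as the collector.rs
and cluster_client.rs hunks do. Below is a minimal, self-contained sketch of that round-trip;
`Fruit` is a hypothetical stand-in for tantivy's IntermediateAggregationResults, which is the
type actually serialized by the patch.

    // Sketch of the CBOR round-trip used for intermediate aggregation results.
    // Assumes ciborium = "0.2" and serde = { version = "1", features = ["derive"] }.
    use serde::{Deserialize, Serialize};

    // Hypothetical stand-in for the intermediate aggregation fruit.
    #[derive(Serialize, Deserialize, Debug, PartialEq)]
    struct Fruit {
        term: String,
        count: u64,
    }

    fn main() {
        let fruit = Fruit { term: "quickwit".to_string(), count: 42 };

        // Serialize into the bytes carried by the (now `bytes`) proto field.
        let mut serialized: Vec<u8> = Vec::new();
        ciborium::ser::into_writer(&fruit, &mut serialized)
            .expect("Collector fruit should be serializable.");

        // Deserialize on the receiving node; the `&mut x.as_slice()` call shape
        // mirrors what the patch itself does.
        let decoded: Fruit = ciborium::de::from_reader(&mut serialized.as_slice())
            .expect("intermediate result should be valid CBOR");
        assert_eq!(decoded, fruit);
    }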
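
A second sketch, illustrating the "better data structure support" mentioned in the commit
message: serde_json restricts map keys to values that serialize as strings, while CBOR allows
arbitrary keys, which matters for intermediate aggregation state. The map below is an invented
example, not a type from the patch, and the serde_json behavior shown in the comment is the
library's usual map-key restriction.

    // Sketch: a map with composite keys round-trips through CBOR but is awkward as JSON.
    // Assumes ciborium = "0.2", serde = "1", serde_json = "1".
    use std::collections::BTreeMap;

    fn main() {
        // Hypothetical intermediate state keyed by a (term ordinal, bucket) pair.
        let mut buckets: BTreeMap<(u64, u64), f64> = BTreeMap::new();
        buckets.insert((1, 10), 0.5);
        buckets.insert((2, 20), 1.5);

        // serde_json expects map keys that serialize to strings, so composite keys
        // like tuples are rejected; this prints the resulting error, if any.
        println!("as json: {:?}", serde_json::to_string(&buckets).err());

        // CBOR has no such restriction: any serializable value can be a map key.
        let mut bytes = Vec::new();
        ciborium::ser::into_writer(&buckets, &mut bytes)
            .expect("CBOR serialization should succeed");
        let decoded: BTreeMap<(u64, u64), f64> =
            ciborium::de::from_reader(&mut bytes.as_slice())
                .expect("CBOR deserialization should succeed");
        assert_eq!(decoded, buckets);
    }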