Skip to content

Commit

Permalink
switch from json to ciborium
Browse files Browse the repository at this point in the history
switch to ciborium for better datastructure support when sending intermediate results between nodes
  • Loading branch information
PSeitz committed Apr 24, 2023
1 parent ea6d4fc commit 93586b2
Show file tree
Hide file tree
Showing 12 changed files with 122 additions and 82 deletions.
1 change: 1 addition & 0 deletions quickwit/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions quickwit/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ backoff = { version = "0.4", features = ["tokio"] }
base64 = "0.21"
byte-unit = { version = "4", default-features = false, features = ["serde"] }
bytes = "1"
ciborium = "0.2.0"
chitchat = { git = "https://github.com/quickwit-oss/chitchat", rev = "4973853" }
chrono = "0.4.23"
clap = { version = "=3.1", features = ["env"] }
Expand Down
35 changes: 17 additions & 18 deletions quickwit/quickwit-opentelemetry/src/otlp/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,7 @@ use quickwit_proto::opentelemetry::proto::common::v1::any_value::Value as OtlpVa
use quickwit_proto::opentelemetry::proto::common::v1::{
AnyValue as OtlpAnyValue, KeyValue as OtlpKeyValue,
};
use serde;
use serde::{de, Deserialize, Deserializer, Serialize, Serializer};
use serde::{self, de, Deserialize, Deserializer, Serialize, Serializer};
use serde_json::{Number as JsonNumber, Value as JsonValue};
use tracing::warn;

Expand All @@ -45,6 +44,22 @@ pub use trace::{
#[derive(Debug, Clone, Copy, Eq, PartialEq, Ord, PartialOrd, Hash)]
pub struct TraceId([u8; 16]);

impl Serialize for TraceId {
fn serialize<S: Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
let b64trace_id = BASE64_STANDARD.encode(self.0);
serializer.serialize_str(&b64trace_id)
}
}

impl<'de> Deserialize<'de> for TraceId {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where D: Deserializer<'de> {
String::deserialize(deserializer)?
.parse()
.map_err(de::Error::custom)
}
}

impl TraceId {
pub const BASE64_LENGTH: usize = 24;

Expand All @@ -65,22 +80,6 @@ impl TraceId {
}
}

impl Serialize for TraceId {
fn serialize<S: Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
let b64trace_id = BASE64_STANDARD.encode(self.0);
serializer.serialize_str(&b64trace_id)
}
}

impl<'de> Deserialize<'de> for TraceId {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where D: Deserializer<'de> {
String::deserialize(deserializer)?
.parse()
.map_err(de::Error::custom)
}
}

#[derive(Debug, thiserror::Error)]
pub enum TryFromTraceIdError {
#[error("Trace ID must be 16 bytes long, got {0}.")]
Expand Down
4 changes: 2 additions & 2 deletions quickwit/quickwit-proto/protos/quickwit/search_api.proto
Original file line number Diff line number Diff line change
Expand Up @@ -240,8 +240,8 @@ message LeafSearchResponse {
// num_attempted_splits = num_successful_splits + num_failed_splits.
uint64 num_attempted_splits = 4;

// json serialized intermediate aggregation_result.
optional string intermediate_aggregation_result = 5;
// ciborium serialized intermediate aggregation_result.
optional bytes intermediate_aggregation_result = 5;

}

Expand Down
6 changes: 3 additions & 3 deletions quickwit/quickwit-proto/src/quickwit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -199,10 +199,10 @@ pub struct LeafSearchResponse {
/// num_attempted_splits = num_successful_splits + num_failed_splits.
#[prost(uint64, tag = "4")]
pub num_attempted_splits: u64,
/// json serialized intermediate aggregation_result.
#[prost(string, optional, tag = "5")]
/// ciborium serialized intermediate aggregation_result.
#[prost(bytes = "vec", optional, tag = "5")]
pub intermediate_aggregation_result: ::core::option::Option<
::prost::alloc::string::String,
::prost::alloc::vec::Vec<u8>,
>,
}
#[derive(Serialize, Deserialize, utoipa::ToSchema)]
Expand Down
1 change: 1 addition & 0 deletions quickwit/quickwit-search/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ documentation = "https://quickwit.io/docs/"
anyhow = { workspace = true }
async-trait = { workspace = true }
bytes = { workspace = true }
ciborium = { workspace = true }
fnv = { workspace = true }
futures = { workspace = true }
http = { workspace = true }
Expand Down
10 changes: 6 additions & 4 deletions quickwit/quickwit-search/src/cluster_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -172,11 +172,13 @@ fn merge_leaf_search_results(
if let Some(res2_str) = retry_response.intermediate_aggregation_result.as_ref()
{
let mut res1: IntermediateAggregationResults =
serde_json::from_str(&res1_str)?;
let res2: IntermediateAggregationResults = serde_json::from_str(res2_str)?;
ciborium::de::from_reader(&mut res1_str.as_slice())?;
let res2: IntermediateAggregationResults =
ciborium::de::from_reader(&mut res2_str.as_slice())?;
res1.merge_fruits(res2)?;
let res = serde_json::to_string(&res1)?;
Ok(res)
let mut serialized = Vec::new();
ciborium::ser::into_writer(&res1, &mut serialized)?;
Ok(serialized)
} else {
Ok(res1_str)
}
Expand Down
53 changes: 38 additions & 15 deletions quickwit/quickwit-search/src/collector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ use tantivy::aggregation::{AggregationLimits, AggregationSegmentCollector};
use tantivy::collector::{Collector, SegmentCollector};
use tantivy::columnar::ColumnType;
use tantivy::fastfield::Column;
use tantivy::{DocId, Score, SegmentOrdinal, SegmentReader};
use tantivy::{DocId, Score, SegmentOrdinal, SegmentReader, TantivyError};

use crate::filters::{create_timestamp_filter_builder, TimestampFilter, TimestampFilterBuilder};
use crate::find_trace_ids_collector::{FindTraceIdsCollector, FindTraceIdsSegmentCollector};
Expand Down Expand Up @@ -267,15 +267,18 @@ impl SegmentCollector for QuickwitSegmentCollector {
.collect();

let intermediate_aggregation_result = match self.aggregation {
Some(AggregationSegmentCollectors::FindTraceIdsSegmentCollector(collector)) => Some(
serde_json::to_string(&collector.harvest())
.expect("Collector fruit should be JSON serializable."),
),
Some(AggregationSegmentCollectors::FindTraceIdsSegmentCollector(collector)) => {
let fruit = collector.harvest();
let mut serialized = Vec::new();
ciborium::ser::into_writer(&fruit, &mut serialized)
.expect("Collector fruit should be serializable.");
Some(serialized)
}
Some(AggregationSegmentCollectors::TantivyAggregationSegmentCollector(collector)) => {
Some(
serde_json::to_string(&collector.harvest()?)
.expect("Collector fruit should be JSON serializable."),
)
let mut serialized = Vec::new();
ciborium::ser::into_writer(&collector.harvest()?, &mut serialized)
.expect("Collector fruit should be serializable.");
Some(serialized)
}
None => None,
};
Expand Down Expand Up @@ -438,15 +441,23 @@ impl Collector for QuickwitCollector {
}
}

fn map_de_error<T: std::fmt::Debug>(err: ciborium::de::Error<T>) -> TantivyError {
TantivyError::InternalError(format!("Merge Result Deserialization Error: {}", err))
}

fn map_ser_error<T: std::fmt::Debug>(err: ciborium::ser::Error<T>) -> TantivyError {
TantivyError::InternalError(format!("Merge Result Serialization Error: {}", err))
}

/// Merges a set of Leaf Results.
fn merge_leaf_responses(
aggregations_opt: &Option<QuickwitAggregations>,
leaf_responses: Vec<LeafSearchResponse>,
mut leaf_responses: Vec<LeafSearchResponse>,
max_hits: usize,
) -> tantivy::Result<LeafSearchResponse> {
// Optimization: No merging needed if there is only one result.
if leaf_responses.len() == 1 {
return Ok(leaf_responses.into_iter().next().unwrap_or_default()); //< default is actually never called
return Ok(leaf_responses.pop().unwrap());
}
let merged_intermediate_aggregation_result = match aggregations_opt {
Some(QuickwitAggregations::FindTraceIdsAggregation(collector)) => {
Expand All @@ -457,21 +468,29 @@ fn merge_leaf_responses(
.filter_map(|leaf_response| {
leaf_response.intermediate_aggregation_result.as_ref().map(
|intermediate_aggregation_result| {
serde_json::from_str(intermediate_aggregation_result)
ciborium::de::from_reader(
&mut intermediate_aggregation_result.as_slice(),
)
.map_err(map_de_error)
},
)
})
.collect::<Result<_, _>>()?;
let merged_fruit = collector.merge_fruits(fruits)?;
Some(serde_json::to_string(&merged_fruit)?)
let mut serialized = Vec::new();
ciborium::ser::into_writer(&merged_fruit, &mut serialized).map_err(map_ser_error)?;
Some(serialized)
}
Some(QuickwitAggregations::TantivyAggregations(_)) => {
let fruits: Vec<IntermediateAggregationResults> = leaf_responses
.iter()
.filter_map(|leaf_response| {
leaf_response.intermediate_aggregation_result.as_ref().map(
|intermediate_aggregation_result| {
serde_json::from_str(intermediate_aggregation_result)
ciborium::de::from_reader(
&mut intermediate_aggregation_result.as_slice(),
)
.map_err(map_de_error)
},
)
})
Expand All @@ -483,7 +502,11 @@ fn merge_leaf_responses(
for fruit in fruit_iter {
merged_fruit.merge_fruits(fruit)?;
}
Some(serde_json::to_string(&merged_fruit)?)
let mut serialized = Vec::new();
ciborium::ser::into_writer(&merged_fruit, &mut serialized)
.map_err(map_ser_error)?;

Some(serialized)
} else {
None
}
Expand Down
12 changes: 12 additions & 0 deletions quickwit/quickwit-search/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,18 @@ impl From<TantivyError> for SearchError {
}
}

impl<T: std::fmt::Debug> From<ciborium::de::Error<T>> for SearchError {
fn from(error: ciborium::de::Error<T>) -> Self {
SearchError::InternalError(format!("Deserialize error: {error}"))
}
}

impl<T: std::fmt::Debug> From<ciborium::ser::Error<T>> for SearchError {
fn from(error: ciborium::ser::Error<T>) -> Self {
SearchError::InternalError(format!("Serialize error: {error}"))
}
}

impl From<serde_json::Error> for SearchError {
fn from(serde_error: serde_json::Error) -> Self {
SearchError::InternalError(format!("Serde error: {serde_error}"))
Expand Down
1 change: 1 addition & 0 deletions quickwit/quickwit-search/src/leaf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -347,6 +347,7 @@ async fn leaf_search_single_split(
.map_err(|_| {
crate::SearchError::InternalError(format!("Leaf search panicked. split={split_id}"))
})??;

Ok(leaf_search_response)
}

Expand Down
41 changes: 13 additions & 28 deletions quickwit/quickwit-search/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,8 @@ mod tests;
pub use collector::QuickwitAggregations;
use metrics::SEARCH_METRICS;
use quickwit_doc_mapper::DocMapper;
use root::validate_request;
use root::{finalize_aggregation, validate_request};
use service::SearcherContext;
use tantivy::aggregation::AggregationLimits;
use tantivy::query::Query as TantivyQuery;
use tantivy::schema::NamedFieldDocument;

Expand All @@ -66,8 +65,6 @@ use quickwit_doc_mapper::tag_pruning::extract_tags_from_query;
use quickwit_metastore::{ListSplitsQuery, Metastore, SplitMetadata, SplitState};
use quickwit_proto::{Hit, PartialHit, SearchRequest, SearchResponse, SplitIdAndFooterOffsets};
use quickwit_storage::StorageUriResolver;
use tantivy::aggregation::agg_result::AggregationResults;
use tantivy::aggregation::intermediate_agg_result::IntermediateAggregationResults;
use tantivy::DocAddress;

pub use crate::client::{create_search_service_client, SearchServiceClient};
Expand Down Expand Up @@ -193,6 +190,7 @@ pub async fn single_node_search(
// Validates the query by effectively building it against the current schema.
doc_mapper.query(doc_mapper.schema(), search_request)?;
let searcher_context = Arc::new(SearcherContext::new(SearcherConfig::default()));

let leaf_search_response = leaf_search(
searcher_context.clone(),
search_request,
Expand Down Expand Up @@ -229,30 +227,17 @@ pub async fn single_node_search(
})
.collect();
let elapsed = start_instant.elapsed();
let aggregation = if let Some(intermediate_aggregation_result) =
leaf_search_response.intermediate_aggregation_result
{
let aggregations: QuickwitAggregations =
serde_json::from_str(search_request.aggregation_request.as_ref().expect(
"Aggregation should be present since we are processing an intermediate \
aggregation result.",
))?;
match aggregations {
QuickwitAggregations::FindTraceIdsAggregation(_) => {
// There is nothing to merge here because there is only one leaf response.
Some(intermediate_aggregation_result)
}
QuickwitAggregations::TantivyAggregations(aggregations) => {
let res: IntermediateAggregationResults =
serde_json::from_str(&intermediate_aggregation_result)?;
let res: AggregationResults =
res.into_final_result(aggregations, &AggregationLimits::default())?;
Some(serde_json::to_string(&res)?)
}
}
} else {
None
};

let aggregations: Option<QuickwitAggregations> = search_request
.aggregation_request
.as_ref()
.map(|agg| serde_json::from_str(agg))
.transpose()?;

let aggregation = finalize_aggregation(
leaf_search_response.intermediate_aggregation_result,
aggregations,
)?;
Ok(SearchResponse {
aggregation,
num_hits: leaf_search_response.num_hits,
Expand Down
39 changes: 27 additions & 12 deletions quickwit/quickwit-search/src/root.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ use tracing::{debug, error, info_span, instrument};

use crate::cluster_client::ClusterClient;
use crate::collector::{make_merge_collector, QuickwitAggregations};
use crate::find_trace_ids_collector::Span;
use crate::search_job_placer::Job;
use crate::service::SearcherContext;
use crate::{
Expand Down Expand Up @@ -298,20 +299,41 @@ pub async fn root_search(

let elapsed = start_instant.elapsed();

let aggregation = if let Some(intermediate_aggregation_result) =
leaf_search_response.intermediate_aggregation_result
let aggregation = finalize_aggregation(
leaf_search_response.intermediate_aggregation_result,
aggregations,
)?;

Ok(SearchResponse {
aggregation,
num_hits: leaf_search_response.num_hits,
hits,
elapsed_time_micros: elapsed.as_micros() as u64,
errors: Vec::new(),
})
}

pub fn finalize_aggregation(
intermediate_aggregation_result: Option<Vec<u8>>,
aggregations: Option<QuickwitAggregations>,
) -> crate::Result<Option<String>> {
let aggregation = if let Some(intermediate_aggregation_result) = intermediate_aggregation_result
{
match aggregations.expect(
"Aggregation should be present since we are processing an intermediate aggregation \
result.",
) {
QuickwitAggregations::FindTraceIdsAggregation(_) => {
// The merge collector has already merged the intermediate results.
Some(intermediate_aggregation_result)
let aggs: Vec<Span> =
ciborium::de::from_reader(&mut intermediate_aggregation_result.as_slice())
.unwrap();
Some(serde_json::to_string(&aggs)?)
}
QuickwitAggregations::TantivyAggregations(aggregations) => {
let res: IntermediateAggregationResults =
serde_json::from_str(&intermediate_aggregation_result)?;
ciborium::de::from_reader(&mut intermediate_aggregation_result.as_slice())
.unwrap();
let res: AggregationResults =
res.into_final_result(aggregations, &AggregationLimits::default())?;
Some(serde_json::to_string(&res)?)
Expand All @@ -320,14 +342,7 @@ pub async fn root_search(
} else {
None
};

Ok(SearchResponse {
aggregation,
num_hits: leaf_search_response.num_hits,
hits,
elapsed_time_micros: elapsed.as_micros() as u64,
errors: Vec::new(),
})
Ok(aggregation)
}

/// Performs a distributed list terms.
Expand Down

0 comments on commit 93586b2

Please sign in to comment.