From fc331b6e427212f319cf0a34a13809da52c54ee6 Mon Sep 17 00:00:00 2001 From: Alexander Lyon Date: Sat, 7 Dec 2024 12:38:33 +0000 Subject: [PATCH] make jaeger span attribute-to-tag conversion exhaustive --- quickwit/quickwit-jaeger/src/lib.rs | 131 +++++++++++++++++----------- 1 file changed, 82 insertions(+), 49 deletions(-) diff --git a/quickwit/quickwit-jaeger/src/lib.rs b/quickwit/quickwit-jaeger/src/lib.rs index 815f5ef7015..bf9729ac50c 100644 --- a/quickwit/quickwit-jaeger/src/lib.rs +++ b/quickwit/quickwit-jaeger/src/lib.rs @@ -24,7 +24,7 @@ use std::sync::Arc; use std::time::Instant; use async_trait::async_trait; -use itertools::Itertools; +use itertools::{Either, Itertools}; use prost::Message; use prost_types::{Duration as WellKnownDuration, Timestamp as WellKnownTimestamp}; use quickwit_config::JaegerConfig; @@ -772,7 +772,7 @@ fn qw_span_to_jaeger_span(qw_span_json: &str) -> Result { qw_span.resource_attributes.remove("service.name"); let process = Some(JaegerProcess { service_name: qw_span.service_name, - tags: otlp_attributes_to_jaeger_tags(qw_span.resource_attributes)?, + tags: otlp_attributes_to_jaeger_tags(qw_span.resource_attributes), }); let logs: Vec = qw_span .events @@ -780,7 +780,7 @@ fn qw_span_to_jaeger_span(qw_span_json: &str) -> Result { .map(qw_event_to_jaeger_log) .collect::>()?; - let mut tags = otlp_attributes_to_jaeger_tags(qw_span.span_attributes)?; + let mut tags = otlp_attributes_to_jaeger_tags(qw_span.span_attributes); inject_dropped_count_tags( &mut tags, qw_span.span_dropped_attributes_count, @@ -944,55 +944,88 @@ fn inject_span_status_tags(tags: &mut Vec, span_status: QwSpanSt }; } -/// Converts OpenTelemetry attributes to Jaeger tags. +/// Converts OpenTelemetry attributes to Jaeger tags. Objects are flattened with +/// their keys prefixed with the parent keys delimited by a dot. +/// /// fn otlp_attributes_to_jaeger_tags( - attributes: HashMap, -) -> Result, Status> { - let mut tags = Vec::with_capacity(attributes.len()); - for (key, value) in attributes { - let mut tag = JaegerKeyValue { - key, - v_type: ValueType::String as i32, - v_str: String::new(), - v_bool: false, - v_int64: 0, - v_float64: 0.0, - v_binary: Vec::new(), - }; - match value { - // Array values MUST be serialized to string like a JSON list. - JsonValue::Array(values) => { - tag.v_type = ValueType::String as i32; - tag.v_str = serde_json::to_string(&values) - .expect("A vec of `serde_json::Value` values should be JSON serializable."); - } - JsonValue::Bool(value) => { - tag.v_type = ValueType::Bool as i32; - tag.v_bool = value; - } - JsonValue::Number(number) => { - if let Some(value) = number.as_i64() { - tag.v_type = ValueType::Int64 as i32; - tag.v_int64 = value; - } else if let Some(value) = number.as_f64() { - tag.v_type = ValueType::Float64 as i32; - tag.v_float64 = value + attributes: impl IntoIterator, +) -> Vec { + otlp_attributes_to_jaeger_tags_inner(attributes, None) +} + +/// Inner helper for `otpl_attributes_to_jaeger_tags` recursive call +/// +/// PERF: as long as `attributes` IntoIterator implementation correctly sets the +/// lower bound then collect should allocate efficiently. Note that the flat map +/// may cause more allocations as we cannot predict the number of elements in the +/// iterator. +fn otlp_attributes_to_jaeger_tags_inner( + attributes: impl IntoIterator, + parent_key: Option<&str>, +) -> Vec { + attributes + .into_iter() + .map(|(key, value)| { + let key = parent_key.map(|k| format!("{k}.{key}")).unwrap_or(key); + match value { + JsonValue::Array(values) => { + Either::Left(Some(JaegerKeyValue { + key, + v_type: ValueType::String as i32, + // Array values MUST be serialized to string like a JSON list. + v_str: serde_json::to_string(&values).expect( + "A vec of `serde_json::Value` values should be JSON serializable.", + ), + ..Default::default() + })) + } + JsonValue::Bool(v_bool) => Either::Left(Some(JaegerKeyValue { + key, + v_type: ValueType::Bool as i32, + v_bool, + ..Default::default() + })), + JsonValue::Number(number) => { + let value = if let Some(v_int64) = number.as_i64() { + Some(JaegerKeyValue { + key, + v_type: ValueType::Int64 as i32, + v_int64, + ..Default::default() + }) + } else if let Some(v_float64) = number.as_f64() { + Some(JaegerKeyValue { + key, + v_type: ValueType::Float64 as i32, + v_float64, + ..Default::default() + }) + } else { + // Print some error rather than silently ignoring the value. + warn!("ignoring unrepresentable number value: {number:?}"); + None + }; + + Either::Left(value) + } + JsonValue::String(v_str) => Either::Left(Some(JaegerKeyValue { + key, + v_type: ValueType::String as i32, + v_str, + ..Default::default() + })), + JsonValue::Null => { + // No use including null values in the tags, so ignore + Either::Left(None) + } + JsonValue::Object(value) => { + Either::Right(otlp_attributes_to_jaeger_tags_inner(value, Some(&key))) } } - JsonValue::String(value) => { - tag.v_type = ValueType::String as i32; - tag.v_str = value - } - _ => { - return Err(Status::internal(format!( - "Failed to serialize attributes: unexpected type `{value:?}`" - ))) - } - }; - tags.push(tag); - } - Ok(tags) + }) + .flat_map(|e| e.into_iter()) + .collect() } /// Converts OpenTelemetry links to Jaeger span references. @@ -1036,7 +1069,7 @@ fn qw_event_to_jaeger_log(event: QwEvent) -> Result { let insert_event_name = !event.event_name.is_empty() && !event.event_attributes.contains_key("event"); - let mut fields = otlp_attributes_to_jaeger_tags(event.event_attributes)?; + let mut fields = otlp_attributes_to_jaeger_tags(event.event_attributes); if insert_event_name { fields.push(JaegerKeyValue {