Skip to content

Commit

Permalink
make jaeger span attribute-to-tag conversion exhaustive
Browse files Browse the repository at this point in the history
  • Loading branch information
arlyon committed Dec 7, 2024
1 parent 5329379 commit fc331b6
Showing 1 changed file with 82 additions and 49 deletions.
131 changes: 82 additions & 49 deletions quickwit/quickwit-jaeger/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use std::sync::Arc;
use std::time::Instant;

use async_trait::async_trait;
use itertools::Itertools;
use itertools::{Either, Itertools};
use prost::Message;
use prost_types::{Duration as WellKnownDuration, Timestamp as WellKnownTimestamp};
use quickwit_config::JaegerConfig;
Expand Down Expand Up @@ -772,15 +772,15 @@ fn qw_span_to_jaeger_span(qw_span_json: &str) -> Result<JaegerSpan, Status> {
qw_span.resource_attributes.remove("service.name");
let process = Some(JaegerProcess {
service_name: qw_span.service_name,
tags: otlp_attributes_to_jaeger_tags(qw_span.resource_attributes)?,
tags: otlp_attributes_to_jaeger_tags(qw_span.resource_attributes),
});
let logs: Vec<JaegerLog> = qw_span
.events
.into_iter()
.map(qw_event_to_jaeger_log)
.collect::<Result<_, _>>()?;

let mut tags = otlp_attributes_to_jaeger_tags(qw_span.span_attributes)?;
let mut tags = otlp_attributes_to_jaeger_tags(qw_span.span_attributes);
inject_dropped_count_tags(
&mut tags,
qw_span.span_dropped_attributes_count,
Expand Down Expand Up @@ -944,55 +944,88 @@ fn inject_span_status_tags(tags: &mut Vec<JaegerKeyValue>, span_status: QwSpanSt
};
}

/// Converts OpenTelemetry attributes to Jaeger tags.
/// Converts OpenTelemetry attributes to Jaeger tags. Objects are flattened with
/// their keys prefixed with the parent keys delimited by a dot.
///
/// <https://opentelemetry.io/docs/specs/otel/trace/sdk_exporters/jaeger/#attributes>
fn otlp_attributes_to_jaeger_tags(
attributes: HashMap<String, JsonValue>,
) -> Result<Vec<JaegerKeyValue>, Status> {
let mut tags = Vec::with_capacity(attributes.len());
for (key, value) in attributes {
let mut tag = JaegerKeyValue {
key,
v_type: ValueType::String as i32,
v_str: String::new(),
v_bool: false,
v_int64: 0,
v_float64: 0.0,
v_binary: Vec::new(),
};
match value {
// Array values MUST be serialized to string like a JSON list.
JsonValue::Array(values) => {
tag.v_type = ValueType::String as i32;
tag.v_str = serde_json::to_string(&values)
.expect("A vec of `serde_json::Value` values should be JSON serializable.");
}
JsonValue::Bool(value) => {
tag.v_type = ValueType::Bool as i32;
tag.v_bool = value;
}
JsonValue::Number(number) => {
if let Some(value) = number.as_i64() {
tag.v_type = ValueType::Int64 as i32;
tag.v_int64 = value;
} else if let Some(value) = number.as_f64() {
tag.v_type = ValueType::Float64 as i32;
tag.v_float64 = value
attributes: impl IntoIterator<Item = (String, JsonValue)>,
) -> Vec<JaegerKeyValue> {
otlp_attributes_to_jaeger_tags_inner(attributes, None)
}

/// Inner helper for `otpl_attributes_to_jaeger_tags` recursive call
///
/// PERF: as long as `attributes` IntoIterator implementation correctly sets the
/// lower bound then collect should allocate efficiently. Note that the flat map
/// may cause more allocations as we cannot predict the number of elements in the
/// iterator.
fn otlp_attributes_to_jaeger_tags_inner(
attributes: impl IntoIterator<Item = (String, JsonValue)>,
parent_key: Option<&str>,
) -> Vec<JaegerKeyValue> {
attributes
.into_iter()
.map(|(key, value)| {
let key = parent_key.map(|k| format!("{k}.{key}")).unwrap_or(key);
match value {
JsonValue::Array(values) => {
Either::Left(Some(JaegerKeyValue {
key,
v_type: ValueType::String as i32,
// Array values MUST be serialized to string like a JSON list.
v_str: serde_json::to_string(&values).expect(
"A vec of `serde_json::Value` values should be JSON serializable.",
),
..Default::default()
}))
}
JsonValue::Bool(v_bool) => Either::Left(Some(JaegerKeyValue {
key,
v_type: ValueType::Bool as i32,
v_bool,
..Default::default()
})),
JsonValue::Number(number) => {
let value = if let Some(v_int64) = number.as_i64() {
Some(JaegerKeyValue {
key,
v_type: ValueType::Int64 as i32,
v_int64,
..Default::default()
})
} else if let Some(v_float64) = number.as_f64() {
Some(JaegerKeyValue {
key,
v_type: ValueType::Float64 as i32,
v_float64,
..Default::default()
})
} else {
// Print some error rather than silently ignoring the value.
warn!("ignoring unrepresentable number value: {number:?}");
None
};

Either::Left(value)
}
JsonValue::String(v_str) => Either::Left(Some(JaegerKeyValue {
key,
v_type: ValueType::String as i32,
v_str,
..Default::default()
})),
JsonValue::Null => {
// No use including null values in the tags, so ignore
Either::Left(None)
}
JsonValue::Object(value) => {
Either::Right(otlp_attributes_to_jaeger_tags_inner(value, Some(&key)))
}
}
JsonValue::String(value) => {
tag.v_type = ValueType::String as i32;
tag.v_str = value
}
_ => {
return Err(Status::internal(format!(
"Failed to serialize attributes: unexpected type `{value:?}`"
)))
}
};
tags.push(tag);
}
Ok(tags)
})
.flat_map(|e| e.into_iter())
.collect()
}

/// Converts OpenTelemetry links to Jaeger span references.
Expand Down Expand Up @@ -1036,7 +1069,7 @@ fn qw_event_to_jaeger_log(event: QwEvent) -> Result<JaegerLog, Status> {
let insert_event_name =
!event.event_name.is_empty() && !event.event_attributes.contains_key("event");

let mut fields = otlp_attributes_to_jaeger_tags(event.event_attributes)?;
let mut fields = otlp_attributes_to_jaeger_tags(event.event_attributes);

if insert_event_name {
fields.push(JaegerKeyValue {
Expand Down

0 comments on commit fc331b6

Please sign in to comment.