forked from linkerd/linkerd2-proxy
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Integrate OpenTelemetry into the proxy
OpenCensus is a deprecated protocol and is slated to be removed from upstream collectors soon. This wires up the proxy to optionally use OpenTelmetry as the format for exported traces. Currently, this defaults to the existing OpenCensus exporter, and we can switch the default later. [#10111](linkerd/linkerd2#10111) Signed-off-by: Scott Fleener <[email protected]>
- Loading branch information
Showing
20 changed files
with
639 additions
and
268 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,139 +1,88 @@ | ||
use crate::http_tracing::opencensus::{OcSpanConverter, OpenCensusSink}; | ||
use crate::http_tracing::opentelemetry::{OpenTelemetrySink, OtelSpanConverter}; | ||
use linkerd_error::Error; | ||
use linkerd_opencensus::proto::trace::v1 as oc; | ||
use linkerd_stack::layer; | ||
use linkerd_trace_context::{self as trace_context, TraceContext}; | ||
use linkerd_trace_context as trace_context; | ||
use linkerd_trace_context::{Span, TraceContext}; | ||
use std::{collections::HashMap, sync::Arc}; | ||
use thiserror::Error; | ||
use tokio::sync::mpsc; | ||
|
||
pub type OpenCensusSink = Option<mpsc::Sender<oc::Span>>; | ||
mod opencensus; | ||
mod opentelemetry; | ||
|
||
pub type Labels = Arc<HashMap<String, String>>; | ||
|
||
/// SpanConverter converts trace_context::Span objects into OpenCensus agent | ||
/// protobuf span objects. SpanConverter receives trace_context::Span objects by | ||
/// implmenting the SpanSink trait. For each span that it receives, it converts | ||
/// it to an OpenCensus span and then sends it on the provided mpsc::Sender. | ||
#[derive(Clone)] | ||
pub struct SpanConverter { | ||
kind: Kind, | ||
sink: mpsc::Sender<oc::Span>, | ||
labels: Labels, | ||
#[derive(Clone, Debug)] | ||
pub enum SpanSink { | ||
Oc(OpenCensusSink), | ||
Otel(OpenTelemetrySink), | ||
} | ||
|
||
#[derive(Debug, Error)] | ||
#[error("ID '{:?} should have {} bytes, but it has {}", self.id, self.expected_size, self.actual_size)] | ||
pub struct IdLengthError { | ||
id: Vec<u8>, | ||
expected_size: usize, | ||
actual_size: usize, | ||
impl SpanSink { | ||
fn into_converter(self, kind: Kind, labels: impl Into<Labels>) -> SpanConverter { | ||
match self { | ||
SpanSink::Oc(oc) => SpanConverter::Oc(OcSpanConverter::from_sink(oc, kind, labels)), | ||
SpanSink::Otel(otel) => { | ||
SpanConverter::Otel(OtelSpanConverter::from_sink(otel, kind, labels)) | ||
} | ||
} | ||
} | ||
} | ||
|
||
pub fn server<S>( | ||
sink: OpenCensusSink, | ||
sink: Option<SpanSink>, | ||
labels: impl Into<Labels>, | ||
) -> impl layer::Layer<S, Service = TraceContext<Option<SpanConverter>, S>> + Clone { | ||
SpanConverter::layer(Kind::Server, sink, labels) | ||
TraceContext::layer(sink.map(move |sink| sink.into_converter(Kind::Server, labels))) | ||
} | ||
|
||
pub fn client<S>( | ||
sink: OpenCensusSink, | ||
sink: Option<SpanSink>, | ||
labels: impl Into<Labels>, | ||
) -> impl layer::Layer<S, Service = TraceContext<Option<SpanConverter>, S>> + Clone { | ||
SpanConverter::layer(Kind::Client, sink, labels) | ||
TraceContext::layer(sink.map(move |sink| sink.into_converter(Kind::Client, labels))) | ||
} | ||
|
||
#[derive(Copy, Clone, Debug, PartialEq)] | ||
enum Kind { | ||
Server = 1, | ||
Client = 2, | ||
} | ||
|
||
impl SpanConverter { | ||
fn layer<S>( | ||
kind: Kind, | ||
sink: OpenCensusSink, | ||
labels: impl Into<Labels>, | ||
) -> impl layer::Layer<S, Service = TraceContext<Option<Self>, S>> + Clone { | ||
TraceContext::layer(sink.map(move |sink| Self { | ||
kind, | ||
sink, | ||
labels: labels.into(), | ||
})) | ||
} | ||
|
||
fn mk_span(&self, mut span: trace_context::Span) -> Result<oc::Span, IdLengthError> { | ||
let mut attributes = HashMap::<String, oc::AttributeValue>::new(); | ||
for (k, v) in self.labels.iter() { | ||
attributes.insert( | ||
k.clone(), | ||
oc::AttributeValue { | ||
value: Some(oc::attribute_value::Value::StringValue(truncatable( | ||
v.clone(), | ||
))), | ||
}, | ||
); | ||
} | ||
for (k, v) in span.labels.drain() { | ||
attributes.insert( | ||
k.to_string(), | ||
oc::AttributeValue { | ||
value: Some(oc::attribute_value::Value::StringValue(truncatable(v))), | ||
}, | ||
); | ||
} | ||
Ok(oc::Span { | ||
trace_id: into_bytes(span.trace_id, 16)?, | ||
span_id: into_bytes(span.span_id, 8)?, | ||
tracestate: None, | ||
parent_span_id: into_bytes(span.parent_id, 8)?, | ||
name: Some(truncatable(span.span_name)), | ||
kind: self.kind as i32, | ||
start_time: Some(span.start.into()), | ||
end_time: Some(span.end.into()), | ||
attributes: Some(oc::span::Attributes { | ||
attribute_map: attributes, | ||
dropped_attributes_count: 0, | ||
}), | ||
stack_trace: None, | ||
time_events: None, | ||
links: None, | ||
status: None, // TODO: this is gRPC status; we must read response trailers to populate this | ||
resource: None, | ||
same_process_as_parent_span: Some(self.kind == Kind::Client), | ||
child_span_count: None, | ||
}) | ||
} | ||
#[derive(Clone)] | ||
pub enum SpanConverter { | ||
Oc(OcSpanConverter), | ||
Otel(OtelSpanConverter), | ||
} | ||
|
||
impl trace_context::SpanSink for SpanConverter { | ||
#[inline] | ||
fn is_enabled(&self) -> bool { | ||
true | ||
} | ||
|
||
fn try_send(&mut self, span: trace_context::Span) -> Result<(), Error> { | ||
let span = self.mk_span(span)?; | ||
self.sink.try_send(span).map_err(Into::into) | ||
fn try_send(&mut self, span: Span) -> Result<(), Error> { | ||
match self { | ||
SpanConverter::Oc(oc) => oc.try_send(span), | ||
SpanConverter::Otel(otel) => otel.try_send(span), | ||
} | ||
} | ||
} | ||
|
||
fn into_bytes(id: trace_context::Id, size: usize) -> Result<Vec<u8>, IdLengthError> { | ||
let bytes: Vec<u8> = id.into(); | ||
if bytes.len() == size { | ||
Ok(bytes) | ||
} else { | ||
let actual_size = bytes.len(); | ||
Err(IdLengthError { | ||
id: bytes, | ||
expected_size: size, | ||
actual_size, | ||
}) | ||
} | ||
#[derive(Debug, Error)] | ||
#[error("ID '{:?} should have {} bytes, but it has {}", self.id, self.expected_size, self.actual_size)] | ||
pub struct IdLengthError { | ||
id: Vec<u8>, | ||
expected_size: usize, | ||
actual_size: usize, | ||
} | ||
|
||
fn truncatable(value: String) -> oc::TruncatableString { | ||
oc::TruncatableString { | ||
value, | ||
truncated_byte_count: 0, | ||
} | ||
#[derive(Copy, Clone, Debug, PartialEq)] | ||
enum Kind { | ||
Server = 1, | ||
Client = 2, | ||
} | ||
|
||
fn into_bytes<const N: usize>(id: trace_context::Id) -> Result<[u8; N], IdLengthError> { | ||
id.as_ref().try_into().map_err(|_| { | ||
let bytes: Vec<u8> = id.into(); | ||
IdLengthError { | ||
expected_size: N, | ||
actual_size: bytes.len(), | ||
id: bytes, | ||
} | ||
}) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,91 @@ | ||
use crate::http_tracing::{into_bytes, IdLengthError, Kind, Labels}; | ||
use linkerd_error::Error; | ||
use linkerd_opencensus::proto::trace::v1 as oc; | ||
use linkerd_trace_context as trace_context; | ||
use std::collections::HashMap; | ||
use tokio::sync::mpsc; | ||
|
||
pub type OpenCensusSink = mpsc::Sender<oc::Span>; | ||
|
||
/// SpanConverter converts trace_context::Span objects into OpenCensus agent | ||
/// protobuf span objects. SpanConverter receives trace_context::Span objects by | ||
/// implmenting the SpanSink trait. For each span that it receives, it converts | ||
/// it to an OpenCensus span and then sends it on the provided mpsc::Sender. | ||
#[derive(Clone)] | ||
pub struct OcSpanConverter { | ||
kind: Kind, | ||
sink: mpsc::Sender<oc::Span>, | ||
labels: Labels, | ||
} | ||
|
||
impl OcSpanConverter { | ||
pub(super) fn from_sink(sink: OpenCensusSink, kind: Kind, labels: impl Into<Labels>) -> Self { | ||
Self { | ||
kind, | ||
sink, | ||
labels: labels.into(), | ||
} | ||
} | ||
|
||
fn mk_span(&self, mut span: trace_context::Span) -> Result<oc::Span, IdLengthError> { | ||
let mut attributes = HashMap::<String, oc::AttributeValue>::new(); | ||
for (k, v) in self.labels.iter() { | ||
attributes.insert( | ||
k.clone(), | ||
oc::AttributeValue { | ||
value: Some(oc::attribute_value::Value::StringValue(truncatable( | ||
v.clone(), | ||
))), | ||
}, | ||
); | ||
} | ||
for (k, v) in span.labels.drain() { | ||
attributes.insert( | ||
k.to_string(), | ||
oc::AttributeValue { | ||
value: Some(oc::attribute_value::Value::StringValue(truncatable(v))), | ||
}, | ||
); | ||
} | ||
Ok(oc::Span { | ||
trace_id: into_bytes::<16>(span.trace_id)?.to_vec(), | ||
span_id: into_bytes::<8>(span.span_id)?.to_vec(), | ||
tracestate: None, | ||
parent_span_id: into_bytes::<8>(span.parent_id)?.to_vec(), | ||
name: Some(truncatable(span.span_name)), | ||
kind: self.kind as i32, | ||
start_time: Some(span.start.into()), | ||
end_time: Some(span.end.into()), | ||
attributes: Some(oc::span::Attributes { | ||
attribute_map: attributes, | ||
dropped_attributes_count: 0, | ||
}), | ||
stack_trace: None, | ||
time_events: None, | ||
links: None, | ||
status: None, // TODO: this is gRPC status; we must read response trailers to populate this | ||
resource: None, | ||
same_process_as_parent_span: Some(self.kind == Kind::Client), | ||
child_span_count: None, | ||
}) | ||
} | ||
} | ||
|
||
impl trace_context::SpanSink for OcSpanConverter { | ||
#[inline] | ||
fn is_enabled(&self) -> bool { | ||
true | ||
} | ||
|
||
fn try_send(&mut self, span: trace_context::Span) -> Result<(), Error> { | ||
let span = self.mk_span(span)?; | ||
self.sink.try_send(span).map_err(Into::into) | ||
} | ||
} | ||
|
||
fn truncatable(value: String) -> oc::TruncatableString { | ||
oc::TruncatableString { | ||
value, | ||
truncated_byte_count: 0, | ||
} | ||
} |
Oops, something went wrong.