Skip to content

Commit

Permalink
Integrate OpenTelemetry into the proxy
Browse files Browse the repository at this point in the history
OpenCensus is a deprecated protocol and is slated to be removed from upstream collectors soon.

This wires up the proxy to optionally use OpenTelmetry as the format for exported traces. Currently, this defaults to the existing OpenCensus exporter, and we can switch the default later.

[#10111](linkerd/linkerd2#10111)

Signed-off-by: Scott Fleener <[email protected]>
  • Loading branch information
sfleen committed Sep 24, 2024
1 parent a1d3c79 commit da1775a
Show file tree
Hide file tree
Showing 21 changed files with 625 additions and 261 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock
Original file line number Diff line number Diff line change
Expand Up @@ -1126,6 +1126,7 @@ dependencies = [
"linkerd-app-outbound",
"linkerd-error",
"linkerd-opencensus",
"linkerd-opentelemetry",
"linkerd-tonic-stream",
"rangemap",
"regex",
Expand Down Expand Up @@ -1184,6 +1185,7 @@ dependencies = [
"linkerd-meshtls",
"linkerd-metrics",
"linkerd-opencensus",
"linkerd-opentelemetry",
"linkerd-proxy-api-resolve",
"linkerd-proxy-balance",
"linkerd-proxy-client-policy",
Expand Down
1 change: 1 addition & 0 deletions linkerd/app/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ linkerd-app-inbound = { path = "./inbound" }
linkerd-app-outbound = { path = "./outbound" }
linkerd-error = { path = "../error" }
linkerd-opencensus = { path = "../opencensus" }
linkerd-opentelemetry = { path = "../opentelemetry" }
linkerd-tonic-stream = { path = "../tonic-stream" }
rangemap = "1"
regex = "1"
Expand Down
1 change: 1 addition & 0 deletions linkerd/app/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ linkerd-io = { path = "../../io" }
linkerd-meshtls = { path = "../../meshtls", default-features = false }
linkerd-metrics = { path = "../../metrics", features = ["process", "stack"] }
linkerd-opencensus = { path = "../../opencensus" }
linkerd-opentelemetry = { path = "../../opentelemetry" }
linkerd-proxy-api-resolve = { path = "../../proxy/api-resolve" }
linkerd-proxy-balance = { path = "../../proxy/balance" }
linkerd-proxy-core = { path = "../../proxy/core" }
Expand Down
159 changes: 54 additions & 105 deletions linkerd/app/core/src/http_tracing.rs
Original file line number Diff line number Diff line change
@@ -1,139 +1,88 @@
use crate::http_tracing::opencensus::{OcSpanConverter, OpenCensusSink};
use crate::http_tracing::opentelemetry::{OpenTelemetrySink, OtelSpanConverter};
use linkerd_error::Error;
use linkerd_opencensus::proto::trace::v1 as oc;
use linkerd_stack::layer;
use linkerd_trace_context::{self as trace_context, TraceContext};
use linkerd_trace_context as trace_context;
use linkerd_trace_context::{Span, TraceContext};
use std::{collections::HashMap, sync::Arc};
use thiserror::Error;
use tokio::sync::mpsc;

pub type OpenCensusSink = Option<mpsc::Sender<oc::Span>>;
mod opencensus;
mod opentelemetry;

pub type Labels = Arc<HashMap<String, String>>;

/// SpanConverter converts trace_context::Span objects into OpenCensus agent
/// protobuf span objects. SpanConverter receives trace_context::Span objects by
/// implmenting the SpanSink trait. For each span that it receives, it converts
/// it to an OpenCensus span and then sends it on the provided mpsc::Sender.
#[derive(Clone)]
pub struct SpanConverter {
kind: Kind,
sink: mpsc::Sender<oc::Span>,
labels: Labels,
#[derive(Clone, Debug)]
pub enum SpanSink {
Oc(OpenCensusSink),
Otel(OpenTelemetrySink),
}

#[derive(Debug, Error)]
#[error("ID '{:?} should have {} bytes, but it has {}", self.id, self.expected_size, self.actual_size)]
pub struct IdLengthError {
id: Vec<u8>,
expected_size: usize,
actual_size: usize,
impl SpanSink {
fn into_converter(self, kind: Kind, labels: impl Into<Labels>) -> SpanConverter {
match self {
SpanSink::Oc(oc) => SpanConverter::Oc(OcSpanConverter::from_sink(oc, kind, labels)),
SpanSink::Otel(otel) => {
SpanConverter::Otel(OtelSpanConverter::from_sink(otel, kind, labels))

Check warning on line 26 in linkerd/app/core/src/http_tracing.rs

View check run for this annotation

Codecov / codecov/patch

linkerd/app/core/src/http_tracing.rs#L22-L26

Added lines #L22 - L26 were not covered by tests
}
}
}
}

pub fn server<S>(
sink: OpenCensusSink,
sink: Option<SpanSink>,
labels: impl Into<Labels>,
) -> impl layer::Layer<S, Service = TraceContext<Option<SpanConverter>, S>> + Clone {
SpanConverter::layer(Kind::Server, sink, labels)
TraceContext::layer(sink.map(move |sink| sink.into_converter(Kind::Server, labels)))
}

pub fn client<S>(
sink: OpenCensusSink,
sink: Option<SpanSink>,
labels: impl Into<Labels>,
) -> impl layer::Layer<S, Service = TraceContext<Option<SpanConverter>, S>> + Clone {
SpanConverter::layer(Kind::Client, sink, labels)
TraceContext::layer(sink.map(move |sink| sink.into_converter(Kind::Client, labels)))
}

#[derive(Copy, Clone, Debug, PartialEq)]
enum Kind {
Server = 1,
Client = 2,
}

impl SpanConverter {
fn layer<S>(
kind: Kind,
sink: OpenCensusSink,
labels: impl Into<Labels>,
) -> impl layer::Layer<S, Service = TraceContext<Option<Self>, S>> + Clone {
TraceContext::layer(sink.map(move |sink| Self {
kind,
sink,
labels: labels.into(),
}))
}

fn mk_span(&self, mut span: trace_context::Span) -> Result<oc::Span, IdLengthError> {
let mut attributes = HashMap::<String, oc::AttributeValue>::new();
for (k, v) in self.labels.iter() {
attributes.insert(
k.clone(),
oc::AttributeValue {
value: Some(oc::attribute_value::Value::StringValue(truncatable(
v.clone(),
))),
},
);
}
for (k, v) in span.labels.drain() {
attributes.insert(
k.to_string(),
oc::AttributeValue {
value: Some(oc::attribute_value::Value::StringValue(truncatable(v))),
},
);
}
Ok(oc::Span {
trace_id: into_bytes(span.trace_id, 16)?,
span_id: into_bytes(span.span_id, 8)?,
tracestate: None,
parent_span_id: into_bytes(span.parent_id, 8)?,
name: Some(truncatable(span.span_name)),
kind: self.kind as i32,
start_time: Some(span.start.into()),
end_time: Some(span.end.into()),
attributes: Some(oc::span::Attributes {
attribute_map: attributes,
dropped_attributes_count: 0,
}),
stack_trace: None,
time_events: None,
links: None,
status: None, // TODO: this is gRPC status; we must read response trailers to populate this
resource: None,
same_process_as_parent_span: Some(self.kind == Kind::Client),
child_span_count: None,
})
}
#[derive(Clone)]
pub enum SpanConverter {
Oc(OcSpanConverter),
Otel(OtelSpanConverter),
}

impl trace_context::SpanSink for SpanConverter {
#[inline]
fn is_enabled(&self) -> bool {
true
}

fn try_send(&mut self, span: trace_context::Span) -> Result<(), Error> {
let span = self.mk_span(span)?;
self.sink.try_send(span).map_err(Into::into)
fn try_send(&mut self, span: Span) -> Result<(), Error> {
match self {
SpanConverter::Oc(oc) => oc.try_send(span),
SpanConverter::Otel(otel) => otel.try_send(span),

Check warning on line 60 in linkerd/app/core/src/http_tracing.rs

View check run for this annotation

Codecov / codecov/patch

linkerd/app/core/src/http_tracing.rs#L57-L60

Added lines #L57 - L60 were not covered by tests
}
}
}

fn into_bytes(id: trace_context::Id, size: usize) -> Result<Vec<u8>, IdLengthError> {
let bytes: Vec<u8> = id.into();
if bytes.len() == size {
Ok(bytes)
} else {
let actual_size = bytes.len();
Err(IdLengthError {
id: bytes,
expected_size: size,
actual_size,
})
}
#[derive(Debug, Error)]
#[error("ID '{:?} should have {} bytes, but it has {}", self.id, self.expected_size, self.actual_size)]
pub struct IdLengthError {
id: Vec<u8>,
expected_size: usize,
actual_size: usize,
}

fn truncatable(value: String) -> oc::TruncatableString {
oc::TruncatableString {
value,
truncated_byte_count: 0,
}
#[derive(Copy, Clone, Debug, PartialEq)]
enum Kind {
Server = 1,
Client = 2,
}

fn into_bytes<const N: usize>(id: trace_context::Id) -> Result<[u8; N], IdLengthError> {
id.as_ref().try_into().map_err(|_| {
let bytes: Vec<u8> = id.into();
IdLengthError {
expected_size: N,
actual_size: bytes.len(),
id: bytes,

Check warning on line 85 in linkerd/app/core/src/http_tracing.rs

View check run for this annotation

Codecov / codecov/patch

linkerd/app/core/src/http_tracing.rs#L79-L85

Added lines #L79 - L85 were not covered by tests
}
})
}
91 changes: 91 additions & 0 deletions linkerd/app/core/src/http_tracing/opencensus.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
use crate::http_tracing::{into_bytes, IdLengthError, Kind, Labels};
use linkerd_error::Error;
use linkerd_opencensus::proto::trace::v1 as oc;
use linkerd_trace_context as trace_context;
use std::collections::HashMap;
use tokio::sync::mpsc;

pub type OpenCensusSink = mpsc::Sender<oc::Span>;

/// SpanConverter converts trace_context::Span objects into OpenCensus agent
/// protobuf span objects. SpanConverter receives trace_context::Span objects by
/// implmenting the SpanSink trait. For each span that it receives, it converts
/// it to an OpenCensus span and then sends it on the provided mpsc::Sender.
#[derive(Clone)]
pub struct OcSpanConverter {
kind: Kind,
sink: mpsc::Sender<oc::Span>,
labels: Labels,
}

impl OcSpanConverter {
pub(super) fn from_sink(sink: OpenCensusSink, kind: Kind, labels: impl Into<Labels>) -> Self {

Check warning on line 22 in linkerd/app/core/src/http_tracing/opencensus.rs

View check run for this annotation

Codecov / codecov/patch

linkerd/app/core/src/http_tracing/opencensus.rs#L22

Added line #L22 was not covered by tests
Self {
kind,
sink,
labels: labels.into(),

Check warning on line 26 in linkerd/app/core/src/http_tracing/opencensus.rs

View check run for this annotation

Codecov / codecov/patch

linkerd/app/core/src/http_tracing/opencensus.rs#L26

Added line #L26 was not covered by tests
}
}

fn mk_span(&self, mut span: trace_context::Span) -> Result<oc::Span, IdLengthError> {
let mut attributes = HashMap::<String, oc::AttributeValue>::new();
for (k, v) in self.labels.iter() {
attributes.insert(
k.clone(),
oc::AttributeValue {
value: Some(oc::attribute_value::Value::StringValue(truncatable(
v.clone(),

Check warning on line 37 in linkerd/app/core/src/http_tracing/opencensus.rs

View check run for this annotation

Codecov / codecov/patch

linkerd/app/core/src/http_tracing/opencensus.rs#L30-L37

Added lines #L30 - L37 were not covered by tests
))),
},
);
}
for (k, v) in span.labels.drain() {
attributes.insert(
k.to_string(),
oc::AttributeValue {
value: Some(oc::attribute_value::Value::StringValue(truncatable(v))),

Check warning on line 46 in linkerd/app/core/src/http_tracing/opencensus.rs

View check run for this annotation

Codecov / codecov/patch

linkerd/app/core/src/http_tracing/opencensus.rs#L42-L46

Added lines #L42 - L46 were not covered by tests
},
);
}
Ok(oc::Span {
trace_id: into_bytes::<16>(span.trace_id)?.to_vec(),
span_id: into_bytes::<8>(span.span_id)?.to_vec(),
tracestate: None,
parent_span_id: into_bytes::<8>(span.parent_id)?.to_vec(),
name: Some(truncatable(span.span_name)),
kind: self.kind as i32,
start_time: Some(span.start.into()),
end_time: Some(span.end.into()),
attributes: Some(oc::span::Attributes {
attribute_map: attributes,

Check warning on line 60 in linkerd/app/core/src/http_tracing/opencensus.rs

View check run for this annotation

Codecov / codecov/patch

linkerd/app/core/src/http_tracing/opencensus.rs#L50-L60

Added lines #L50 - L60 were not covered by tests
dropped_attributes_count: 0,
}),
stack_trace: None,
time_events: None,
links: None,
status: None, // TODO: this is gRPC status; we must read response trailers to populate this
resource: None,
same_process_as_parent_span: Some(self.kind == Kind::Client),
child_span_count: None,

Check warning on line 69 in linkerd/app/core/src/http_tracing/opencensus.rs

View check run for this annotation

Codecov / codecov/patch

linkerd/app/core/src/http_tracing/opencensus.rs#L63-L69

Added lines #L63 - L69 were not covered by tests
})
}
}

impl trace_context::SpanSink for OcSpanConverter {
#[inline]
fn is_enabled(&self) -> bool {
true

Check warning on line 77 in linkerd/app/core/src/http_tracing/opencensus.rs

View check run for this annotation

Codecov / codecov/patch

linkerd/app/core/src/http_tracing/opencensus.rs#L76-L77

Added lines #L76 - L77 were not covered by tests
}

fn try_send(&mut self, span: trace_context::Span) -> Result<(), Error> {
let span = self.mk_span(span)?;
self.sink.try_send(span).map_err(Into::into)

Check warning on line 82 in linkerd/app/core/src/http_tracing/opencensus.rs

View check run for this annotation

Codecov / codecov/patch

linkerd/app/core/src/http_tracing/opencensus.rs#L80-L82

Added lines #L80 - L82 were not covered by tests
}
}

fn truncatable(value: String) -> oc::TruncatableString {

Check warning on line 86 in linkerd/app/core/src/http_tracing/opencensus.rs

View check run for this annotation

Codecov / codecov/patch

linkerd/app/core/src/http_tracing/opencensus.rs#L86

Added line #L86 was not covered by tests
oc::TruncatableString {
value,
truncated_byte_count: 0,
}
}
Loading

0 comments on commit da1775a

Please sign in to comment.