From da1775aca7648d24bdf2d94d40b9867c3ec33ba8 Mon Sep 17 00:00:00 2001 From: Scott Fleener Date: Mon, 23 Sep 2024 20:14:19 +0000 Subject: [PATCH] Integrate OpenTelemetry into the proxy OpenCensus is a deprecated protocol and is slated to be removed from upstream collectors soon. This wires up the proxy to optionally use OpenTelemetry as the format for exported traces. Currently, this defaults to the existing OpenCensus exporter, and we can switch the default later. [#10111](linkerd/linkerd2#10111) Signed-off-by: Scott Fleener --- Cargo.lock | 2 + linkerd/app/Cargo.toml | 1 + linkerd/app/core/Cargo.toml | 1 + linkerd/app/core/src/http_tracing.rs | 159 ++++++------------ .../app/core/src/http_tracing/opencensus.rs | 91 ++++++++++ .../core/src/http_tracing/opentelemetry.rs | 85 ++++++++++ linkerd/app/core/src/lib.rs | 3 +- linkerd/app/core/src/metrics.rs | 6 +- linkerd/app/inbound/src/lib.rs | 12 +- linkerd/app/outbound/src/http/server.rs | 2 +- linkerd/app/outbound/src/lib.rs | 4 +- linkerd/app/src/env.rs | 28 ++- .../app/src/env/{opencensus.rs => trace.rs} | 0 linkerd/app/src/lib.rs | 50 +++--- linkerd/app/src/oc_collector.rs | 103 ------------ linkerd/app/src/trace_collector.rs | 139 +++++++++++++++ .../app/src/trace_collector/oc_collector.rs | 62 +++++++ .../app/src/trace_collector/otel_collector.rs | 116 +++++++++++++ linkerd/opentelemetry/src/lib.rs | 2 + linkerd2-proxy/src/main.rs | 11 +- tools/src/bin/gen-protos.rs | 9 +- 21 files changed, 625 insertions(+), 261 deletions(-) create mode 100644 linkerd/app/core/src/http_tracing/opencensus.rs create mode 100644 linkerd/app/core/src/http_tracing/opentelemetry.rs rename linkerd/app/src/env/{opencensus.rs => trace.rs} (100%) delete mode 100644 linkerd/app/src/oc_collector.rs create mode 100644 linkerd/app/src/trace_collector.rs create mode 100644 linkerd/app/src/trace_collector/oc_collector.rs create mode 100644 linkerd/app/src/trace_collector/otel_collector.rs diff --git a/Cargo.lock b/Cargo.lock index 
bce8c03238..d62896aca2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1126,6 +1126,7 @@ dependencies = [ "linkerd-app-outbound", "linkerd-error", "linkerd-opencensus", + "linkerd-opentelemetry", "linkerd-tonic-stream", "rangemap", "regex", @@ -1184,6 +1185,7 @@ dependencies = [ "linkerd-meshtls", "linkerd-metrics", "linkerd-opencensus", + "linkerd-opentelemetry", "linkerd-proxy-api-resolve", "linkerd-proxy-balance", "linkerd-proxy-client-policy", diff --git a/linkerd/app/Cargo.toml b/linkerd/app/Cargo.toml index af443530a3..1420c6edfe 100644 --- a/linkerd/app/Cargo.toml +++ b/linkerd/app/Cargo.toml @@ -25,6 +25,7 @@ linkerd-app-inbound = { path = "./inbound" } linkerd-app-outbound = { path = "./outbound" } linkerd-error = { path = "../error" } linkerd-opencensus = { path = "../opencensus" } +linkerd-opentelemetry = { path = "../opentelemetry" } linkerd-tonic-stream = { path = "../tonic-stream" } rangemap = "1" regex = "1" diff --git a/linkerd/app/core/Cargo.toml b/linkerd/app/core/Cargo.toml index 43e19a2816..fff82289f7 100644 --- a/linkerd/app/core/Cargo.toml +++ b/linkerd/app/core/Cargo.toml @@ -47,6 +47,7 @@ linkerd-io = { path = "../../io" } linkerd-meshtls = { path = "../../meshtls", default-features = false } linkerd-metrics = { path = "../../metrics", features = ["process", "stack"] } linkerd-opencensus = { path = "../../opencensus" } +linkerd-opentelemetry = { path = "../../opentelemetry" } linkerd-proxy-api-resolve = { path = "../../proxy/api-resolve" } linkerd-proxy-balance = { path = "../../proxy/balance" } linkerd-proxy-core = { path = "../../proxy/core" } diff --git a/linkerd/app/core/src/http_tracing.rs b/linkerd/app/core/src/http_tracing.rs index 6b24c599e3..b1be24b409 100644 --- a/linkerd/app/core/src/http_tracing.rs +++ b/linkerd/app/core/src/http_tracing.rs @@ -1,139 +1,88 @@ +use crate::http_tracing::opencensus::{OcSpanConverter, OpenCensusSink}; +use crate::http_tracing::opentelemetry::{OpenTelemetrySink, OtelSpanConverter}; use 
linkerd_error::Error; -use linkerd_opencensus::proto::trace::v1 as oc; use linkerd_stack::layer; -use linkerd_trace_context::{self as trace_context, TraceContext}; +use linkerd_trace_context as trace_context; +use linkerd_trace_context::{Span, TraceContext}; use std::{collections::HashMap, sync::Arc}; use thiserror::Error; -use tokio::sync::mpsc; -pub type OpenCensusSink = Option>; +mod opencensus; +mod opentelemetry; + pub type Labels = Arc>; -/// SpanConverter converts trace_context::Span objects into OpenCensus agent -/// protobuf span objects. SpanConverter receives trace_context::Span objects by -/// implmenting the SpanSink trait. For each span that it receives, it converts -/// it to an OpenCensus span and then sends it on the provided mpsc::Sender. -#[derive(Clone)] -pub struct SpanConverter { - kind: Kind, - sink: mpsc::Sender, - labels: Labels, +#[derive(Clone, Debug)] +pub enum SpanSink { + Oc(OpenCensusSink), + Otel(OpenTelemetrySink), } -#[derive(Debug, Error)] -#[error("ID '{:?} should have {} bytes, but it has {}", self.id, self.expected_size, self.actual_size)] -pub struct IdLengthError { - id: Vec, - expected_size: usize, - actual_size: usize, +impl SpanSink { + fn into_converter(self, kind: Kind, labels: impl Into) -> SpanConverter { + match self { + SpanSink::Oc(oc) => SpanConverter::Oc(OcSpanConverter::from_sink(oc, kind, labels)), + SpanSink::Otel(otel) => { + SpanConverter::Otel(OtelSpanConverter::from_sink(otel, kind, labels)) + } + } + } } pub fn server( - sink: OpenCensusSink, + sink: Option, labels: impl Into, ) -> impl layer::Layer, S>> + Clone { - SpanConverter::layer(Kind::Server, sink, labels) + TraceContext::layer(sink.map(move |sink| sink.into_converter(Kind::Server, labels))) } pub fn client( - sink: OpenCensusSink, + sink: Option, labels: impl Into, ) -> impl layer::Layer, S>> + Clone { - SpanConverter::layer(Kind::Client, sink, labels) + TraceContext::layer(sink.map(move |sink| sink.into_converter(Kind::Client, labels))) } 
-#[derive(Copy, Clone, Debug, PartialEq)] -enum Kind { - Server = 1, - Client = 2, -} - -impl SpanConverter { - fn layer( - kind: Kind, - sink: OpenCensusSink, - labels: impl Into, - ) -> impl layer::Layer, S>> + Clone { - TraceContext::layer(sink.map(move |sink| Self { - kind, - sink, - labels: labels.into(), - })) - } - - fn mk_span(&self, mut span: trace_context::Span) -> Result { - let mut attributes = HashMap::::new(); - for (k, v) in self.labels.iter() { - attributes.insert( - k.clone(), - oc::AttributeValue { - value: Some(oc::attribute_value::Value::StringValue(truncatable( - v.clone(), - ))), - }, - ); - } - for (k, v) in span.labels.drain() { - attributes.insert( - k.to_string(), - oc::AttributeValue { - value: Some(oc::attribute_value::Value::StringValue(truncatable(v))), - }, - ); - } - Ok(oc::Span { - trace_id: into_bytes(span.trace_id, 16)?, - span_id: into_bytes(span.span_id, 8)?, - tracestate: None, - parent_span_id: into_bytes(span.parent_id, 8)?, - name: Some(truncatable(span.span_name)), - kind: self.kind as i32, - start_time: Some(span.start.into()), - end_time: Some(span.end.into()), - attributes: Some(oc::span::Attributes { - attribute_map: attributes, - dropped_attributes_count: 0, - }), - stack_trace: None, - time_events: None, - links: None, - status: None, // TODO: this is gRPC status; we must read response trailers to populate this - resource: None, - same_process_as_parent_span: Some(self.kind == Kind::Client), - child_span_count: None, - }) - } +#[derive(Clone)] +pub enum SpanConverter { + Oc(OcSpanConverter), + Otel(OtelSpanConverter), } impl trace_context::SpanSink for SpanConverter { - #[inline] fn is_enabled(&self) -> bool { true } - fn try_send(&mut self, span: trace_context::Span) -> Result<(), Error> { - let span = self.mk_span(span)?; - self.sink.try_send(span).map_err(Into::into) + fn try_send(&mut self, span: Span) -> Result<(), Error> { + match self { + SpanConverter::Oc(oc) => oc.try_send(span), + SpanConverter::Otel(otel) 
=> otel.try_send(span), + } } } -fn into_bytes(id: trace_context::Id, size: usize) -> Result, IdLengthError> { - let bytes: Vec = id.into(); - if bytes.len() == size { - Ok(bytes) - } else { - let actual_size = bytes.len(); - Err(IdLengthError { - id: bytes, - expected_size: size, - actual_size, - }) - } +#[derive(Debug, Error)] +#[error("ID '{:?} should have {} bytes, but it has {}", self.id, self.expected_size, self.actual_size)] +pub struct IdLengthError { + id: Vec, + expected_size: usize, + actual_size: usize, } -fn truncatable(value: String) -> oc::TruncatableString { - oc::TruncatableString { - value, - truncated_byte_count: 0, - } +#[derive(Copy, Clone, Debug, PartialEq)] +enum Kind { + Server = 1, + Client = 2, +} + +fn into_bytes(id: trace_context::Id) -> Result<[u8; N], IdLengthError> { + id.as_ref().try_into().map_err(|_| { + let bytes: Vec = id.into(); + IdLengthError { + expected_size: N, + actual_size: bytes.len(), + id: bytes, + } + }) } diff --git a/linkerd/app/core/src/http_tracing/opencensus.rs b/linkerd/app/core/src/http_tracing/opencensus.rs new file mode 100644 index 0000000000..7bca45902e --- /dev/null +++ b/linkerd/app/core/src/http_tracing/opencensus.rs @@ -0,0 +1,91 @@ +use crate::http_tracing::{into_bytes, IdLengthError, Kind, Labels}; +use linkerd_error::Error; +use linkerd_opencensus::proto::trace::v1 as oc; +use linkerd_trace_context as trace_context; +use std::collections::HashMap; +use tokio::sync::mpsc; + +pub type OpenCensusSink = mpsc::Sender; + +/// SpanConverter converts trace_context::Span objects into OpenCensus agent +/// protobuf span objects. SpanConverter receives trace_context::Span objects by +/// implementing the SpanSink trait. For each span that it receives, it converts +/// it to an OpenCensus span and then sends it on the provided mpsc::Sender. 
+#[derive(Clone)] +pub struct OcSpanConverter { + kind: Kind, + sink: mpsc::Sender, + labels: Labels, +} + +impl OcSpanConverter { + pub(super) fn from_sink(sink: OpenCensusSink, kind: Kind, labels: impl Into) -> Self { + Self { + kind, + sink, + labels: labels.into(), + } + } + + fn mk_span(&self, mut span: trace_context::Span) -> Result { + let mut attributes = HashMap::::new(); + for (k, v) in self.labels.iter() { + attributes.insert( + k.clone(), + oc::AttributeValue { + value: Some(oc::attribute_value::Value::StringValue(truncatable( + v.clone(), + ))), + }, + ); + } + for (k, v) in span.labels.drain() { + attributes.insert( + k.to_string(), + oc::AttributeValue { + value: Some(oc::attribute_value::Value::StringValue(truncatable(v))), + }, + ); + } + Ok(oc::Span { + trace_id: into_bytes::<16>(span.trace_id)?.to_vec(), + span_id: into_bytes::<8>(span.span_id)?.to_vec(), + tracestate: None, + parent_span_id: into_bytes::<8>(span.parent_id)?.to_vec(), + name: Some(truncatable(span.span_name)), + kind: self.kind as i32, + start_time: Some(span.start.into()), + end_time: Some(span.end.into()), + attributes: Some(oc::span::Attributes { + attribute_map: attributes, + dropped_attributes_count: 0, + }), + stack_trace: None, + time_events: None, + links: None, + status: None, // TODO: this is gRPC status; we must read response trailers to populate this + resource: None, + same_process_as_parent_span: Some(self.kind == Kind::Client), + child_span_count: None, + }) + } +} + +impl trace_context::SpanSink for OcSpanConverter { + #[inline] + fn is_enabled(&self) -> bool { + true + } + + fn try_send(&mut self, span: trace_context::Span) -> Result<(), Error> { + let span = self.mk_span(span)?; + self.sink.try_send(span).map_err(Into::into) + } +} + +fn truncatable(value: String) -> oc::TruncatableString { + oc::TruncatableString { + value, + truncated_byte_count: 0, + } +} diff --git a/linkerd/app/core/src/http_tracing/opentelemetry.rs 
b/linkerd/app/core/src/http_tracing/opentelemetry.rs new file mode 100644 index 0000000000..8fcecd2ea1 --- /dev/null +++ b/linkerd/app/core/src/http_tracing/opentelemetry.rs @@ -0,0 +1,85 @@ +use crate::http_tracing::{into_bytes, IdLengthError, Kind, Labels}; +use linkerd_error::Error; +use linkerd_opentelemetry::otel::trace::{ + SpanContext, SpanId, SpanKind, Status, TraceFlags, TraceId, TraceState, +}; +use linkerd_opentelemetry::otel::KeyValue; +use linkerd_opentelemetry::sdk::{export::trace::SpanData, trace::SpanLinks, Resource}; +use linkerd_trace_context as trace_context; +use std::borrow::Cow; +use tokio::sync::mpsc; + +pub type OpenTelemetrySink = mpsc::Sender; + +/// SpanConverter converts trace_context::Span objects into OpenTelemetry agent +/// protobuf span objects. SpanConverter receives trace_context::Span objects by +/// implementing the SpanSink trait. For each span that it receives, it converts +/// it to an OpenTelemetry span and then sends it on the provided mpsc::Sender. 
+#[derive(Clone)] +pub struct OtelSpanConverter { + kind: Kind, + sink: OpenTelemetrySink, + labels: Labels, + resource: Cow<'static, Resource>, +} + +impl OtelSpanConverter { + pub(super) fn from_sink( + sink: OpenTelemetrySink, + kind: Kind, + labels: impl Into, + ) -> Self { + Self { + kind, + sink, + labels: labels.into(), + resource: Cow::Owned(Resource::default()), + } + } + + fn mk_span(&self, mut span: trace_context::Span) -> Result { + let mut attributes = Vec::::new(); + for (k, v) in self.labels.iter() { + attributes.push(KeyValue::new(k.clone(), v.clone())); + } + for (k, v) in span.labels.drain() { + attributes.push(KeyValue::new(k, v.clone())); + } + Ok(SpanData { + parent_span_id: SpanId::from_bytes(into_bytes(span.parent_id)?), + span_kind: match self.kind { + Kind::Server => SpanKind::Server, + Kind::Client => SpanKind::Client, + }, + name: span.span_name.into(), + start_time: span.start, + end_time: span.end, + attributes, + dropped_attributes_count: 0, + links: SpanLinks::default(), + status: Status::Unset, // TODO: this is gRPC status; we must read response trailers to populate this + resource: self.resource.clone(), + span_context: SpanContext::new( + TraceId::from_bytes(into_bytes(span.trace_id)?), + SpanId::from_bytes(into_bytes(span.span_id)?), + TraceFlags::default(), + /* is_remote= */ self.kind != Kind::Client, + TraceState::NONE, + ), + events: Default::default(), + instrumentation_lib: Default::default(), + }) + } +} + +impl trace_context::SpanSink for OtelSpanConverter { + #[inline] + fn is_enabled(&self) -> bool { + true + } + + fn try_send(&mut self, span: trace_context::Span) -> Result<(), Error> { + let span = self.mk_span(span)?; + self.sink.try_send(span).map_err(Into::into) + } +} diff --git a/linkerd/app/core/src/lib.rs b/linkerd/app/core/src/lib.rs index fc98a3b621..8d3a433791 100644 --- a/linkerd/app/core/src/lib.rs +++ b/linkerd/app/core/src/lib.rs @@ -40,6 +40,7 @@ pub use linkerd_http_metrics as http_metrics; pub use 
linkerd_idle_cache as idle_cache; pub use linkerd_io as io; pub use linkerd_opencensus as opencensus; +pub use linkerd_opentelemetry as opentelemetry; pub use linkerd_service_profiles as profiles; pub use linkerd_stack_metrics as stack_metrics; pub use linkerd_stack_tracing as stack_tracing; @@ -65,7 +66,7 @@ pub struct ProxyRuntime { pub identity: identity::creds::Receiver, pub metrics: metrics::Proxy, pub tap: proxy::tap::Registry, - pub span_sink: http_tracing::OpenCensusSink, + pub span_sink: Option, pub drain: drain::Watch, } diff --git a/linkerd/app/core/src/metrics.rs b/linkerd/app/core/src/metrics.rs index 3fd374031b..084403e365 100644 --- a/linkerd/app/core/src/metrics.rs +++ b/linkerd/app/core/src/metrics.rs @@ -9,7 +9,7 @@ pub use crate::transport::labels::{TargetAddr, TlsAccept}; use crate::{ classify::Class, - control, http_metrics, opencensus, profiles, stack_metrics, + control, http_metrics, opencensus, opentelemetry, profiles, stack_metrics, svc::Param, tls, transport::{self, labels::TlsConnect}, @@ -39,6 +39,7 @@ pub struct Metrics { pub proxy: Proxy, pub control: ControlHttp, pub opencensus: opencensus::metrics::Registry, + pub opentelemetry: opentelemetry::metrics::Registry, } #[derive(Clone, Debug)] @@ -191,11 +192,13 @@ impl Metrics { }; let (opencensus, opencensus_report) = opencensus::metrics::new(); + let (opentelemetry, opentelemetry_report) = opentelemetry::metrics::new(); let metrics = Metrics { proxy, control, opencensus, + opentelemetry, }; let report = endpoint_report @@ -205,6 +208,7 @@ impl Metrics { .and_report(control_report) .and_report(transport_report) .and_report(opencensus_report) + .and_report(opentelemetry_report) .and_report(stack); (metrics, report) diff --git a/linkerd/app/inbound/src/lib.rs b/linkerd/app/inbound/src/lib.rs index 88ec18afb8..5074979d0c 100644 --- a/linkerd/app/inbound/src/lib.rs +++ b/linkerd/app/inbound/src/lib.rs @@ -18,12 +18,13 @@ mod server; #[cfg(any(test, feature = "test-util", fuzzing))] pub mod 
test_util; +#[cfg(fuzzing)] +pub use self::http::fuzz as http_fuzz; pub use self::{metrics::InboundMetrics, policy::DefaultPolicy}; +use linkerd_app_core::http_tracing::SpanSink; use linkerd_app_core::{ config::{ConnectConfig, ProxyConfig, QueueConfig}, - drain, - http_tracing::OpenCensusSink, - identity, io, + drain, identity, io, proxy::{tap, tcp}, svc, transport::{self, Remote, ServerAddr}, @@ -33,9 +34,6 @@ use std::{fmt::Debug, time::Duration}; use thiserror::Error; use tracing::debug_span; -#[cfg(fuzzing)] -pub use self::http::fuzz as http_fuzz; - #[derive(Clone, Debug)] pub struct Config { pub allow_discovery: NameMatch, @@ -67,7 +65,7 @@ struct Runtime { metrics: InboundMetrics, identity: identity::creds::Receiver, tap: tap::Registry, - span_sink: OpenCensusSink, + span_sink: Option, drain: drain::Watch, } diff --git a/linkerd/app/outbound/src/http/server.rs b/linkerd/app/outbound/src/http/server.rs index dc8f77cb1c..0fb34ded58 100644 --- a/linkerd/app/outbound/src/http/server.rs +++ b/linkerd/app/outbound/src/http/server.rs @@ -47,7 +47,7 @@ impl Outbound> { .check_new_service::>() .push(ServerRescue::layer(config.emit_headers)) .check_new_service::>() - // Initiates OpenCensus tracing. + // Initiates OpenTelemetry tracing. 
.push_on_service(http_tracing::server(rt.span_sink.clone(), trace_labels())) .push_on_service(http::BoxResponse::layer()) // Convert origin form HTTP/1 URIs to absolute form for Hyper's diff --git a/linkerd/app/outbound/src/lib.rs b/linkerd/app/outbound/src/lib.rs index 75ee6c70d5..2cc8fccecc 100644 --- a/linkerd/app/outbound/src/lib.rs +++ b/linkerd/app/outbound/src/lib.rs @@ -6,11 +6,11 @@ #![allow(opaque_hidden_inferred_bound)] #![forbid(unsafe_code)] +use linkerd_app_core::http_tracing::SpanSink; use linkerd_app_core::{ config::{ProxyConfig, QueueConfig}, drain, exp_backoff::ExponentialBackoff, - http_tracing::OpenCensusSink, identity, io, metrics::prom, profiles, @@ -95,7 +95,7 @@ struct Runtime { metrics: OutboundMetrics, identity: identity::NewClient, tap: tap::Registry, - span_sink: OpenCensusSink, + span_sink: Option, drain: drain::Watch, } diff --git a/linkerd/app/src/env.rs b/linkerd/app/src/env.rs index 49e00a88ba..710b572ce4 100644 --- a/linkerd/app/src/env.rs +++ b/linkerd/app/src/env.rs @@ -1,4 +1,5 @@ -use crate::{dns, gateway, identity, inbound, oc_collector, outbound, policy, spire}; +use crate::trace_collector::CollectorKind; +use crate::{dns, gateway, identity, inbound, outbound, policy, spire, trace_collector}; use linkerd_app_core::{ addr, config::*, @@ -19,7 +20,7 @@ use tracing::{debug, error, info, warn}; mod control; mod http2; -mod opencensus; +mod trace; mod types; use self::types::*; @@ -146,6 +147,7 @@ const ENV_OUTBOUND_DISABLE_INFORMATIONAL_HEADERS: &str = "LINKERD2_PROXY_OUTBOUND_DISABLE_INFORMATIONAL_HEADERS"; pub const ENV_TRACE_ATTRIBUTES_PATH: &str = "LINKERD2_PROXY_TRACE_ATTRIBUTES_PATH"; +pub const ENV_TRACE_PROTOCOL: &str = "LINKERD2_PROXY_TRACE_PROTOCOL"; /// Constrains which destination names may be used for profile/route discovery. 
/// @@ -428,7 +430,8 @@ pub fn parse_config(strings: &S) -> Result let hostname = strings.get(ENV_HOSTNAME); - let oc_attributes_file_path = strings.get(ENV_TRACE_ATTRIBUTES_PATH); + let trace_attributes_file_path = strings.get(ENV_TRACE_ATTRIBUTES_PATH); + let trace_protocol = strings.get(ENV_TRACE_PROTOCOL); let trace_collector_addr = parse_control_addr(strings, ENV_TRACE_COLLECTOR_SVC_BASE); @@ -813,8 +816,8 @@ pub fn parse_config(strings: &S) -> Result .into(), }; - let oc_collector = match trace_collector_addr? { - None => oc_collector::Config::Disabled, + let trace_collector = match trace_collector_addr? { + None => trace_collector::Config::Disabled, Some(addr) => { let connect = if addr.addr.is_loopback() { inbound.proxy.connect.clone() @@ -826,14 +829,20 @@ pub fn parse_config(strings: &S) -> Result } else { outbound.http_request_queue.failfast_timeout }; - let attributes = oc_attributes_file_path + let attributes = trace_attributes_file_path .map(|path| match path.and_then(|p| p.parse::().ok()) { - Some(path) => opencensus::read_trace_attributes(&path), + Some(path) => trace::read_trace_attributes(&path), None => HashMap::new(), }) .unwrap_or_default(); - oc_collector::Config::Enabled(Box::new(oc_collector::EnabledConfig { + let trace_protocol = trace_protocol + .map(|proto| proto.and_then(|p| p.parse::().ok())) + .ok() + .flatten() + .unwrap_or_default(); + + trace_collector::Config::Enabled(Box::new(trace_collector::EnabledConfig { attributes, hostname: hostname?, control: ControlConfig { @@ -844,6 +853,7 @@ pub fn parse_config(strings: &S) -> Result failfast_timeout, }, }, + kind: trace_protocol, })) } }; @@ -923,7 +933,7 @@ pub fn parse_config(strings: &S) -> Result dns, dst, tap, - oc_collector, + trace_collector, policy, identity, outbound, diff --git a/linkerd/app/src/env/opencensus.rs b/linkerd/app/src/env/trace.rs similarity index 100% rename from linkerd/app/src/env/opencensus.rs rename to linkerd/app/src/env/trace.rs diff --git 
a/linkerd/app/src/lib.rs b/linkerd/app/src/lib.rs index aa73893851..70e6639e86 100644 --- a/linkerd/app/src/lib.rs +++ b/linkerd/app/src/lib.rs @@ -7,10 +7,10 @@ pub mod dst; pub mod env; pub mod identity; -pub mod oc_collector; pub mod policy; pub mod spire; pub mod tap; +pub mod trace_collector; pub use self::metrics::Metrics; use futures::{future, Future, FutureExt}; @@ -60,7 +60,7 @@ pub struct Config { pub policy: policy::Config, pub admin: admin::Config, pub tap: tap::Config, - pub oc_collector: oc_collector::Config, + pub trace_collector: trace_collector::Config, /// Grace period for graceful shutdowns. /// @@ -75,7 +75,7 @@ pub struct App { dst: ControlAddr, identity: identity::Identity, inbound_addr: Local, - oc_collector: oc_collector::OcCollector, + trace_collector: trace_collector::TraceCollector, outbound_addr: Local, outbound_addr_additional: Option>, start_proxy: Pin + Send + 'static>>, @@ -123,7 +123,7 @@ impl Config { policy, identity, inbound, - oc_collector, + trace_collector, outbound, gateway, tap, @@ -188,16 +188,25 @@ impl Config { }) }?; - debug!(config = ?oc_collector, "Building client"); - let oc_collector = { - let control_metrics = - ControlMetrics::register(registry.sub_registry_with_prefix("opencensus")); + debug!(config = ?trace_collector, "Building client"); + let trace_collector = { + let control_metrics = ControlMetrics::register( + registry.sub_registry_with_prefix(trace_collector.metrics_prefix()), + ); let identity = identity.receiver().new_client(); let dns = dns.resolver; let client_metrics = metrics.control.clone(); - let metrics = metrics.opencensus; - info_span!("opencensus").in_scope(|| { - oc_collector.build(identity, dns, metrics, control_metrics, client_metrics) + let otel_metrics = metrics.opentelemetry; + let oc_metrics = metrics.opencensus; + trace_collector.info_span().in_scope(|| { + trace_collector.build( + identity, + dns, + oc_metrics, + otel_metrics, + control_metrics, + client_metrics, + ) }) }?; @@ -205,7 
+214,7 @@ impl Config { identity: identity.receiver(), metrics: metrics.proxy, tap: tap.registry(), - span_sink: oc_collector.span_sink(), + span_sink: trace_collector.span_sink(), drain: drain_rx.clone(), }; let inbound = Inbound::new(inbound, runtime.clone()); @@ -309,7 +318,7 @@ impl Config { drain: drain_tx, identity, inbound_addr, - oc_collector, + trace_collector, outbound_addr, outbound_addr_additional, start_proxy, @@ -369,10 +378,10 @@ impl App { self.identity.receiver().local_id().clone() } - pub fn opencensus_addr(&self) -> Option<&ControlAddr> { - match self.oc_collector { - oc_collector::OcCollector::Disabled { .. } => None, - oc_collector::OcCollector::Enabled(ref oc) => Some(&oc.addr), + pub fn tracing_addr(&self) -> Option<&ControlAddr> { + match self.trace_collector { + trace_collector::TraceCollector::Disabled { .. } => None, + crate::trace_collector::TraceCollector::Enabled(ref oc) => Some(&oc.addr), } } @@ -381,7 +390,7 @@ impl App { admin, drain, identity, - oc_collector, + trace_collector: collector, start_proxy, tap, .. 
@@ -446,8 +455,9 @@ impl App { tokio::spawn(serve.instrument(info_span!("tap").or_current())); } - if let oc_collector::OcCollector::Enabled(oc) = oc_collector { - tokio::spawn(oc.task.instrument(info_span!("opencensus").or_current())); + if let trace_collector::TraceCollector::Enabled(collector) = collector { + let span = collector.info_span().or_current(); + tokio::spawn(collector.task.instrument(span)); } // we don't care if the admin shutdown channel is diff --git a/linkerd/app/src/oc_collector.rs b/linkerd/app/src/oc_collector.rs deleted file mode 100644 index e8f25e23c5..0000000000 --- a/linkerd/app/src/oc_collector.rs +++ /dev/null @@ -1,103 +0,0 @@ -use linkerd_app_core::{ - control, dns, identity, metrics::ControlHttp as HttpMetrics, svc::NewService, Error, -}; -use linkerd_opencensus::{self as opencensus, metrics, proto}; -use std::{collections::HashMap, future::Future, pin::Pin, time::SystemTime}; -use tokio::sync::mpsc; -use tokio_stream::wrappers::ReceiverStream; -use tracing::Instrument; - -#[derive(Clone, Debug)] -pub enum Config { - Disabled, - Enabled(Box), -} - -#[derive(Clone, Debug)] -pub struct EnabledConfig { - pub control: control::Config, - pub attributes: HashMap, - pub hostname: Option, -} - -pub type Task = Pin + Send + 'static>>; - -pub type SpanSink = mpsc::Sender; - -pub enum OcCollector { - Disabled, - Enabled(Box), -} - -pub struct EnabledCollector { - pub addr: control::ControlAddr, - pub span_sink: SpanSink, - pub task: Task, -} - -impl Config { - const SPAN_BUFFER_CAPACITY: usize = 100; - const SERVICE_NAME: &'static str = "linkerd-proxy"; - - pub fn build( - self, - identity: identity::NewClient, - dns: dns::Resolver, - legacy_metrics: metrics::Registry, - control_metrics: control::Metrics, - client_metrics: HttpMetrics, - ) -> Result { - match self { - Config::Disabled => Ok(OcCollector::Disabled), - Config::Enabled(inner) => { - let addr = inner.control.addr.clone(); - let svc = inner - .control - .build(dns, client_metrics, 
control_metrics, identity) - .new_service(()); - - let (span_sink, spans_rx) = mpsc::channel(Self::SPAN_BUFFER_CAPACITY); - let spans_rx = ReceiverStream::new(spans_rx); - - let task = { - use self::proto::agent::common::v1 as oc; - - let node = oc::Node { - identifier: Some(oc::ProcessIdentifier { - host_name: inner.hostname.unwrap_or_default(), - pid: std::process::id(), - start_timestamp: Some(SystemTime::now().into()), - }), - service_info: Some(oc::ServiceInfo { - name: Self::SERVICE_NAME.to_string(), - }), - attributes: inner.attributes, - ..oc::Node::default() - }; - - let addr = addr.clone(); - Box::pin( - opencensus::export_spans(svc, node, spans_rx, legacy_metrics).instrument( - tracing::debug_span!("opencensus", peer.addr = %addr).or_current(), - ), - ) - }; - - Ok(OcCollector::Enabled(Box::new(EnabledCollector { - addr, - task, - span_sink, - }))) - } - } - } -} - -impl OcCollector { - pub fn span_sink(&self) -> Option { - match self { - OcCollector::Disabled => None, - OcCollector::Enabled(inner) => Some(inner.span_sink.clone()), - } - } -} diff --git a/linkerd/app/src/trace_collector.rs b/linkerd/app/src/trace_collector.rs new file mode 100644 index 0000000000..d28d12d6a1 --- /dev/null +++ b/linkerd/app/src/trace_collector.rs @@ -0,0 +1,139 @@ +use linkerd_app_core::http_tracing::SpanSink; +use linkerd_app_core::metrics::ControlHttp as HttpMetrics; +use linkerd_app_core::svc::NewService; +use linkerd_app_core::{control, dns, identity, opencensus, opentelemetry}; +use linkerd_error::Error; +use std::collections::HashMap; +use std::future::Future; +use std::pin::Pin; +use std::str::FromStr; +use tracing::info_span; + +pub mod oc_collector; +pub mod otel_collector; + +#[derive(Clone, Debug)] +pub enum Config { + Disabled, + Enabled(Box), +} + +#[derive(Debug, Copy, Clone, Default)] +pub enum CollectorKind { + #[default] + OpenCensus, + OpenTelemetry, +} + +impl FromStr for CollectorKind { + type Err = (); + + fn from_str(s: &str) -> Result { + match s { 
+ "opencensus" => Ok(CollectorKind::OpenCensus), + "opentelemetry" => Ok(CollectorKind::OpenTelemetry), + _ => Err(()), + } + } +} + +#[derive(Clone, Debug)] +pub struct EnabledConfig { + pub control: control::Config, + pub attributes: HashMap, + pub hostname: Option, + pub kind: CollectorKind, +} + +pub type Task = Pin + Send + 'static>>; + +pub enum TraceCollector { + Disabled, + Enabled(Box), +} + +pub struct EnabledCollector { + pub addr: control::ControlAddr, + pub span_sink: SpanSink, + pub task: Task, +} + +impl EnabledCollector { + pub fn info_span(&self) -> tracing::Span { + match self.span_sink { + SpanSink::Oc(_) => info_span!("opencensus"), + SpanSink::Otel(_) => info_span!("opentelemetry"), + } + } +} + +impl TraceCollector { + pub fn span_sink(&self) -> Option { + match self { + TraceCollector::Disabled => None, + TraceCollector::Enabled(inner) => Some(inner.span_sink.clone()), + } + } +} + +impl Config { + pub fn info_span(&self) -> tracing::Span { + match self { + Config::Disabled => info_span!("disabled_tracing"), + Config::Enabled(config) => match config.kind { + CollectorKind::OpenCensus => info_span!("opencensus"), + CollectorKind::OpenTelemetry => info_span!("opentelemetry"), + }, + } + } + + pub fn metrics_prefix(&self) -> &'static str { + match self { + Config::Disabled => "disabled_tracing", + Config::Enabled(config) => match config.kind { + CollectorKind::OpenCensus => "opencensus", + CollectorKind::OpenTelemetry => "opentelemetry", + }, + } + } + + pub fn build( + self, + identity: identity::NewClient, + dns: dns::Resolver, + legacy_oc_metrics: opencensus::metrics::Registry, + legacy_otel_metrics: opentelemetry::metrics::Registry, + control_metrics: control::Metrics, + client_metrics: HttpMetrics, + ) -> Result { + match self { + Config::Disabled => Ok(TraceCollector::Disabled), + Config::Enabled(inner) => { + let addr = inner.control.addr.clone(); + let svc = inner + .control + .build(dns, client_metrics, control_metrics, identity) + 
.new_service(()); + + let collector = match inner.kind { + CollectorKind::OpenCensus => oc_collector::create_collector( + addr.clone(), + inner.hostname, + inner.attributes, + svc, + legacy_oc_metrics, + ), + CollectorKind::OpenTelemetry => otel_collector::create_collector( + addr.clone(), + inner.hostname, + inner.attributes, + svc, + legacy_otel_metrics, + ), + }; + + Ok(TraceCollector::Enabled(Box::new(collector))) + } + } + } +} diff --git a/linkerd/app/src/trace_collector/oc_collector.rs b/linkerd/app/src/trace_collector/oc_collector.rs new file mode 100644 index 0000000000..89bf7bea6c --- /dev/null +++ b/linkerd/app/src/trace_collector/oc_collector.rs @@ -0,0 +1,62 @@ +use crate::trace_collector::EnabledCollector; +use linkerd_app_core::control::ControlAddr; +use linkerd_app_core::http_tracing::SpanSink; +use linkerd_app_core::proxy::http::HttpBody; +use linkerd_app_core::Error; +use linkerd_opencensus::{self as opencensus, metrics, proto}; +use std::{collections::HashMap, time::SystemTime}; +use tokio::sync::mpsc; +use tokio_stream::wrappers::ReceiverStream; +use tonic::body::BoxBody; +use tonic::client::GrpcService; +use tracing::Instrument; + +const SPAN_BUFFER_CAPACITY: usize = 100; +const SERVICE_NAME: &str = "linkerd-proxy"; + +pub(super) fn create_collector( + addr: ControlAddr, + hostname: Option, + attributes: HashMap, + svc: S, + legacy_metrics: metrics::Registry, +) -> EnabledCollector +where + S: GrpcService + Clone + Send + 'static, + S::Error: Into, + S::Future: Send, + S::ResponseBody: Default + HttpBody + Send + 'static, + ::Error: Into + Send, +{ + let (span_sink, spans_rx) = mpsc::channel(SPAN_BUFFER_CAPACITY); + let spans_rx = ReceiverStream::new(spans_rx); + + let task = { + use self::proto::agent::common::v1 as oc; + + let node = oc::Node { + identifier: Some(oc::ProcessIdentifier { + host_name: hostname.unwrap_or_default(), + pid: std::process::id(), + start_timestamp: Some(SystemTime::now().into()), + }), + service_info: 
Some(oc::ServiceInfo { + name: SERVICE_NAME.to_string(), + }), + attributes, + ..oc::Node::default() + }; + + let addr = addr.clone(); + Box::pin( + opencensus::export_spans(svc, node, spans_rx, legacy_metrics) + .instrument(tracing::debug_span!("opencensus", peer.addr = %addr).or_current()), + ) + }; + + EnabledCollector { + addr, + task, + span_sink: SpanSink::Oc(span_sink), + } +} diff --git a/linkerd/app/src/trace_collector/otel_collector.rs b/linkerd/app/src/trace_collector/otel_collector.rs new file mode 100644 index 0000000000..bc4fd6293d --- /dev/null +++ b/linkerd/app/src/trace_collector/otel_collector.rs @@ -0,0 +1,116 @@ +use super::EnabledCollector; +use linkerd_app_core::control::ControlAddr; +use linkerd_app_core::proxy::http::HttpBody; +use linkerd_app_core::Error; +use linkerd_opentelemetry::{ + self as opentelemetry, metrics, + proto::proto::common::v1::{any_value, AnyValue, KeyValue}, + proto::transform::common::ResourceAttributesWithSchema, +}; +use std::time::UNIX_EPOCH; +use std::{collections::HashMap, time::SystemTime}; +use tokio::sync::mpsc; +use tokio_stream::wrappers::ReceiverStream; +use tonic::body::BoxBody; +use tonic::client::GrpcService; +use tracing::Instrument; + +const SPAN_BUFFER_CAPACITY: usize = 100; +const SERVICE_NAME: &str = "linkerd-proxy"; + +pub(super) fn create_collector( + addr: ControlAddr, + hostname: Option, + attributes: HashMap, + svc: S, + legacy_metrics: metrics::Registry, +) -> EnabledCollector +where + S: GrpcService + Clone + Send + 'static, + S::Error: Into, + S::Future: Send, + S::ResponseBody: Default + HttpBody + Send + 'static, + ::Error: Into + Send, +{ + let (span_sink, spans_rx) = mpsc::channel(SPAN_BUFFER_CAPACITY); + let spans_rx = ReceiverStream::new(spans_rx); + + let mut resources = ResourceAttributesWithSchema::default(); + + resources + .attributes + .0 + .push(SERVICE_NAME.with_key("service.name")); + resources + .attributes + .0 + .push((std::process::id() as i64).with_key("process.pid")); + + 
resources.attributes.0.push( + SystemTime::now() + .duration_since(UNIX_EPOCH) + .map(|d| d.as_secs() as i64) + .unwrap_or_else(|e| -(e.duration().as_secs() as i64)) + .with_key("process.start_timestamp"), + ); + resources + .attributes + .0 + .push(hostname.unwrap_or_default().with_key("host.name")); + + resources.attributes.0.extend( + attributes + .into_iter() + .map(|(key, value)| value.with_key(&key)), + ); + + let addr = addr.clone(); + let task = Box::pin( + opentelemetry::export_spans(svc, spans_rx, resources, legacy_metrics) + .instrument(tracing::debug_span!("opentelemetry", peer.addr = %addr).or_current()), + ); + + EnabledCollector { + addr, + task, + span_sink: linkerd_app_core::http_tracing::SpanSink::Otel(span_sink), + } +} + +trait IntoAnyValue +where + Self: Sized, +{ + fn into_any_value(self) -> AnyValue; + + fn with_key(self, key: &str) -> KeyValue { + KeyValue { + key: key.to_string(), + value: Some(self.into_any_value()), + } + } +} + +impl IntoAnyValue for String { + fn into_any_value(self) -> AnyValue { + AnyValue { + value: Some(any_value::Value::StringValue(self)), + } + } +} + +impl IntoAnyValue for &str { + fn into_any_value(self) -> AnyValue { + AnyValue { + value: Some(any_value::Value::StringValue(self.to_string())), + } + } +} + +impl IntoAnyValue for i64 { + fn into_any_value(self) -> AnyValue { + AnyValue { + value: Some(any_value::Value::IntValue(self)), + } + } +} diff --git a/linkerd/opentelemetry/src/lib.rs b/linkerd/opentelemetry/src/lib.rs index 2f983cade7..cb254fb216 100644 --- a/linkerd/opentelemetry/src/lib.rs +++ b/linkerd/opentelemetry/src/lib.rs @@ -7,12 +7,14 @@ use futures::stream::{Stream, StreamExt}; use http_body::Body as HttpBody; use linkerd_error::Error; use metrics::Registry; +pub use opentelemetry as otel; pub use opentelemetry_proto as proto; use opentelemetry_proto::proto::collector::trace::v1::trace_service_client::TraceServiceClient; use 
opentelemetry_proto::proto::collector::trace::v1::ExportTraceServiceRequest; use opentelemetry_proto::proto::trace::v1::ResourceSpans; use opentelemetry_proto::transform::common::ResourceAttributesWithSchema; use opentelemetry_proto::transform::trace::group_spans_by_resource_and_scope; +pub use opentelemetry_sdk as sdk; pub use opentelemetry_sdk::export::trace::SpanData; use tokio::{sync::mpsc, time}; use tonic::{self as grpc, body::BoxBody, client::GrpcService}; diff --git a/linkerd2-proxy/src/main.rs b/linkerd2-proxy/src/main.rs index 61c54102df..f198d135e2 100644 --- a/linkerd2-proxy/src/main.rs +++ b/linkerd2-proxy/src/main.rs @@ -108,14 +108,11 @@ fn main() { ), } - if let Some(oc) = app.opencensus_addr() { - match oc.identity.value() { - None => info!("OpenCensus tracing collector at {}", oc.addr), + if let Some(tracing) = app.tracing_addr() { + match tracing.identity.value() { + None => info!("Tracing collector at {}", tracing.addr), Some(tls) => { - info!( - "OpenCensus tracing collector at {} ({})", - oc.addr, tls.server_id - ) + info!("Tracing collector at {} ({})", tracing.addr, tls.server_id) } } } diff --git a/tools/src/bin/gen-protos.rs b/tools/src/bin/gen-protos.rs index 4e85f3bffe..4668797bd5 100644 --- a/tools/src/bin/gen-protos.rs +++ b/tools/src/bin/gen-protos.rs @@ -1,18 +1,17 @@ fn main() { let opencensus_dir = { let manifest_dir = std::path::PathBuf::from(std::env!("CARGO_MANIFEST_DIR")); - manifest_dir.parent().unwrap().join("opencensus-proto") + manifest_dir.parent().unwrap().join("opentelemetry-proto") }; let out_dir = opencensus_dir.join("src").join("gen"); let iface_files = { - let proto_dir = opencensus_dir.join("opencensus").join("proto"); + let proto_dir = opencensus_dir.join("opentelemetry").join("proto"); &[ - proto_dir.join("agent/common/v1/common.proto"), - proto_dir.join("agent/trace/v1/trace_service.proto"), + proto_dir.join("collector/trace/v1/trace_service.proto"), + proto_dir.join("common/v1/common.proto"), 
proto_dir.join("resource/v1/resource.proto"), - proto_dir.join("trace/v1/trace_config.proto"), proto_dir.join("trace/v1/trace.proto"), ] };