diff --git a/Cargo.lock b/Cargo.lock index bce8c03238..d62896aca2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1126,6 +1126,7 @@ dependencies = [ "linkerd-app-outbound", "linkerd-error", "linkerd-opencensus", + "linkerd-opentelemetry", "linkerd-tonic-stream", "rangemap", "regex", @@ -1184,6 +1185,7 @@ dependencies = [ "linkerd-meshtls", "linkerd-metrics", "linkerd-opencensus", + "linkerd-opentelemetry", "linkerd-proxy-api-resolve", "linkerd-proxy-balance", "linkerd-proxy-client-policy", diff --git a/linkerd/app/Cargo.toml b/linkerd/app/Cargo.toml index af443530a3..1420c6edfe 100644 --- a/linkerd/app/Cargo.toml +++ b/linkerd/app/Cargo.toml @@ -25,6 +25,7 @@ linkerd-app-inbound = { path = "./inbound" } linkerd-app-outbound = { path = "./outbound" } linkerd-error = { path = "../error" } linkerd-opencensus = { path = "../opencensus" } +linkerd-opentelemetry = { path = "../opentelemetry" } linkerd-tonic-stream = { path = "../tonic-stream" } rangemap = "1" regex = "1" diff --git a/linkerd/app/core/Cargo.toml b/linkerd/app/core/Cargo.toml index 43e19a2816..fff82289f7 100644 --- a/linkerd/app/core/Cargo.toml +++ b/linkerd/app/core/Cargo.toml @@ -47,6 +47,7 @@ linkerd-io = { path = "../../io" } linkerd-meshtls = { path = "../../meshtls", default-features = false } linkerd-metrics = { path = "../../metrics", features = ["process", "stack"] } linkerd-opencensus = { path = "../../opencensus" } +linkerd-opentelemetry = { path = "../../opentelemetry" } linkerd-proxy-api-resolve = { path = "../../proxy/api-resolve" } linkerd-proxy-balance = { path = "../../proxy/balance" } linkerd-proxy-core = { path = "../../proxy/core" } diff --git a/linkerd/app/core/src/http_tracing.rs b/linkerd/app/core/src/http_tracing.rs index 6b24c599e3..b1be24b409 100644 --- a/linkerd/app/core/src/http_tracing.rs +++ b/linkerd/app/core/src/http_tracing.rs @@ -1,139 +1,88 @@ +use crate::http_tracing::opencensus::{OcSpanConverter, OpenCensusSink}; +use crate::http_tracing::opentelemetry::{OpenTelemetrySink, OtelSpanConverter}; use linkerd_error::Error; -use linkerd_opencensus::proto::trace::v1 as oc; use linkerd_stack::layer; -use linkerd_trace_context::{self as trace_context, TraceContext}; +use linkerd_trace_context as trace_context; +use linkerd_trace_context::{Span, TraceContext}; use std::{collections::HashMap, sync::Arc}; use thiserror::Error; -use tokio::sync::mpsc; -pub type OpenCensusSink = Option>; +mod opencensus; +mod opentelemetry; + pub type Labels = Arc>; -/// SpanConverter converts trace_context::Span objects into OpenCensus agent -/// protobuf span objects. SpanConverter receives trace_context::Span objects by -/// implmenting the SpanSink trait. For each span that it receives, it converts -/// it to an OpenCensus span and then sends it on the provided mpsc::Sender. -#[derive(Clone)] -pub struct SpanConverter { - kind: Kind, - sink: mpsc::Sender, - labels: Labels, +#[derive(Clone, Debug)] +pub enum SpanSink { + Oc(OpenCensusSink), + Otel(OpenTelemetrySink), } -#[derive(Debug, Error)] -#[error("ID '{:?} should have {} bytes, but it has {}", self.id, self.expected_size, self.actual_size)] -pub struct IdLengthError { - id: Vec, - expected_size: usize, - actual_size: usize, +impl SpanSink { + fn into_converter(self, kind: Kind, labels: impl Into) -> SpanConverter { + match self { + SpanSink::Oc(oc) => SpanConverter::Oc(OcSpanConverter::from_sink(oc, kind, labels)), + SpanSink::Otel(otel) => { + SpanConverter::Otel(OtelSpanConverter::from_sink(otel, kind, labels)) + } + } + } } pub fn server( - sink: OpenCensusSink, + sink: Option, labels: impl Into, ) -> impl layer::Layer, S>> + Clone { - SpanConverter::layer(Kind::Server, sink, labels) + TraceContext::layer(sink.map(move |sink| sink.into_converter(Kind::Server, labels))) } pub fn client( - sink: OpenCensusSink, + sink: Option, labels: impl Into, ) -> impl layer::Layer, S>> + Clone { - SpanConverter::layer(Kind::Client, sink, labels) + TraceContext::layer(sink.map(move |sink| sink.into_converter(Kind::Client, labels))) } -#[derive(Copy, Clone, Debug, PartialEq)] -enum Kind { - Server = 1, - Client = 2, -} - -impl SpanConverter { - fn layer( - kind: Kind, - sink: OpenCensusSink, - labels: impl Into, - ) -> impl layer::Layer, S>> + Clone { - TraceContext::layer(sink.map(move |sink| Self { - kind, - sink, - labels: labels.into(), - })) - } - - fn mk_span(&self, mut span: trace_context::Span) -> Result { - let mut attributes = HashMap::::new(); - for (k, v) in self.labels.iter() { - attributes.insert( - k.clone(), - oc::AttributeValue { - value: Some(oc::attribute_value::Value::StringValue(truncatable( - v.clone(), - ))), - }, - ); - } - for (k, v) in span.labels.drain() { - attributes.insert( - k.to_string(), - oc::AttributeValue { - value: Some(oc::attribute_value::Value::StringValue(truncatable(v))), - }, - ); - } - Ok(oc::Span { - trace_id: into_bytes(span.trace_id, 16)?, - span_id: into_bytes(span.span_id, 8)?, - tracestate: None, - parent_span_id: into_bytes(span.parent_id, 8)?, - name: Some(truncatable(span.span_name)), - kind: self.kind as i32, - start_time: Some(span.start.into()), - end_time: Some(span.end.into()), - attributes: Some(oc::span::Attributes { - attribute_map: attributes, - dropped_attributes_count: 0, - }), - stack_trace: None, - time_events: None, - links: None, - status: None, // TODO: this is gRPC status; we must read response trailers to populate this - resource: None, - same_process_as_parent_span: Some(self.kind == Kind::Client), - child_span_count: None, - }) - } +#[derive(Clone)] +pub enum SpanConverter { + Oc(OcSpanConverter), + Otel(OtelSpanConverter), } impl trace_context::SpanSink for SpanConverter { - #[inline] fn is_enabled(&self) -> bool { true } - fn try_send(&mut self, span: trace_context::Span) -> Result<(), Error> { - let span = self.mk_span(span)?; - self.sink.try_send(span).map_err(Into::into) + fn try_send(&mut self, span: Span) -> Result<(), Error> { + match self { + SpanConverter::Oc(oc) => oc.try_send(span), + SpanConverter::Otel(otel) => otel.try_send(span), + } } } -fn into_bytes(id: trace_context::Id, size: usize) -> Result, IdLengthError> { - let bytes: Vec = id.into(); - if bytes.len() == size { - Ok(bytes) - } else { - let actual_size = bytes.len(); - Err(IdLengthError { - id: bytes, - expected_size: size, - actual_size, - }) - } +#[derive(Debug, Error)] +#[error("ID '{:?} should have {} bytes, but it has {}", self.id, self.expected_size, self.actual_size)] +pub struct IdLengthError { + id: Vec, + expected_size: usize, + actual_size: usize, } -fn truncatable(value: String) -> oc::TruncatableString { - oc::TruncatableString { - value, - truncated_byte_count: 0, - } +#[derive(Copy, Clone, Debug, PartialEq)] +enum Kind { + Server = 1, + Client = 2, +} + +fn into_bytes(id: trace_context::Id) -> Result<[u8; N], IdLengthError> { + id.as_ref().try_into().map_err(|_| { + let bytes: Vec = id.into(); + IdLengthError { + expected_size: N, + actual_size: bytes.len(), + id: bytes, + } + }) } diff --git a/linkerd/app/core/src/http_tracing/opencensus.rs b/linkerd/app/core/src/http_tracing/opencensus.rs new file mode 100644 index 0000000000..7bca45902e --- /dev/null +++ b/linkerd/app/core/src/http_tracing/opencensus.rs @@ -0,0 +1,91 @@ +use crate::http_tracing::{into_bytes, IdLengthError, Kind, Labels}; +use linkerd_error::Error; +use linkerd_opencensus::proto::trace::v1 as oc; +use linkerd_trace_context as trace_context; +use std::collections::HashMap; +use tokio::sync::mpsc; + +pub type OpenCensusSink = mpsc::Sender; + +/// SpanConverter converts trace_context::Span objects into OpenCensus agent +/// protobuf span objects. SpanConverter receives trace_context::Span objects by +/// implmenting the SpanSink trait. For each span that it receives, it converts +/// it to an OpenCensus span and then sends it on the provided mpsc::Sender. +#[derive(Clone)] +pub struct OcSpanConverter { + kind: Kind, + sink: mpsc::Sender, + labels: Labels, +} + +impl OcSpanConverter { + pub(super) fn from_sink(sink: OpenCensusSink, kind: Kind, labels: impl Into) -> Self { + Self { + kind, + sink, + labels: labels.into(), + } + } + + fn mk_span(&self, mut span: trace_context::Span) -> Result { + let mut attributes = HashMap::::new(); + for (k, v) in self.labels.iter() { + attributes.insert( + k.clone(), + oc::AttributeValue { + value: Some(oc::attribute_value::Value::StringValue(truncatable( + v.clone(), + ))), + }, + ); + } + for (k, v) in span.labels.drain() { + attributes.insert( + k.to_string(), + oc::AttributeValue { + value: Some(oc::attribute_value::Value::StringValue(truncatable(v))), + }, + ); + } + Ok(oc::Span { + trace_id: into_bytes::<16>(span.trace_id)?.to_vec(), + span_id: into_bytes::<8>(span.span_id)?.to_vec(), + tracestate: None, + parent_span_id: into_bytes::<8>(span.parent_id)?.to_vec(), + name: Some(truncatable(span.span_name)), + kind: self.kind as i32, + start_time: Some(span.start.into()), + end_time: Some(span.end.into()), + attributes: Some(oc::span::Attributes { + attribute_map: attributes, + dropped_attributes_count: 0, + }), + stack_trace: None, + time_events: None, + links: None, + status: None, // TODO: this is gRPC status; we must read response trailers to populate this + resource: None, + same_process_as_parent_span: Some(self.kind == Kind::Client), + child_span_count: None, + }) + } +} + +impl trace_context::SpanSink for OcSpanConverter { + #[inline] + fn is_enabled(&self) -> bool { + true + } + + fn try_send(&mut self, span: trace_context::Span) -> Result<(), Error> { + let span = self.mk_span(span)?; + self.sink.try_send(span).map_err(Into::into) + } +} + +fn truncatable(value: String) -> oc::TruncatableString { + oc::TruncatableString { + value, + truncated_byte_count: 0, + } +} diff --git a/linkerd/app/core/src/http_tracing/opentelemetry.rs b/linkerd/app/core/src/http_tracing/opentelemetry.rs new file mode 100644 index 0000000000..8fcecd2ea1 --- /dev/null +++ b/linkerd/app/core/src/http_tracing/opentelemetry.rs @@ -0,0 +1,85 @@ +use crate::http_tracing::{into_bytes, IdLengthError, Kind, Labels}; +use linkerd_error::Error; +use linkerd_opentelemetry::otel::trace::{ + SpanContext, SpanId, SpanKind, Status, TraceFlags, TraceId, TraceState, +}; +use linkerd_opentelemetry::otel::KeyValue; +use linkerd_opentelemetry::sdk::{export::trace::SpanData, trace::SpanLinks, Resource}; +use linkerd_trace_context as trace_context; +use std::borrow::Cow; +use tokio::sync::mpsc; + +pub type OpenTelemetrySink = mpsc::Sender; + +/// SpanConverter converts trace_context::Span objects into OpenTelemetry agent +/// protobuf span objects. SpanConverter receives trace_context::Span objects by +/// implmenting the SpanSink trait. For each span that it receives, it converts +/// it to an OpenTelemetry span and then sends it on the provided mpsc::Sender. +#[derive(Clone)] +pub struct OtelSpanConverter { + kind: Kind, + sink: OpenTelemetrySink, + labels: Labels, + resource: Cow<'static, Resource>, +} + +impl OtelSpanConverter { + pub(super) fn from_sink( + sink: OpenTelemetrySink, + kind: Kind, + labels: impl Into, + ) -> Self { + Self { + kind, + sink, + labels: labels.into(), + resource: Cow::Owned(Resource::default()), + } + } + + fn mk_span(&self, mut span: trace_context::Span) -> Result { + let mut attributes = Vec::::new(); + for (k, v) in self.labels.iter() { + attributes.push(KeyValue::new(k.clone(), v.clone())); + } + for (k, v) in span.labels.drain() { + attributes.push(KeyValue::new(k, v.clone())); + } + Ok(SpanData { + parent_span_id: SpanId::from_bytes(into_bytes(span.parent_id)?), + span_kind: match self.kind { + Kind::Server => SpanKind::Server, + Kind::Client => SpanKind::Client, + }, + name: span.span_name.into(), + start_time: span.start, + end_time: span.end, + attributes, + dropped_attributes_count: 0, + links: SpanLinks::default(), + status: Status::Unset, // TODO: this is gRPC status; we must read response trailers to populate this + resource: self.resource.clone(), + span_context: SpanContext::new( + TraceId::from_bytes(into_bytes(span.trace_id)?), + SpanId::from_bytes(into_bytes(span.span_id)?), + TraceFlags::default(), + /* is_remote= */ self.kind != Kind::Client, + TraceState::NONE, + ), + events: Default::default(), + instrumentation_lib: Default::default(), + }) + } +} + +impl trace_context::SpanSink for OtelSpanConverter { + #[inline] + fn is_enabled(&self) -> bool { + true + } + + fn try_send(&mut self, span: trace_context::Span) -> Result<(), Error> { + let span = self.mk_span(span)?; + self.sink.try_send(span).map_err(Into::into) + } +} diff --git a/linkerd/app/core/src/lib.rs b/linkerd/app/core/src/lib.rs index fc98a3b621..8d3a433791 100644 --- a/linkerd/app/core/src/lib.rs +++ b/linkerd/app/core/src/lib.rs @@ -40,6 +40,7 @@ pub use linkerd_http_metrics as http_metrics; pub use linkerd_idle_cache as idle_cache; pub use linkerd_io as io; pub use linkerd_opencensus as opencensus; +pub use linkerd_opentelemetry as opentelemetry; pub use linkerd_service_profiles as profiles; pub use linkerd_stack_metrics as stack_metrics; pub use linkerd_stack_tracing as stack_tracing; @@ -65,7 +66,7 @@ pub struct ProxyRuntime { pub identity: identity::creds::Receiver, pub metrics: metrics::Proxy, pub tap: proxy::tap::Registry, - pub span_sink: http_tracing::OpenCensusSink, + pub span_sink: Option, pub drain: drain::Watch, } diff --git a/linkerd/app/core/src/metrics.rs b/linkerd/app/core/src/metrics.rs index 3fd374031b..084403e365 100644 --- a/linkerd/app/core/src/metrics.rs +++ b/linkerd/app/core/src/metrics.rs @@ -9,7 +9,7 @@ pub use crate::transport::labels::{TargetAddr, TlsAccept}; use crate::{ classify::Class, - control, http_metrics, opencensus, profiles, stack_metrics, + control, http_metrics, opencensus, opentelemetry, profiles, stack_metrics, svc::Param, tls, transport::{self, labels::TlsConnect}, @@ -39,6 +39,7 @@ pub struct Metrics { pub proxy: Proxy, pub control: ControlHttp, pub opencensus: opencensus::metrics::Registry, + pub opentelemetry: opentelemetry::metrics::Registry, } #[derive(Clone, Debug)] @@ -191,11 +192,13 @@ impl Metrics { }; let (opencensus, opencensus_report) = opencensus::metrics::new(); + let (opentelemetry, opentelemetry_report) = opentelemetry::metrics::new(); let metrics = Metrics { proxy, control, opencensus, + opentelemetry, }; let report = endpoint_report @@ -205,6 +208,7 @@ impl Metrics { .and_report(control_report) .and_report(transport_report) .and_report(opencensus_report) + .and_report(opentelemetry_report) .and_report(stack); (metrics, report) diff --git a/linkerd/app/inbound/src/lib.rs b/linkerd/app/inbound/src/lib.rs index 88ec18afb8..5074979d0c 100644 --- a/linkerd/app/inbound/src/lib.rs +++ b/linkerd/app/inbound/src/lib.rs @@ -18,12 +18,13 @@ mod server; #[cfg(any(test, feature = "test-util", fuzzing))] pub mod test_util; +#[cfg(fuzzing)] +pub use self::http::fuzz as http_fuzz; pub use self::{metrics::InboundMetrics, policy::DefaultPolicy}; +use linkerd_app_core::http_tracing::SpanSink; use linkerd_app_core::{ config::{ConnectConfig, ProxyConfig, QueueConfig}, - drain, - http_tracing::OpenCensusSink, - identity, io, + drain, identity, io, proxy::{tap, tcp}, svc, transport::{self, Remote, ServerAddr}, @@ -33,9 +34,6 @@ use std::{fmt::Debug, time::Duration}; use thiserror::Error; use tracing::debug_span; -#[cfg(fuzzing)] -pub use self::http::fuzz as http_fuzz; - #[derive(Clone, Debug)] pub struct Config { pub allow_discovery: NameMatch, @@ -67,7 +65,7 @@ struct Runtime { metrics: InboundMetrics, identity: identity::creds::Receiver, tap: tap::Registry, - span_sink: OpenCensusSink, + span_sink: Option, drain: drain::Watch, } diff --git a/linkerd/app/outbound/src/http/server.rs b/linkerd/app/outbound/src/http/server.rs index dc8f77cb1c..0fb34ded58 100644 --- a/linkerd/app/outbound/src/http/server.rs +++ b/linkerd/app/outbound/src/http/server.rs @@ -47,7 +47,7 @@ impl Outbound> { .check_new_service::>() .push(ServerRescue::layer(config.emit_headers)) .check_new_service::>() - // Initiates OpenCensus tracing. + // Initiates OpenTelemetry tracing. .push_on_service(http_tracing::server(rt.span_sink.clone(), trace_labels())) .push_on_service(http::BoxResponse::layer()) // Convert origin form HTTP/1 URIs to absolute form for Hyper's diff --git a/linkerd/app/outbound/src/lib.rs b/linkerd/app/outbound/src/lib.rs index 75ee6c70d5..2cc8fccecc 100644 --- a/linkerd/app/outbound/src/lib.rs +++ b/linkerd/app/outbound/src/lib.rs @@ -6,11 +6,11 @@ #![allow(opaque_hidden_inferred_bound)] #![forbid(unsafe_code)] +use linkerd_app_core::http_tracing::SpanSink; use linkerd_app_core::{ config::{ProxyConfig, QueueConfig}, drain, exp_backoff::ExponentialBackoff, - http_tracing::OpenCensusSink, identity, io, metrics::prom, profiles, @@ -95,7 +95,7 @@ struct Runtime { metrics: OutboundMetrics, identity: identity::NewClient, tap: tap::Registry, - span_sink: OpenCensusSink, + span_sink: Option, drain: drain::Watch, } diff --git a/linkerd/app/src/env.rs b/linkerd/app/src/env.rs index 49e00a88ba..710b572ce4 100644 --- a/linkerd/app/src/env.rs +++ b/linkerd/app/src/env.rs @@ -1,4 +1,5 @@ -use crate::{dns, gateway, identity, inbound, oc_collector, outbound, policy, spire}; +use crate::trace_collector::CollectorKind; +use crate::{dns, gateway, identity, inbound, outbound, policy, spire, trace_collector}; use linkerd_app_core::{ addr, config::*, @@ -19,7 +20,7 @@ use tracing::{debug, error, info, warn}; mod control; mod http2; -mod opencensus; +mod trace; mod types; use self::types::*; @@ -146,6 +147,7 @@ const ENV_OUTBOUND_DISABLE_INFORMATIONAL_HEADERS: &str = "LINKERD2_PROXY_OUTBOUND_DISABLE_INFORMATIONAL_HEADERS"; pub const ENV_TRACE_ATTRIBUTES_PATH: &str = "LINKERD2_PROXY_TRACE_ATTRIBUTES_PATH"; +pub const ENV_TRACE_PROTOCOL: &str = "LINKERD2_PROXY_TRACE_PROTOCOL"; /// Constrains which destination names may be used for profile/route discovery. /// @@ -428,7 +430,8 @@ pub fn parse_config(strings: &S) -> Result let hostname = strings.get(ENV_HOSTNAME); - let oc_attributes_file_path = strings.get(ENV_TRACE_ATTRIBUTES_PATH); + let trace_attributes_file_path = strings.get(ENV_TRACE_ATTRIBUTES_PATH); + let trace_protocol = strings.get(ENV_TRACE_PROTOCOL); let trace_collector_addr = parse_control_addr(strings, ENV_TRACE_COLLECTOR_SVC_BASE); @@ -813,8 +816,8 @@ pub fn parse_config(strings: &S) -> Result .into(), }; - let oc_collector = match trace_collector_addr? { - None => oc_collector::Config::Disabled, + let trace_collector = match trace_collector_addr? { + None => trace_collector::Config::Disabled, Some(addr) => { let connect = if addr.addr.is_loopback() { inbound.proxy.connect.clone() @@ -826,14 +829,20 @@ pub fn parse_config(strings: &S) -> Result } else { outbound.http_request_queue.failfast_timeout }; - let attributes = oc_attributes_file_path + let attributes = trace_attributes_file_path .map(|path| match path.and_then(|p| p.parse::().ok()) { - Some(path) => opencensus::read_trace_attributes(&path), + Some(path) => trace::read_trace_attributes(&path), None => HashMap::new(), }) .unwrap_or_default(); - oc_collector::Config::Enabled(Box::new(oc_collector::EnabledConfig { + let trace_protocol = trace_protocol + .map(|proto| proto.and_then(|p| p.parse::().ok())) + .ok() + .flatten() + .unwrap_or_default(); + + trace_collector::Config::Enabled(Box::new(trace_collector::EnabledConfig { attributes, hostname: hostname?, control: ControlConfig { @@ -844,6 +853,7 @@ pub fn parse_config(strings: &S) -> Result failfast_timeout, }, }, + kind: trace_protocol, })) } }; @@ -923,7 +933,7 @@ pub fn parse_config(strings: &S) -> Result dns, dst, tap, - oc_collector, + trace_collector, policy, identity, outbound, diff --git a/linkerd/app/src/env/opencensus.rs b/linkerd/app/src/env/trace.rs similarity index 100% rename from linkerd/app/src/env/opencensus.rs rename to linkerd/app/src/env/trace.rs diff --git a/linkerd/app/src/lib.rs b/linkerd/app/src/lib.rs index aa73893851..70e6639e86 100644 --- a/linkerd/app/src/lib.rs +++ b/linkerd/app/src/lib.rs @@ -7,10 +7,10 @@ pub mod dst; pub mod env; pub mod identity; -pub mod oc_collector; pub mod policy; pub mod spire; pub mod tap; +pub mod trace_collector; pub use self::metrics::Metrics; use futures::{future, Future, FutureExt}; @@ -60,7 +60,7 @@ pub struct Config { pub policy: policy::Config, pub admin: admin::Config, pub tap: tap::Config, - pub oc_collector: oc_collector::Config, + pub trace_collector: trace_collector::Config, /// Grace period for graceful shutdowns. /// @@ -75,7 +75,7 @@ pub struct App { dst: ControlAddr, identity: identity::Identity, inbound_addr: Local, - oc_collector: oc_collector::OcCollector, + trace_collector: trace_collector::TraceCollector, outbound_addr: Local, outbound_addr_additional: Option>, start_proxy: Pin + Send + 'static>>, @@ -123,7 +123,7 @@ impl Config { policy, identity, inbound, - oc_collector, + trace_collector, outbound, gateway, tap, @@ -188,16 +188,25 @@ impl Config { }) }?; - debug!(config = ?oc_collector, "Building client"); - let oc_collector = { - let control_metrics = - ControlMetrics::register(registry.sub_registry_with_prefix("opencensus")); + debug!(config = ?trace_collector, "Building client"); + let trace_collector = { + let control_metrics = ControlMetrics::register( + registry.sub_registry_with_prefix(trace_collector.metrics_prefix()), + ); let identity = identity.receiver().new_client(); let dns = dns.resolver; let client_metrics = metrics.control.clone(); - let metrics = metrics.opencensus; - info_span!("opencensus").in_scope(|| { - oc_collector.build(identity, dns, metrics, control_metrics, client_metrics) + let otel_metrics = metrics.opentelemetry; + let oc_metrics = metrics.opencensus; + trace_collector.info_span().in_scope(|| { + trace_collector.build( + identity, + dns, + oc_metrics, + otel_metrics, + control_metrics, + client_metrics, + ) }) }?; @@ -205,7 +214,7 @@ impl Config { identity: identity.receiver(), metrics: metrics.proxy, tap: tap.registry(), - span_sink: oc_collector.span_sink(), + span_sink: trace_collector.span_sink(), drain: drain_rx.clone(), }; let inbound = Inbound::new(inbound, runtime.clone()); @@ -309,7 +318,7 @@ impl Config { drain: drain_tx, identity, inbound_addr, - oc_collector, + trace_collector, outbound_addr, outbound_addr_additional, start_proxy, @@ -369,10 +378,10 @@ impl App { self.identity.receiver().local_id().clone() } - pub fn opencensus_addr(&self) -> Option<&ControlAddr> { - match self.oc_collector { - oc_collector::OcCollector::Disabled { .. } => None, - oc_collector::OcCollector::Enabled(ref oc) => Some(&oc.addr), + pub fn tracing_addr(&self) -> Option<&ControlAddr> { + match self.trace_collector { + trace_collector::TraceCollector::Disabled { .. } => None, + crate::trace_collector::TraceCollector::Enabled(ref oc) => Some(&oc.addr), } } @@ -381,7 +390,7 @@ impl App { admin, drain, identity, - oc_collector, + trace_collector: collector, start_proxy, tap, .. @@ -446,8 +455,9 @@ impl App { tokio::spawn(serve.instrument(info_span!("tap").or_current())); } - if let oc_collector::OcCollector::Enabled(oc) = oc_collector { - tokio::spawn(oc.task.instrument(info_span!("opencensus").or_current())); + if let trace_collector::TraceCollector::Enabled(collector) = collector { + let span = collector.info_span().or_current(); + tokio::spawn(collector.task.instrument(span)); } // we don't care if the admin shutdown channel is diff --git a/linkerd/app/src/oc_collector.rs b/linkerd/app/src/oc_collector.rs deleted file mode 100644 index e8f25e23c5..0000000000 --- a/linkerd/app/src/oc_collector.rs +++ /dev/null @@ -1,103 +0,0 @@ -use linkerd_app_core::{ - control, dns, identity, metrics::ControlHttp as HttpMetrics, svc::NewService, Error, -}; -use linkerd_opencensus::{self as opencensus, metrics, proto}; -use std::{collections::HashMap, future::Future, pin::Pin, time::SystemTime}; -use tokio::sync::mpsc; -use tokio_stream::wrappers::ReceiverStream; -use tracing::Instrument; - -#[derive(Clone, Debug)] -pub enum Config { - Disabled, - Enabled(Box), -} - -#[derive(Clone, Debug)] -pub struct EnabledConfig { - pub control: control::Config, - pub attributes: HashMap, - pub hostname: Option, -} - -pub type Task = Pin + Send + 'static>>; - -pub type SpanSink = mpsc::Sender; - -pub enum OcCollector { - Disabled, - Enabled(Box), -} - -pub struct EnabledCollector { - pub addr: control::ControlAddr, - pub span_sink: SpanSink, - pub task: Task, -} - -impl Config { - const SPAN_BUFFER_CAPACITY: usize = 100; - const SERVICE_NAME: &'static str = "linkerd-proxy"; - - pub fn build( - self, - identity: identity::NewClient, - dns: dns::Resolver, - legacy_metrics: metrics::Registry, - control_metrics: control::Metrics, - client_metrics: HttpMetrics, - ) -> Result { - match self { - Config::Disabled => Ok(OcCollector::Disabled), - Config::Enabled(inner) => { - let addr = inner.control.addr.clone(); - let svc = inner - .control - .build(dns, client_metrics, control_metrics, identity) - .new_service(()); - - let (span_sink, spans_rx) = mpsc::channel(Self::SPAN_BUFFER_CAPACITY); - let spans_rx = ReceiverStream::new(spans_rx); - - let task = { - use self::proto::agent::common::v1 as oc; - - let node = oc::Node { - identifier: Some(oc::ProcessIdentifier { - host_name: inner.hostname.unwrap_or_default(), - pid: std::process::id(), - start_timestamp: Some(SystemTime::now().into()), - }), - service_info: Some(oc::ServiceInfo { - name: Self::SERVICE_NAME.to_string(), - }), - attributes: inner.attributes, - ..oc::Node::default() - }; - - let addr = addr.clone(); - Box::pin( - opencensus::export_spans(svc, node, spans_rx, legacy_metrics).instrument( - tracing::debug_span!("opencensus", peer.addr = %addr).or_current(), - ), - ) - }; - - Ok(OcCollector::Enabled(Box::new(EnabledCollector { - addr, - task, - span_sink, - }))) - } - } - } -} - -impl OcCollector { - pub fn span_sink(&self) -> Option { - match self { - OcCollector::Disabled => None, - OcCollector::Enabled(inner) => Some(inner.span_sink.clone()), - } - } -} diff --git a/linkerd/app/src/trace_collector.rs b/linkerd/app/src/trace_collector.rs new file mode 100644 index 0000000000..d28d12d6a1 --- /dev/null +++ b/linkerd/app/src/trace_collector.rs @@ -0,0 +1,139 @@ +use linkerd_app_core::http_tracing::SpanSink; +use linkerd_app_core::metrics::ControlHttp as HttpMetrics; +use linkerd_app_core::svc::NewService; +use linkerd_app_core::{control, dns, identity, opencensus, opentelemetry}; +use linkerd_error::Error; +use std::collections::HashMap; +use std::future::Future; +use std::pin::Pin; +use std::str::FromStr; +use tracing::info_span; + +pub mod oc_collector; +pub mod otel_collector; + +#[derive(Clone, Debug)] +pub enum Config { + Disabled, + Enabled(Box), +} + +#[derive(Debug, Copy, Clone, Default)] +pub enum CollectorKind { + #[default] + OpenCensus, + OpenTelemetry, +} + +impl FromStr for CollectorKind { + type Err = (); + + fn from_str(s: &str) -> Result { + match s { + "opencensus" => Ok(CollectorKind::OpenCensus), + "opentelemetry" => Ok(CollectorKind::OpenTelemetry), + _ => Err(()), + } + } +} + +#[derive(Clone, Debug)] +pub struct EnabledConfig { + pub control: control::Config, + pub attributes: HashMap, + pub hostname: Option, + pub kind: CollectorKind, +} + +pub type Task = Pin + Send + 'static>>; + +pub enum TraceCollector { + Disabled, + Enabled(Box), +} + +pub struct EnabledCollector { + pub addr: control::ControlAddr, + pub span_sink: SpanSink, + pub task: Task, +} + +impl EnabledCollector { + pub fn info_span(&self) -> tracing::Span { + match self.span_sink { + SpanSink::Oc(_) => info_span!("opencensus"), + SpanSink::Otel(_) => info_span!("opentelemetry"), + } + } +} + +impl TraceCollector { + pub fn span_sink(&self) -> Option { + match self { + TraceCollector::Disabled => None, + TraceCollector::Enabled(inner) => Some(inner.span_sink.clone()), + } + } +} + +impl Config { + pub fn info_span(&self) -> tracing::Span { + match self { + Config::Disabled => info_span!("disabled_tracing"), + Config::Enabled(config) => match config.kind { + CollectorKind::OpenCensus => info_span!("opencensus"), + CollectorKind::OpenTelemetry => info_span!("opentelemetry"), + }, + } + } + + pub fn metrics_prefix(&self) -> &'static str { + match self { + Config::Disabled => "disabled_tracing", + Config::Enabled(config) => match config.kind { + CollectorKind::OpenCensus => "opencensus", + CollectorKind::OpenTelemetry => "opentelemetry", + }, + } + } + + pub fn build( + self, + identity: identity::NewClient, + dns: dns::Resolver, + legacy_oc_metrics: opencensus::metrics::Registry, + legacy_otel_metrics: opentelemetry::metrics::Registry, + control_metrics: control::Metrics, + client_metrics: HttpMetrics, + ) -> Result { + match self { + Config::Disabled => Ok(TraceCollector::Disabled), + Config::Enabled(inner) => { + let addr = inner.control.addr.clone(); + let svc = inner + .control + .build(dns, client_metrics, control_metrics, identity) + .new_service(()); + + let collector = match inner.kind { + CollectorKind::OpenCensus => oc_collector::create_collector( + addr.clone(), + inner.hostname, + inner.attributes, + svc, + legacy_oc_metrics, + ), + CollectorKind::OpenTelemetry => otel_collector::create_collector( + addr.clone(), + inner.hostname, + inner.attributes, + svc, + legacy_otel_metrics, + ), + }; + + Ok(TraceCollector::Enabled(Box::new(collector))) + } + } + } +} diff --git a/linkerd/app/src/trace_collector/oc_collector.rs b/linkerd/app/src/trace_collector/oc_collector.rs new file mode 100644 index 0000000000..89bf7bea6c --- /dev/null +++ b/linkerd/app/src/trace_collector/oc_collector.rs @@ -0,0 +1,62 @@ +use crate::trace_collector::EnabledCollector; +use linkerd_app_core::control::ControlAddr; +use linkerd_app_core::http_tracing::SpanSink; +use linkerd_app_core::proxy::http::HttpBody; +use linkerd_app_core::Error; +use linkerd_opencensus::{self as opencensus, metrics, proto}; +use std::{collections::HashMap, time::SystemTime}; +use tokio::sync::mpsc; +use tokio_stream::wrappers::ReceiverStream; +use tonic::body::BoxBody; +use tonic::client::GrpcService; +use tracing::Instrument; + +const SPAN_BUFFER_CAPACITY: usize = 100; +const SERVICE_NAME: &str = "linkerd-proxy"; + +pub(super) fn create_collector( + addr: ControlAddr, + hostname: Option, + attributes: HashMap, + svc: S, + legacy_metrics: metrics::Registry, +) -> EnabledCollector +where + S: GrpcService + Clone + Send + 'static, + S::Error: Into, + S::Future: Send, + S::ResponseBody: Default + HttpBody + Send + 'static, + ::Error: Into + Send, +{ + let (span_sink, spans_rx) = mpsc::channel(SPAN_BUFFER_CAPACITY); + let spans_rx = ReceiverStream::new(spans_rx); + + let task = { + use self::proto::agent::common::v1 as oc; + + let node = oc::Node { + identifier: Some(oc::ProcessIdentifier { + host_name: hostname.unwrap_or_default(), + pid: std::process::id(), + start_timestamp: Some(SystemTime::now().into()), + }), + service_info: Some(oc::ServiceInfo { + name: SERVICE_NAME.to_string(), + }), + attributes, + ..oc::Node::default() + }; + + let addr = addr.clone(); + Box::pin( + opencensus::export_spans(svc, node, spans_rx, legacy_metrics) + .instrument(tracing::debug_span!("opencensus", peer.addr = %addr).or_current()), + ) + }; + + EnabledCollector { + addr, + task, + span_sink: SpanSink::Oc(span_sink), + } +} diff --git a/linkerd/app/src/trace_collector/otel_collector.rs b/linkerd/app/src/trace_collector/otel_collector.rs new file mode 100644 index 0000000000..bc4fd6293d --- /dev/null +++ b/linkerd/app/src/trace_collector/otel_collector.rs @@ -0,0 +1,116 @@ +use super::EnabledCollector; +use linkerd_app_core::control::ControlAddr; +use linkerd_app_core::proxy::http::HttpBody; +use linkerd_app_core::Error; +use linkerd_opentelemetry::{ + self as opentelemetry, metrics, + proto::proto::common::v1::{any_value, AnyValue, KeyValue}, + proto::transform::common::ResourceAttributesWithSchema, +}; +use std::time::UNIX_EPOCH; +use std::{collections::HashMap, time::SystemTime}; +use tokio::sync::mpsc; +use tokio_stream::wrappers::ReceiverStream; +use tonic::body::BoxBody; +use tonic::client::GrpcService; +use tracing::Instrument; + +const SPAN_BUFFER_CAPACITY: usize = 100; +const SERVICE_NAME: &str = "linkerd-proxy"; + +pub(super) fn create_collector( + addr: ControlAddr, + hostname: Option, + attributes: HashMap, + svc: S, + legacy_metrics: metrics::Registry, +) -> EnabledCollector +where + S: GrpcService + Clone + Send + 'static, + S::Error: Into, + S::Future: Send, + S::ResponseBody: Default + HttpBody + Send + 'static, + ::Error: Into + Send, +{ + let (span_sink, spans_rx) = mpsc::channel(SPAN_BUFFER_CAPACITY); + let spans_rx = ReceiverStream::new(spans_rx); + + let mut resources = ResourceAttributesWithSchema::default(); + + resources + .attributes + .0 + .push(SERVICE_NAME.with_key("service.name")); + resources + .attributes + .0 + .push((std::process::id() as i64).with_key("process.pid")); + + resources.attributes.0.push( + SystemTime::now() + .duration_since(UNIX_EPOCH) + .map(|d| d.as_secs() as i64) + .unwrap_or_else(|e| -(e.duration().as_secs() as i64)) + .with_key("process.start_timestamp"), + ); + resources + .attributes + .0 + .push(hostname.unwrap_or_default().with_key("host.name")); + + resources.attributes.0.extend( + attributes + .into_iter() + .map(|(key, value)| value.with_key(&key)), + ); + + let addr = addr.clone(); + let task = Box::pin( + opentelemetry::export_spans(svc, spans_rx, resources, legacy_metrics) + .instrument(tracing::debug_span!("opentelemetry", peer.addr = %addr).or_current()), + ); + + EnabledCollector { + addr, + task, + span_sink: linkerd_app_core::http_tracing::SpanSink::Otel(span_sink), + } +} + +trait IntoAnyValue +where + Self: Sized, +{ + fn into_any_value(self) -> AnyValue; + + fn with_key(self, key: &str) -> KeyValue { + KeyValue { + key: key.to_string(), + value: Some(self.into_any_value()), + } + } +} + +impl IntoAnyValue for String { + fn into_any_value(self) -> AnyValue { + AnyValue { + value: Some(any_value::Value::StringValue(self)), + } + } +} + +impl IntoAnyValue for &str { + fn into_any_value(self) -> AnyValue { + AnyValue { + value: Some(any_value::Value::StringValue(self.to_string())), + } + } +} + +impl IntoAnyValue for i64 { + fn into_any_value(self) -> AnyValue { + AnyValue { + value: Some(any_value::Value::IntValue(self)), + } + } +} diff --git a/linkerd/opentelemetry/src/lib.rs b/linkerd/opentelemetry/src/lib.rs index 2f983cade7..cb254fb216 100644 --- a/linkerd/opentelemetry/src/lib.rs +++ b/linkerd/opentelemetry/src/lib.rs @@ -7,12 +7,14 @@ use futures::stream::{Stream, StreamExt}; use http_body::Body as HttpBody; use linkerd_error::Error; use metrics::Registry; +pub use opentelemetry as otel; pub use opentelemetry_proto as proto; use opentelemetry_proto::proto::collector::trace::v1::trace_service_client::TraceServiceClient; use opentelemetry_proto::proto::collector::trace::v1::ExportTraceServiceRequest; use opentelemetry_proto::proto::trace::v1::ResourceSpans; use opentelemetry_proto::transform::common::ResourceAttributesWithSchema; use opentelemetry_proto::transform::trace::group_spans_by_resource_and_scope; +pub use opentelemetry_sdk as sdk; pub use opentelemetry_sdk::export::trace::SpanData; use tokio::{sync::mpsc, time}; use tonic::{self as grpc, body::BoxBody, client::GrpcService}; diff --git a/linkerd2-proxy/src/main.rs b/linkerd2-proxy/src/main.rs index 61c54102df..f198d135e2 100644 --- a/linkerd2-proxy/src/main.rs +++ b/linkerd2-proxy/src/main.rs @@ -108,14 +108,11 @@ fn main() { ), } - if let Some(oc) = app.opencensus_addr() { - match oc.identity.value() { - None => info!("OpenCensus tracing collector at {}", oc.addr), + if let Some(tracing) = app.tracing_addr() { + match tracing.identity.value() { + None => info!("Tracing collector at {}", tracing.addr), Some(tls) => { - info!( - "OpenCensus tracing collector at {} ({})", - oc.addr, tls.server_id - ) + info!("Tracing collector at {} ({})", tracing.addr, tls.server_id) } } } diff --git a/tools/src/bin/gen-protos.rs b/tools/src/bin/gen-protos.rs index 4e85f3bffe..4668797bd5 100644 --- a/tools/src/bin/gen-protos.rs +++ b/tools/src/bin/gen-protos.rs @@ -1,18 +1,17 @@ fn main() { let opencensus_dir = { let manifest_dir = std::path::PathBuf::from(std::env!("CARGO_MANIFEST_DIR")); - manifest_dir.parent().unwrap().join("opencensus-proto") + manifest_dir.parent().unwrap().join("opentelemetry-proto") }; let out_dir = opencensus_dir.join("src").join("gen"); let iface_files = { - let proto_dir = opencensus_dir.join("opencensus").join("proto"); + let proto_dir = opencensus_dir.join("opentelemetry").join("proto"); &[ - proto_dir.join("agent/common/v1/common.proto"), - proto_dir.join("agent/trace/v1/trace_service.proto"), + proto_dir.join("collector/trace/v1/trace_service.proto"), + proto_dir.join("common/v1/common.proto"), proto_dir.join("resource/v1/resource.proto"), - proto_dir.join("trace/v1/trace_config.proto"), proto_dir.join("trace/v1/trace.proto"), ] };