diff --git a/linkerd/opentelemetry/Cargo.toml b/linkerd/opentelemetry/Cargo.toml new file mode 100644 index 0000000000..78c0ae7f72 --- /dev/null +++ b/linkerd/opentelemetry/Cargo.toml @@ -0,0 +1,24 @@ +[package] +name = "linkerd-opentelemetry" +version = "0.1.0" +authors = ["Linkerd Developers "] +license = "Apache-2.0" +edition = "2021" +publish = false + +[dependencies] +futures = { version = "0.3", default-features = false } +http = "0.2" +http-body = "0.4" +linkerd-error = { path = "../error" } +linkerd-metrics = { path = "../metrics" } +opentelemetry = { version = "0.23", default-features = false, features = ["trace"] } +opentelemetry_sdk = { version = "0.23", default-features = false, features = ["trace"] } +opentelemetry-proto = { path = "../../opentelemetry-proto" } +tonic = { version = "0.10", default-features = false, features = [ + "prost", + "codegen", +] } +tokio = { version = "1", features = ["macros", "sync", "time"] } +tokio-stream = { version = "0.1", features = ["sync"] } +tracing = "0.1" diff --git a/linkerd/opentelemetry/src/lib.rs b/linkerd/opentelemetry/src/lib.rs new file mode 100644 index 0000000000..8d931207c9 --- /dev/null +++ b/linkerd/opentelemetry/src/lib.rs @@ -0,0 +1,212 @@ +#![deny(rust_2018_idioms, clippy::disallowed_methods, clippy::disallowed_types)] +#![forbid(unsafe_code)] + +pub mod metrics; + +use futures::stream::{Stream, StreamExt}; +use http_body::Body as HttpBody; +use linkerd_error::Error; +use metrics::Registry; +pub use opentelemetry_proto as proto; +use opentelemetry_proto::proto::collector::trace::v1::trace_service_client::TraceServiceClient; +use opentelemetry_proto::proto::collector::trace::v1::ExportTraceServiceRequest; +use opentelemetry_proto::proto::trace::v1::ResourceSpans; +use opentelemetry_proto::transform::common::ResourceAttributesWithSchema; +use opentelemetry_proto::transform::trace::group_spans_by_resource_and_scope; +pub use opentelemetry_sdk::export::trace::SpanData; +use tokio::{sync::mpsc, time}; +use tonic::{self as grpc, body::BoxBody, client::GrpcService}; +use tracing::{debug, trace}; + +pub async fn export_spans( + client: T, + spans: S, + resource: ResourceAttributesWithSchema, + metrics: Registry, +) where + T: GrpcService + Clone, + T::Error: Into, + T::ResponseBody: Default + HttpBody + Send + 'static, + ::Error: Into + Send, + S: Stream + Unpin, +{ + debug!("Span exporter running"); + SpanExporter::new(client, spans, resource, metrics) + .run() + .await +} + +/// SpanExporter sends a Stream of spans to the given TraceService gRPC service. +struct SpanExporter { + client: T, + spans: S, + resource: ResourceAttributesWithSchema, + metrics: Registry, +} + +#[derive(Debug)] +struct SpanRxClosed; + +// === impl SpanExporter === + +impl SpanExporter +where + T: GrpcService + Clone, + T::Error: Into, + T::ResponseBody: Default + HttpBody + Send + 'static, + ::Error: Into + Send, + S: Stream + Unpin, +{ + const MAX_BATCH_SIZE: usize = 1000; + const MAX_BATCH_IDLE: time::Duration = time::Duration::from_secs(10); + + fn new(client: T, spans: S, resource: ResourceAttributesWithSchema, metrics: Registry) -> Self { + Self { + client, + spans, + resource, + metrics, + } + } + + async fn run(self) { + let Self { + client, + mut spans, + resource, + mut metrics, + } = self; + + // Holds the batch of pending spans. Cleared as the spans are flushed. + // Contains no more than MAX_BATCH_SIZE spans. + let mut accum = Vec::new(); + + let mut svc = TraceServiceClient::new(client); + loop { + trace!("Establishing new TraceService::export request"); + metrics.start_stream(); + let (tx, mut rx) = mpsc::channel(1); + + let recv_future = async { + while let Some(req) = rx.recv().await { + match svc.export(grpc::Request::new(req)).await { + Ok(rsp) => { + let Some(partial_success) = rsp.into_inner().partial_success else { + continue; + }; + + if !partial_success.error_message.is_empty() { + debug!( + %partial_success.error_message, + rejected_spans = partial_success.rejected_spans, + "Response partially successful", + ); + } + } + Err(error) => { + debug!(%error, "Response future failed; restarting"); + } + } + } + }; + + // Drive both the response future and the export stream + // simultaneously. + tokio::select! { + _ = recv_future => {} + res = Self::export(&tx, &mut spans, &resource, &mut accum) => match res { + // The export stream closed; reconnect. + Ok(()) => {}, + // No more spans. + Err(SpanRxClosed) => return, + }, + } + } + } + + /// Accumulate spans and send them on the export stream. + /// + /// Returns an error when the proxy has closed the span stream. + async fn export( + tx: &mpsc::Sender, + spans: &mut S, + resource: &ResourceAttributesWithSchema, + accum: &mut Vec, + ) -> Result<(), SpanRxClosed> { + loop { + // Collect spans into a batch. + let collect = Self::collect_batch(spans, resource, accum).await; + + // If we collected spans, flush them. + if !accum.is_empty() { + // Once a batch has been accumulated, ensure that the + // request stream is ready to accept the batch. + match tx.reserve().await { + Ok(tx) => { + let msg = ExportTraceServiceRequest { + resource_spans: std::mem::take(accum), + }; + trace!(spans = msg.resource_spans.len(), "Sending batch"); + tx.send(msg); + } + Err(error) => { + // If the channel isn't open, start a new stream + // and retry sending the batch. + debug!(%error, "Request stream lost; restarting"); + return Ok(()); + } + } + } + + // If the span source was closed, end the task. + if let Err(closed) = collect { + debug!("Span channel lost"); + return Err(closed); + } + } + } + + /// Collects spans from the proxy into `accum`. + /// + /// Returns an error when the span sream has completed. An error may be + /// returned after accumulating spans. + async fn collect_batch( + span_stream: &mut S, + resource: &ResourceAttributesWithSchema, + output_accum: &mut Vec, + ) -> Result<(), SpanRxClosed> { + let mut input_accum = vec![]; + + let res = loop { + if input_accum.len() == Self::MAX_BATCH_SIZE { + trace!(capacity = Self::MAX_BATCH_SIZE, "Batch capacity reached"); + break Ok(()); + } + + tokio::select! { + biased; + + res = span_stream.next() => match res { + Some(span) => { + trace!(?span, "Adding to batch"); + input_accum.push(span); + } + None => break Err(SpanRxClosed), + }, + + // Don't hold spans indefinitely. Return if we hit an idle + // timeout and spans have been collected. + _ = time::sleep(Self::MAX_BATCH_IDLE) => { + if !input_accum.is_empty() { + trace!(spans = input_accum.len(), "Flushing spans due to inactivitiy"); + break Ok(()); + } + } + } + }; + + *output_accum = group_spans_by_resource_and_scope(input_accum, resource); + + res + } +} diff --git a/linkerd/opentelemetry/src/metrics.rs b/linkerd/opentelemetry/src/metrics.rs new file mode 100644 index 0000000000..5c785de9d4 --- /dev/null +++ b/linkerd/opentelemetry/src/metrics.rs @@ -0,0 +1,58 @@ +use linkerd_metrics::{metrics, Counter, FmtMetrics}; +use std::fmt; +use std::sync::Arc; + +metrics! { + opentelemetry_span_export_streams: Counter { "Total count of opened span export streams" }, + opentelemetry_span_export_requests: Counter { "Total count of span export request messages" }, + opentelemetry_span_exports: Counter { "Total count of spans exported" } +} + +#[derive(Debug)] +struct Metrics { + streams: Counter, + requests: Counter, + spans: Counter, +} + +#[derive(Clone, Debug)] +pub struct Registry(Arc); + +#[derive(Clone, Debug)] +pub struct Report(Arc); + +pub fn new() -> (Registry, Report) { + let metrics = Metrics { + streams: Counter::default(), + requests: Counter::default(), + spans: Counter::default(), + }; + let shared = Arc::new(metrics); + (Registry(shared.clone()), Report(shared)) +} + +impl Registry { + pub fn start_stream(&mut self) { + self.0.streams.incr() + } + + pub fn send(&mut self, spans: u64) { + self.0.requests.incr(); + self.0.spans.add(spans); + } +} + +impl FmtMetrics for Report { + fn fmt_metrics(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + opentelemetry_span_export_streams.fmt_help(f)?; + opentelemetry_span_export_streams.fmt_metric(f, &self.0.streams)?; + + opentelemetry_span_export_requests.fmt_help(f)?; + opentelemetry_span_export_requests.fmt_metric(f, &self.0.requests)?; + + opentelemetry_span_exports.fmt_help(f)?; + opentelemetry_span_exports.fmt_metric(f, &self.0.spans)?; + + Ok(()) + } +}