From 76b6ec9b7c5aba17a52b0e529d7026222a4e389a Mon Sep 17 00:00:00 2001 From: Zhongyang Wu Date: Tue, 6 Apr 2021 00:54:18 -0400 Subject: [PATCH] feat: add force_push_tracer_provider function. (#512) This helps users to force push all remaining spans within all span processors --- opentelemetry/src/global/mod.rs | 5 ++- opentelemetry/src/global/trace.rs | 37 ++++++++++++++- opentelemetry/src/sdk/trace/provider.rs | 60 +++++++++++++++++++++++++ opentelemetry/src/trace/noop.rs | 6 +++ opentelemetry/src/trace/provider.rs | 5 ++- 5 files changed, 109 insertions(+), 4 deletions(-) diff --git a/opentelemetry/src/global/mod.rs b/opentelemetry/src/global/mod.rs index 3922ee96c3..bacf516397 100644 --- a/opentelemetry/src/global/mod.rs +++ b/opentelemetry/src/global/mod.rs @@ -156,6 +156,7 @@ pub use propagation::{get_text_map_propagator, set_text_map_propagator}; #[cfg(feature = "trace")] #[cfg_attr(docsrs, doc(cfg(feature = "trace")))] pub use trace::{ - set_tracer_provider, shutdown_tracer_provider, tracer, tracer_provider, tracer_with_version, - BoxedSpan, BoxedTracer, GenericTracer, GenericTracerProvider, GlobalTracerProvider, + force_flush_tracer_provider, set_tracer_provider, shutdown_tracer_provider, tracer, + tracer_provider, tracer_with_version, BoxedSpan, BoxedTracer, GenericTracer, + GenericTracerProvider, GlobalTracerProvider, }; diff --git a/opentelemetry/src/global/trace.rs b/opentelemetry/src/global/trace.rs index 98635c7cc9..7f3bae226d 100644 --- a/opentelemetry/src/global/trace.rs +++ b/opentelemetry/src/global/trace.rs @@ -1,4 +1,5 @@ -use crate::trace::NoopTracerProvider; +use crate::global::handle_error; +use crate::trace::{NoopTracerProvider, TraceResult}; use crate::{trace, trace::TracerProvider, Context, KeyValue}; use std::borrow::Cow; use std::fmt; @@ -168,6 +169,9 @@ pub trait GenericTracerProvider: fmt::Debug + 'static { name: &'static str, version: Option<&'static str>, ) -> Box; + + /// Force flush all remaining spans in span processors and return results. + fn force_flush(&self) -> Vec>; } impl GenericTracerProvider for P @@ -184,6 +188,10 @@ where ) -> Box { Box::new(self.get_tracer(name, version)) } + + fn force_flush(&self) -> Vec> { + self.force_flush() + } } /// Represents the globally configured [`TracerProvider`] instance for this @@ -217,6 +225,11 @@ impl trace::TracerProvider for GlobalTracerProvider { fn get_tracer(&self, name: &'static str, version: Option<&'static str>) -> Self::Tracer { BoxedTracer(self.provider.get_tracer_boxed(name, version)) } + + /// Force flush all remaining spans in span processors and return results. + fn force_flush(&self) -> Vec> { + self.provider.force_flush() + } } lazy_static::lazy_static! { @@ -291,6 +304,24 @@ pub fn shutdown_tracer_provider() { ); } +/// Force flush all remaining spans in span processors. +/// +/// Use the [`global::handle_error`] to handle errors happened during force flush. +/// +/// [`global::handle_error`]: crate::global::handle_error +pub fn force_flush_tracer_provider() { + let tracer_provider = GLOBAL_TRACER_PROVIDER + .write() + .expect("GLOBAL_TRACER_PROVIDER RwLock poisoned"); + + let results = trace::TracerProvider::force_flush(&*tracer_provider); + for result in results { + if let Err(err) = result { + handle_error(err) + } + } +} + #[cfg(test)] // Note that all tests here should be marked as ignore so that it won't be picked up by default We // need to run those tests one by one as the GlobalTracerProvider is a shared object between @@ -378,6 +409,10 @@ mod tests { fn get_tracer(&self, _name: &'static str, _version: Option<&'static str>) -> Self::Tracer { NoopTracer::default() } + + fn force_flush(&self) -> Vec> { + Vec::new() + } } #[test] diff --git a/opentelemetry/src/sdk/trace/provider.rs b/opentelemetry/src/sdk/trace/provider.rs index 60e4eb2bd3..0659cad23f 100644 --- a/opentelemetry/src/sdk/trace/provider.rs +++ b/opentelemetry/src/sdk/trace/provider.rs @@ -8,6 +8,7 @@ //! propagators) are provided by the `TracerProvider`. `Tracer` instances do //! not duplicate this data to avoid that different `Tracer` instances //! of the `TracerProvider` have different versions of these data. +use crate::trace::TraceResult; use crate::{ global, runtime::Runtime, @@ -85,6 +86,14 @@ impl crate::trace::TracerProvider for TracerProvider { sdk::trace::Tracer::new(instrumentation_lib, Arc::downgrade(&self.inner)) } + + /// Force flush all remaining spans in span processors and return results. + fn force_flush(&self) -> Vec> { + self.span_processors() + .iter() + .map(|processor| processor.force_flush()) + .collect() + } } /// Builder for provider attributes. @@ -146,3 +155,54 @@ impl Builder { } } } + +#[cfg(test)] +mod tests { + use crate::sdk::export::trace::SpanData; + use crate::sdk::trace::provider::TracerProviderInner; + use crate::sdk::trace::{Span, SpanProcessor}; + use crate::trace::{TraceError, TraceResult, TracerProvider}; + use crate::Context; + use std::sync::Arc; + + #[derive(Debug)] + struct TestSpanProcessor { + success: bool, + } + + impl SpanProcessor for TestSpanProcessor { + fn on_start(&self, _span: &Span, _cx: &Context) { + unimplemented!() + } + + fn on_end(&self, _span: SpanData) { + unimplemented!() + } + + fn force_flush(&self) -> TraceResult<()> { + if self.success { + Ok(()) + } else { + Err(TraceError::from("cannot export")) + } + } + + fn shutdown(&mut self) -> TraceResult<()> { + self.force_flush() + } + } + + #[test] + fn test_force_flush() { + let tracer_provider = super::TracerProvider::new(Arc::from(TracerProviderInner { + processors: vec![ + Box::from(TestSpanProcessor { success: true }), + Box::from(TestSpanProcessor { success: false }), + ], + config: Default::default(), + })); + + let results = tracer_provider.force_flush(); + assert_eq!(results.len(), 2); + } +} diff --git a/opentelemetry/src/trace/noop.rs b/opentelemetry/src/trace/noop.rs index 560386c7bb..267dcaad0d 100644 --- a/opentelemetry/src/trace/noop.rs +++ b/opentelemetry/src/trace/noop.rs @@ -3,6 +3,7 @@ //! This implementation is returned as the global tracer if no `Tracer` //! has been set. It is also useful for testing purposes as it is intended //! to have minimal resource utilization and runtime impact. +use crate::trace::TraceResult; use crate::{ sdk::export::trace::{ExportResult, SpanData, SpanExporter}, trace, @@ -32,6 +33,11 @@ impl trace::TracerProvider for NoopTracerProvider { fn get_tracer(&self, _name: &'static str, _version: Option<&'static str>) -> Self::Tracer { NoopTracer::new() } + + /// Return an empty `Vec` as there isn't any span processors in `NoopTracerProvider` + fn force_flush(&self) -> Vec> { + Vec::new() + } } /// A no-op instance of a `Span`. diff --git a/opentelemetry/src/trace/provider.rs b/opentelemetry/src/trace/provider.rs index 82d5991e29..ef357a5a5e 100644 --- a/opentelemetry/src/trace/provider.rs +++ b/opentelemetry/src/trace/provider.rs @@ -19,7 +19,7 @@ //! //! Implementations might require the user to specify configuration properties at //! `TracerProvider` creation time, or rely on external configurations. -use crate::trace::Tracer; +use crate::trace::{TraceResult, Tracer}; use std::fmt; /// An interface to create `Tracer` instances. @@ -30,4 +30,7 @@ pub trait TracerProvider: fmt::Debug + 'static { /// Creates a named tracer instance of `Self::Tracer`. /// If the name is an empty string then provider uses default name. fn get_tracer(&self, name: &'static str, version: Option<&'static str>) -> Self::Tracer; + + /// Force flush all remaining spans in span processors and return results. + fn force_flush(&self) -> Vec>; }