Skip to content

Commit

Permalink
feat: add force_push_tracer_provider function. (#512)
Browse files Browse the repository at this point in the history
This helps users to force push all remaining spans within all span processors
  • Loading branch information
TommyCpp authored Apr 6, 2021
1 parent f7db050 commit 76b6ec9
Show file tree
Hide file tree
Showing 5 changed files with 109 additions and 4 deletions.
5 changes: 3 additions & 2 deletions opentelemetry/src/global/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,7 @@ pub use propagation::{get_text_map_propagator, set_text_map_propagator};
#[cfg(feature = "trace")]
#[cfg_attr(docsrs, doc(cfg(feature = "trace")))]
pub use trace::{
set_tracer_provider, shutdown_tracer_provider, tracer, tracer_provider, tracer_with_version,
BoxedSpan, BoxedTracer, GenericTracer, GenericTracerProvider, GlobalTracerProvider,
force_flush_tracer_provider, set_tracer_provider, shutdown_tracer_provider, tracer,
tracer_provider, tracer_with_version, BoxedSpan, BoxedTracer, GenericTracer,
GenericTracerProvider, GlobalTracerProvider,
};
37 changes: 36 additions & 1 deletion opentelemetry/src/global/trace.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use crate::trace::NoopTracerProvider;
use crate::global::handle_error;
use crate::trace::{NoopTracerProvider, TraceResult};
use crate::{trace, trace::TracerProvider, Context, KeyValue};
use std::borrow::Cow;
use std::fmt;
Expand Down Expand Up @@ -168,6 +169,9 @@ pub trait GenericTracerProvider: fmt::Debug + 'static {
name: &'static str,
version: Option<&'static str>,
) -> Box<dyn GenericTracer + Send + Sync>;

/// Force flush all remaining spans in span processors and return results.
fn force_flush(&self) -> Vec<TraceResult<()>>;
}

impl<S, T, P> GenericTracerProvider for P
Expand All @@ -184,6 +188,10 @@ where
) -> Box<dyn GenericTracer + Send + Sync> {
Box::new(self.get_tracer(name, version))
}

fn force_flush(&self) -> Vec<TraceResult<()>> {
self.force_flush()
}
}

/// Represents the globally configured [`TracerProvider`] instance for this
Expand Down Expand Up @@ -217,6 +225,11 @@ impl trace::TracerProvider for GlobalTracerProvider {
fn get_tracer(&self, name: &'static str, version: Option<&'static str>) -> Self::Tracer {
BoxedTracer(self.provider.get_tracer_boxed(name, version))
}

/// Force flush all remaining spans in span processors and return results.
fn force_flush(&self) -> Vec<TraceResult<()>> {
self.provider.force_flush()
}
}

lazy_static::lazy_static! {
Expand Down Expand Up @@ -291,6 +304,24 @@ pub fn shutdown_tracer_provider() {
);
}

/// Force flush all remaining spans in span processors.
///
/// Use the [`global::handle_error`] to handle errors happened during force flush.
///
/// [`global::handle_error`]: crate::global::handle_error
pub fn force_flush_tracer_provider() {
let tracer_provider = GLOBAL_TRACER_PROVIDER
.write()
.expect("GLOBAL_TRACER_PROVIDER RwLock poisoned");

let results = trace::TracerProvider::force_flush(&*tracer_provider);
for result in results {
if let Err(err) = result {
handle_error(err)
}
}
}

#[cfg(test)]
// Note that all tests here should be marked as ignore so that it won't be picked up by default We
// need to run those tests one by one as the GlobalTracerProvider is a shared object between
Expand Down Expand Up @@ -378,6 +409,10 @@ mod tests {
fn get_tracer(&self, _name: &'static str, _version: Option<&'static str>) -> Self::Tracer {
NoopTracer::default()
}

fn force_flush(&self) -> Vec<TraceResult<()>> {
Vec::new()
}
}

#[test]
Expand Down
60 changes: 60 additions & 0 deletions opentelemetry/src/sdk/trace/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
//! propagators) are provided by the `TracerProvider`. `Tracer` instances do
//! not duplicate this data to avoid that different `Tracer` instances
//! of the `TracerProvider` have different versions of these data.
use crate::trace::TraceResult;
use crate::{
global,
runtime::Runtime,
Expand Down Expand Up @@ -85,6 +86,14 @@ impl crate::trace::TracerProvider for TracerProvider {

sdk::trace::Tracer::new(instrumentation_lib, Arc::downgrade(&self.inner))
}

/// Force flush all remaining spans in span processors and return results.
fn force_flush(&self) -> Vec<TraceResult<()>> {
self.span_processors()
.iter()
.map(|processor| processor.force_flush())
.collect()
}
}

/// Builder for provider attributes.
Expand Down Expand Up @@ -146,3 +155,54 @@ impl Builder {
}
}
}

#[cfg(test)]
mod tests {
use crate::sdk::export::trace::SpanData;
use crate::sdk::trace::provider::TracerProviderInner;
use crate::sdk::trace::{Span, SpanProcessor};
use crate::trace::{TraceError, TraceResult, TracerProvider};
use crate::Context;
use std::sync::Arc;

#[derive(Debug)]
struct TestSpanProcessor {
success: bool,
}

impl SpanProcessor for TestSpanProcessor {
fn on_start(&self, _span: &Span, _cx: &Context) {
unimplemented!()
}

fn on_end(&self, _span: SpanData) {
unimplemented!()
}

fn force_flush(&self) -> TraceResult<()> {
if self.success {
Ok(())
} else {
Err(TraceError::from("cannot export"))
}
}

fn shutdown(&mut self) -> TraceResult<()> {
self.force_flush()
}
}

#[test]
fn test_force_flush() {
let tracer_provider = super::TracerProvider::new(Arc::from(TracerProviderInner {
processors: vec![
Box::from(TestSpanProcessor { success: true }),
Box::from(TestSpanProcessor { success: false }),
],
config: Default::default(),
}));

let results = tracer_provider.force_flush();
assert_eq!(results.len(), 2);
}
}
6 changes: 6 additions & 0 deletions opentelemetry/src/trace/noop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
//! This implementation is returned as the global tracer if no `Tracer`
//! has been set. It is also useful for testing purposes as it is intended
//! to have minimal resource utilization and runtime impact.
use crate::trace::TraceResult;
use crate::{
sdk::export::trace::{ExportResult, SpanData, SpanExporter},
trace,
Expand Down Expand Up @@ -32,6 +33,11 @@ impl trace::TracerProvider for NoopTracerProvider {
fn get_tracer(&self, _name: &'static str, _version: Option<&'static str>) -> Self::Tracer {
NoopTracer::new()
}

/// Return an empty `Vec` as there isn't any span processors in `NoopTracerProvider`
fn force_flush(&self) -> Vec<TraceResult<()>> {
Vec::new()
}
}

/// A no-op instance of a `Span`.
Expand Down
5 changes: 4 additions & 1 deletion opentelemetry/src/trace/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
//!
//! Implementations might require the user to specify configuration properties at
//! `TracerProvider` creation time, or rely on external configurations.
use crate::trace::Tracer;
use crate::trace::{TraceResult, Tracer};
use std::fmt;

/// An interface to create `Tracer` instances.
Expand All @@ -30,4 +30,7 @@ pub trait TracerProvider: fmt::Debug + 'static {
/// Creates a named tracer instance of `Self::Tracer`.
/// If the name is an empty string then provider uses default name.
fn get_tracer(&self, name: &'static str, version: Option<&'static str>) -> Self::Tracer;

/// Force flush all remaining spans in span processors and return results.
fn force_flush(&self) -> Vec<TraceResult<()>>;
}

0 comments on commit 76b6ec9

Please sign in to comment.