diff --git a/opentelemetry-sdk/src/trace/span_processor.rs b/opentelemetry-sdk/src/trace/span_processor.rs index 1723b263e9..edf6de73c3 100644 --- a/opentelemetry-sdk/src/trace/span_processor.rs +++ b/opentelemetry-sdk/src/trace/span_processor.rs @@ -92,45 +92,64 @@ pub trait SpanProcessor: Send + Sync + std::fmt::Debug { fn shutdown(&mut self) -> TraceResult<()>; } -/// A [`SpanProcessor`] that exports synchronously when spans are finished. -/// -/// # Examples -/// -/// Note that the simple processor exports synchronously every time a span is -/// ended. If you find this limiting, consider the batch processor instead. +/// A [`SpanProcessor`] that exports when spans are finished. #[derive(Debug)] pub struct SimpleSpanProcessor { - sender: crossbeam_channel::Sender>, - shutdown: crossbeam_channel::Receiver<()>, + message_sender: crossbeam_channel::Sender, } impl SimpleSpanProcessor { pub(crate) fn new(mut exporter: Box) -> Self { - let (span_tx, span_rx) = crossbeam_channel::unbounded(); - let (shutdown_tx, shutdown_rx) = crossbeam_channel::bounded(0); + let (message_sender, rx) = crossbeam_channel::unbounded(); let _ = thread::Builder::new() .name("opentelemetry-exporter".to_string()) .spawn(move || { - while let Ok(Some(span)) = span_rx.recv() { - if let Err(err) = futures_executor::block_on(exporter.export(vec![span])) { - global::handle_error(err); + while let Ok(msg) = rx.recv() { + match msg { + Message::ExportSpan(span) => { + if let Err(err) = + futures_executor::block_on(exporter.export(vec![span])) + { + global::handle_error(err); + } + } + Message::Flush(sender) => { + Self::respond(&sender, "sync"); + } + Message::Shutdown(sender) => { + exporter.shutdown(); + + Self::respond(&sender, "shutdown"); + + return; + } } } exporter.shutdown(); - - if let Err(err) = shutdown_tx.send(()) { - global::handle_error(TraceError::from(format!( - "could not send shutdown: {:?}", - err - ))); - } }); - SimpleSpanProcessor { - sender: span_tx, - shutdown: shutdown_rx, + Self { message_sender } + } + + fn signal(&self, msg: fn(crossbeam_channel::Sender<()>) -> Message, description: &str) { + let (tx, rx) = crossbeam_channel::bounded(0); + + if self.message_sender.send(msg(tx)).is_ok() { + if let Err(err) = rx.recv() { + global::handle_error(TraceError::from(format!( + "error {description} span processor: {err:?}" + ))); + } + } + } + + fn respond(sender: &crossbeam_channel::Sender<()>, description: &str) { + if let Err(err) = sender.send(()) { + global::handle_error(TraceError::from(format!( + "could not send {description}: {err:?}" + ))); } } } @@ -145,30 +164,31 @@ impl SpanProcessor for SimpleSpanProcessor { return; } - if let Err(err) = self.sender.send(Some(span)) { + if let Err(err) = self.message_sender.send(Message::ExportSpan(span)) { global::handle_error(TraceError::from(format!("error processing span {:?}", err))); } } fn force_flush(&self) -> TraceResult<()> { - // Ignored since all spans in Simple Processor will be exported as they ended. + self.signal(Message::Flush, "flushing"); + Ok(()) } fn shutdown(&mut self) -> TraceResult<()> { - if self.sender.send(None).is_ok() { - if let Err(err) = self.shutdown.recv() { - global::handle_error(TraceError::from(format!( - "error shutting down span processor: {:?}", - err - ))) - } - } + self.signal(Message::Shutdown, "shutting down"); Ok(()) } } +#[derive(Debug)] +enum Message { + ExportSpan(SpanData), + Flush(crossbeam_channel::Sender<()>), + Shutdown(crossbeam_channel::Sender<()>), +} + /// A [`SpanProcessor`] that asynchronously buffers finished spans and reports /// them at a preconfigured interval. ///