Skip to content

Commit

Permalink
[span processor] Add force_flush method. (#358)
Browse files Browse the repository at this point in the history
  • Loading branch information
TommyCpp and frigus02 authored Nov 11, 2020
1 parent 9c4f310 commit 68f041a
Show file tree
Hide file tree
Showing 2 changed files with 88 additions and 6 deletions.
59 changes: 53 additions & 6 deletions opentelemetry/src/sdk/trace/span_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,10 @@ pub trait SpanProcessor: Send + Sync + std::fmt::Debug {
/// already set). This method is called synchronously within the `Span::end`
/// API, therefore it should not block or throw an exception.
fn on_end(&self, span: SpanData);
/// Force the spans lying in the cache to be exported.
fn force_flush(&self);
/// Shuts down the processor. Called when SDK is shut down. This is an
/// opportunity for processor to do any cleanup required.
/// opportunity for processors to do any cleanup required.
fn shutdown(&mut self);
}

Expand Down Expand Up @@ -123,6 +125,10 @@ impl SpanProcessor for SimpleSpanProcessor {
}
}

fn force_flush(&self) {
// Ignored since all spans in Simple Processor will be exported as they ended.
}

fn shutdown(&mut self) {
if let Ok(mut exporter) = self.exporter.lock() {
exporter.shutdown();
Expand Down Expand Up @@ -196,6 +202,12 @@ impl SpanProcessor for BatchSpanProcessor {
}
}

fn force_flush(&self) {
if let Ok(mut sender) = self.message_sender.lock() {
let _ = sender.try_send(BatchMessage::Flush);
}
}

fn shutdown(&mut self) {
if let Ok(mut sender) = self.message_sender.lock() {
// Send shutdown message to worker future
Expand All @@ -212,7 +224,7 @@ impl SpanProcessor for BatchSpanProcessor {
#[derive(Debug)]
enum BatchMessage {
ExportSpan(SpanData),
Tick,
Flush,
Shutdown,
}

Expand All @@ -230,7 +242,7 @@ impl BatchSpanProcessor {
IS: Stream<Item = ISI> + Send + 'static,
{
let (message_sender, message_receiver) = mpsc::channel(config.max_queue_size);
let ticker = interval(config.scheduled_delay).map(|_| BatchMessage::Tick);
let ticker = interval(config.scheduled_delay).map(|_| BatchMessage::Flush);

// Spawn worker process via user-defined spawn function.
let worker_handle = spawn(Box::pin(async move {
Expand All @@ -245,8 +257,8 @@ impl BatchSpanProcessor {
spans.push(span);
}
}
// Span batch interval time reached, export current spans.
BatchMessage::Tick => {
// Span batch interval time reached or a force flush has been invoked, export current spans.
BatchMessage::Flush => {
while !spans.is_empty() {
let batch = spans.split_off(
spans.len().saturating_sub(config.max_export_batch_size),
Expand Down Expand Up @@ -450,8 +462,12 @@ mod tests {
OTEL_BSP_SCHEDULE_DELAY_MILLIS_DEFAULT,
};
use crate::exporter::trace::stdout;
use crate::testing::trace::{new_test_export_span_data, new_test_exporter};
use crate::sdk::trace::BatchConfig;
use crate::testing::trace::{
new_test_export_span_data, new_test_exporter, new_tokio_test_exporter,
};
use std::time;
use tokio::time::Duration;

#[test]
fn simple_span_processor_on_end_calls_export() {
Expand Down Expand Up @@ -500,4 +516,35 @@ mod tests {
assert_eq!(builder.config.max_export_batch_size, 120);
assert_eq!(builder.config.max_queue_size, 120);
}

#[tokio::test]
async fn test_batch_span_processor() {
let (exporter, mut export_receiver, _shutdown_receiver) = new_tokio_test_exporter();
let mut config = BatchConfig::default();
config.scheduled_delay = Duration::from_secs(60 * 60 * 24); // set the tick to 24 hours so we know the span must be exported via force_flush
let processor = BatchSpanProcessor::new(
Box::new(exporter),
tokio::spawn,
tokio::time::interval,
config,
);
let handle = tokio::spawn(async move {
loop {
if let Some(span) = export_receiver.recv().await {
assert_eq!(span.span_context, new_test_export_span_data().span_context);
break;
}
}
});
tokio::time::delay_for(Duration::from_secs(1)).await; // skip the first
processor.on_end(new_test_export_span_data());
processor.force_flush();

assert!(
tokio::time::timeout(Duration::from_secs(5), handle)
.await
.is_ok(),
"timed out in 5 seconds. force_flush may not export any data when called"
);
}
}
35 changes: 35 additions & 0 deletions opentelemetry/src/testing/trace.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use crate::exporter::trace::SpanData;
use crate::{
exporter::trace::{self as exporter, ExportResult, SpanExporter},
sdk::{
Expand Down Expand Up @@ -82,3 +83,37 @@ pub fn new_test_exporter() -> (TestSpanExporter, Receiver<exporter::SpanData>, R
};
(exporter, rx_export, rx_shutdown)
}

#[derive(Debug)]
pub struct TokioSpanExporter {
tx_export: tokio::sync::mpsc::UnboundedSender<exporter::SpanData>,
tx_shutdown: tokio::sync::mpsc::UnboundedSender<()>,
}

#[async_trait]
impl SpanExporter for TokioSpanExporter {
async fn export(&mut self, batch: Vec<SpanData>) -> ExportResult {
for span_data in batch {
self.tx_export.send(span_data)?;
}
Ok(())
}

fn shutdown(&mut self) {
self.tx_shutdown.send(()).unwrap();
}
}

pub fn new_tokio_test_exporter() -> (
TokioSpanExporter,
tokio::sync::mpsc::UnboundedReceiver<exporter::SpanData>,
tokio::sync::mpsc::UnboundedReceiver<()>,
) {
let (tx_export, rx_export) = tokio::sync::mpsc::unbounded_channel();
let (tx_shutdown, rx_shutdown) = tokio::sync::mpsc::unbounded_channel();
let exporter = TokioSpanExporter {
tx_export,
tx_shutdown,
};
(exporter, rx_export, rx_shutdown)
}

0 comments on commit 68f041a

Please sign in to comment.