diff --git a/query-engine/query-engine/src/capture_exporter.rs b/query-engine/query-engine/src/capture_exporter.rs index 7346de0036f4..a62940379379 100644 --- a/query-engine/query-engine/src/capture_exporter.rs +++ b/query-engine/query-engine/src/capture_exporter.rs @@ -35,7 +35,9 @@ pub struct ConfiguredCapturer { impl ConfiguredCapturer { pub async fn start_capturing(&self) { - self.capturer.start_capturing(self.trace_id).await + self.capturer + .start_capturing(self.trace_id, CaptureTimeout::Default) + .await } pub async fn fetch_captures(&self) -> Vec { @@ -93,22 +95,45 @@ impl PipelineBuilder { /// later retrieval #[derive(Debug, Clone)] pub struct CaptureExporter { - traces: Arc>>>, + pub(crate) traces: Arc>>>, +} + +pub(crate) enum CaptureTimeout { + #[allow(dead_code)] + Duration(Duration), + Default, } impl CaptureExporter { + const DEFAULT_TIMEOUT: Duration = Duration::from_secs(1800); + pub fn new() -> Self { Self { traces: Default::default(), } } - pub async fn start_capturing(&self, trace_id: TraceId) { - let mut traces = self.traces.lock().await; - traces.insert(trace_id, Vec::new()); + pub(crate) async fn start_capturing(&self, trace_id: TraceId, timeout: CaptureTimeout) { + let mut locked_traces = self.traces.lock().await; + locked_traces.insert(trace_id, Vec::new()); + drop(locked_traces); + + let when = match timeout { + CaptureTimeout::Duration(d) => d, + CaptureTimeout::Default => Self::DEFAULT_TIMEOUT, + }; + + let traces = self.traces.clone(); + tokio::spawn(async move { + tokio::time::sleep(when).await; + let mut locked_traces = traces.lock().await; + if let Some(_) = locked_traces.remove(&trace_id) { + warn!("Timeout waiting for spans to be captured. trace_id={}", trace_id) + } + }); } - pub async fn fetch_captures(&self, trace_id: TraceId) -> Vec { + pub(crate) async fn fetch_captures(&self, trace_id: TraceId) -> Vec { let mut traces = self.traces.lock().await; let spans = if let Some(spans) = traces.remove(&trace_id) { @@ -135,7 +160,7 @@ impl SpanExporter for CaptureExporter { for span in batch { let trace_id = span.span_context.trace_id(); - if let Some(spans) = traces.get_mut(&trace_id) { + if traces.get_mut(&trace_id).is_some() { spans.push(span) } } @@ -143,3 +168,29 @@ impl SpanExporter for CaptureExporter { Ok(()) } } + +// tests for capture exporter +#[cfg(test)] +mod tests { + use super::*; + use std::time::Duration; + + #[tokio::test] + async fn test_garbage_collection() { + let exporter = CaptureExporter::new(); + + let trace_id = TraceId::from_hex("1").unwrap(); + let one_ms = Duration::from_millis(1); + exporter + .start_capturing(trace_id, CaptureTimeout::Duration(one_ms)) + .await; + let traces = exporter.traces.lock().await; + assert!(traces.get(&trace_id).is_some()); + drop(traces); + + tokio::time::sleep(10 * one_ms).await; + + let traces = exporter.traces.lock().await; + assert!(traces.get(&trace_id).is_none()); + } +}