Skip to content

Commit

Permalink
Garbage collection of uncaptured traces after 30 minutes
Browse files Browse the repository at this point in the history
This commit cleans up traces that have not been captured within 30 minutes of starting. It does so by spawning a tokio task that sleeps until the timeout elapses and then performs the cleanup, logging a warning if a trace actually had to be removed.

Fixes https://github.com/prisma/client-planning/issues/186
  • Loading branch information
miguelff committed Dec 18, 2022
1 parent a5bd7fc commit 488c03a
Showing 1 changed file with 58 additions and 7 deletions.
65 changes: 58 additions & 7 deletions query-engine/query-engine/src/capture_exporter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,9 @@ pub struct ConfiguredCapturer {

impl ConfiguredCapturer {
pub async fn start_capturing(&self) {
self.capturer.start_capturing(self.trace_id).await
self.capturer
.start_capturing(self.trace_id, CaptureTimeout::Default)
.await
}

pub async fn fetch_captures(&self) -> Vec<UserFacingSpan> {
Expand Down Expand Up @@ -93,22 +95,45 @@ impl PipelineBuilder {
/// later retrieval
#[derive(Debug, Clone)]
pub struct CaptureExporter {
traces: Arc<Mutex<HashMap<TraceId, Vec<SpanData>>>>,
pub(crate) traces: Arc<Mutex<HashMap<TraceId, Vec<SpanData>>>>,
}

/// How long a started capture may stay unfetched before its trace entry is
/// garbage collected by the exporter.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum CaptureTimeout {
    /// Garbage collect the trace after the given, explicit duration.
    // In the code visible here this variant is only constructed by the test
    // module, so non-test builds would otherwise flag it as dead code.
    #[allow(dead_code)]
    Duration(Duration),
    /// Garbage collect the trace after the exporter's default timeout.
    Default,
}

impl CaptureExporter {
/// Timeout used when a capture is started with `CaptureTimeout::Default`:
/// 1800 seconds, i.e. 30 minutes.
const DEFAULT_TIMEOUT: Duration = Duration::from_secs(1800);

pub fn new() -> Self {
Self {
traces: Default::default(),
}
}

pub async fn start_capturing(&self, trace_id: TraceId) {
let mut traces = self.traces.lock().await;
traces.insert(trace_id, Vec::new());
pub(crate) async fn start_capturing(&self, trace_id: TraceId, timeout: CaptureTimeout) {
let mut locked_traces = self.traces.lock().await;
locked_traces.insert(trace_id, Vec::new());
drop(locked_traces);

let when = match timeout {
CaptureTimeout::Duration(d) => d,
CaptureTimeout::Default => Self::DEFAULT_TIMEOUT,
};

let traces = self.traces.clone();
tokio::spawn(async move {
tokio::time::sleep(when).await;
let mut locked_traces = traces.lock().await;
if let Some(_) = locked_traces.remove(&trace_id) {
warn!("Timeout waiting for spans to be captured. trace_id={}", trace_id)
}
});
}

pub async fn fetch_captures(&self, trace_id: TraceId) -> Vec<UserFacingSpan> {
pub(crate) async fn fetch_captures(&self, trace_id: TraceId) -> Vec<UserFacingSpan> {
let mut traces = self.traces.lock().await;

let spans = if let Some(spans) = traces.remove(&trace_id) {
Expand All @@ -135,11 +160,37 @@ impl SpanExporter for CaptureExporter {
for span in batch {
let trace_id = span.span_context.trace_id();

if let Some(spans) = traces.get_mut(&trace_id) {
if traces.get_mut(&trace_id).is_some() {
spans.push(span)
}
}

Ok(())
}
}

// Unit tests for the timeout-based cleanup performed by the capture exporter.
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;

    /// A trace that is started but never fetched must be gone from the
    /// exporter once its capture timeout has elapsed.
    #[tokio::test]
    async fn test_garbage_collection() {
        let exporter = CaptureExporter::new();

        let trace_id = TraceId::from_hex("1").unwrap();
        let one_ms = Duration::from_millis(1);
        exporter
            .start_capturing(trace_id, CaptureTimeout::Duration(one_ms))
            .await;

        // Immediately after starting, the trace is registered.
        {
            let registered = exporter.traces.lock().await;
            assert!(registered.contains_key(&trace_id));
        }

        // Wait well past the 1 ms timeout so the cleanup task has run.
        tokio::time::sleep(one_ms * 10).await;

        // The cleanup task must have removed the entry by now.
        {
            let remaining = exporter.traces.lock().await;
            assert!(!remaining.contains_key(&trace_id));
        }
    }
}

0 comments on commit 488c03a

Please sign in to comment.