Skip to content

Commit

Permalink
Garbage collection of uncaptured traces after 30 minutes
Browse files Browse the repository at this point in the history
This commit cleans up traces that have not been captured within 30 minutes of starting. It does so by spawning a tokio task that sleeps until the timeout elapses and then removes the trace, logging a warning if the cleanup was actually necessary.

Fixes https://github.com/prisma/client-planning/issues/186
  • Loading branch information
miguelff committed Dec 18, 2022
1 parent a5bd7fc commit 0f4ba10
Showing 1 changed file with 57 additions and 6 deletions.
63 changes: 57 additions & 6 deletions query-engine/query-engine/src/capture_exporter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,9 @@ pub struct ConfiguredCapturer {

impl ConfiguredCapturer {
/// Starts capturing spans for this capturer's trace id by delegating to the
/// underlying capturer; the added call passes `CaptureTimeout::Default`.
pub async fn start_capturing(&self) {
self.capturer.start_capturing(self.trace_id).await
self.capturer
.start_capturing(self.trace_id, CaptureTimeout::Default)
.await
}

pub async fn fetch_captures(&self) -> Vec<UserFacingSpan> {
Expand Down Expand Up @@ -93,22 +95,45 @@ impl PipelineBuilder {
/// later retrieval
#[derive(Debug, Clone)]
pub struct CaptureExporter {
traces: Arc<Mutex<HashMap<TraceId, Vec<SpanData>>>>,
pub(crate) traces: Arc<Mutex<HashMap<TraceId, Vec<SpanData>>>>,
}

/// How long a started capture may stay unfetched before it is discarded.
pub(crate) enum CaptureTimeout {
/// Discard the capture after the given, caller-chosen duration.
// In the visible code this variant is only constructed by the tests, which is
// presumably why the dead-code lint is silenced here.
#[allow(dead_code)]
Duration(Duration),
/// Discard the capture after `CaptureExporter::DEFAULT_TIMEOUT`.
Default,
}

impl CaptureExporter {
const DEFAULT_TIMEOUT: Duration = Duration::from_secs(1800);

pub fn new() -> Self {
Self {
traces: Default::default(),
}
}

pub async fn start_capturing(&self, trace_id: TraceId) {
let mut traces = self.traces.lock().await;
traces.insert(trace_id, Vec::new());
/// Registers `trace_id` for capturing and schedules its garbage collection.
///
/// An empty span list is inserted for `trace_id`, replacing any entry already
/// stored under that id. A detached tokio task is then spawned that sleeps for
/// the requested timeout (`Self::DEFAULT_TIMEOUT` for `CaptureTimeout::Default`)
/// and removes the entry if it is still present, logging a warning when it does.
pub(crate) async fn start_capturing(&self, trace_id: TraceId, timeout: CaptureTimeout) {
    // Keep the guard in its own scope so the lock is released before the
    // cleanup task is spawned.
    {
        let mut locked_traces = self.traces.lock().await;
        locked_traces.insert(trace_id, Vec::new());
    }

    let when = match timeout {
        CaptureTimeout::Duration(d) => d,
        CaptureTimeout::Default => Self::DEFAULT_TIMEOUT,
    };

    let traces = self.traces.clone();
    // NOTE(review): the cleanup is keyed only by trace id. If the trace is
    // fetched and capturing is restarted for the same id before this timer
    // fires, the timer removes the newer entry as well — confirm this is acceptable.
    tokio::spawn(async move {
        tokio::time::sleep(when).await;
        let mut locked_traces = traces.lock().await;
        if locked_traces.remove(&trace_id).is_some() {
            // The entry was still there, so nobody fetched the captures in time.
            warn!("Timeout waiting for spans to be captured. trace_id={}", trace_id);
        }
    });
}

pub async fn fetch_captures(&self, trace_id: TraceId) -> Vec<UserFacingSpan> {
pub(crate) async fn fetch_captures(&self, trace_id: TraceId) -> Vec<UserFacingSpan> {
let mut traces = self.traces.lock().await;

let spans = if let Some(spans) = traces.remove(&trace_id) {
Expand Down Expand Up @@ -143,3 +168,29 @@ impl SpanExporter for CaptureExporter {
Ok(())
}
}

// tests for capture exporter
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;

    /// A capture that is never fetched must be removed once its timeout elapses.
    #[tokio::test]
    async fn test_garbage_collection() {
        let exporter = CaptureExporter::new();

        let trace_id = TraceId::from_hex("1").unwrap();
        let one_ms = Duration::from_millis(1);
        exporter
            .start_capturing(trace_id, CaptureTimeout::Duration(one_ms))
            .await;

        // Immediately after starting, the trace must be registered.
        {
            let traces = exporter.traces.lock().await;
            assert!(traces.contains_key(&trace_id));
        }

        // Poll with an upper bound instead of relying on a single fixed sleep:
        // on a loaded machine the cleanup task may run later than a few
        // milliseconds after its timeout, which would make a fixed wait flaky.
        let deadline = tokio::time::Instant::now() + Duration::from_secs(5);
        loop {
            tokio::time::sleep(one_ms).await;

            // The guard is a temporary, so the lock is released at the end of
            // this statement and the cleanup task can acquire it.
            let still_present = exporter.traces.lock().await.contains_key(&trace_id);
            if !still_present {
                break;
            }

            assert!(
                tokio::time::Instant::now() < deadline,
                "trace was not garbage collected within 5 seconds"
            );
        }
    }
}

0 comments on commit 0f4ba10

Please sign in to comment.