Skip to content

Commit

Permalink
Simplify capturer design
Browse files Browse the repository at this point in the history
  • Loading branch information
miguelff committed Jan 27, 2023
1 parent e3007c0 commit fe5e840
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 232 deletions.
251 changes: 20 additions & 231 deletions query-engine/core/src/telemetry/capturing/capturer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use opentelemetry::{
},
trace::{TraceId, TraceResult},
};
use std::{borrow::Cow, fmt};
use std::fmt;
use std::{collections::HashMap, sync::Arc, sync::Mutex};

/// Capturer determines, based on a set of settings and a trace id, how capturing is going to be handled.
Expand Down Expand Up @@ -124,14 +124,12 @@ impl SpanProcessor for Processor {
let mut locked_storage = self.storage.lock().unwrap();
if let Some(storage) = locked_storage.get_mut(&trace_id) {
let settings = storage.settings.clone();
let original_span_name = span_data.name.clone();

let (events, span) = models::TraceSpan::from(span_data).split_events();

let candidate_span = Candidate {
value: span,
settings: &settings,
original_span_name: Some(original_span_name),
};

let capture: Capture = candidate_span.into();
Expand All @@ -142,7 +140,6 @@ impl SpanProcessor for Processor {
let candidate_event = Candidate {
value: log,
settings: &settings,
original_span_name: None,
};
let capture: Capture = candidate_event.into();
capture.add_to(&mut storage.traces, &mut storage.logs);
Expand All @@ -161,95 +158,44 @@ impl SpanProcessor for Processor {
Ok(())
}
}

const VALID_QUERY_ATTRS: [&str; 3] = ["query", "params", "duration_ms"];
/// A Candidate represents either a span or an event that is being considered for capturing.
/// A Candidate can be converted into a [`Capture`].
#[derive(Debug, Clone)]
struct Candidate<'batch_iter, T: Clone + fmt::Debug> {
value: T,
settings: &'batch_iter Settings,
original_span_name: Option<Cow<'batch_iter, str>>,
}

impl Candidate<'_, models::TraceSpan> {
#[inline(always)]
fn is_loggable_quaint_query(&self) -> bool {
self.settings.included_log_levels.contains("query")
&& self.original_span_name.is_some()
&& matches!(self.original_span_name, Some(Cow::Borrowed("quaint:query")))
}

fn query_event(&self) -> models::Event {
let span = &self.value;

let duration_ms = ((span.end_time[0] as f64 - span.start_time[0] as f64) * 1_000.0)
+ ((span.end_time[1] as f64 - span.start_time[1] as f64) / 1_000_000.0);

let statement = if let Some(q) = span.attributes.get("db.statement") {
match q {
serde_json::Value::String(s) => s.to_string(),
_ => "unknown".to_string(),
}
} else {
"unknown".to_string()
};

let attributes = vec![(
"duration_ms".to_owned(),
serde_json::Value::Number(serde_json::Number::from_f64(duration_ms).unwrap()),
)]
.into_iter()
.collect();

models::Event {
span_id: Some(span.span_id.to_owned()),
name: statement,
level: "query".to_string(),
timestamp: span.start_time,
attributes,
}
}
}

impl Candidate<'_, models::LogEvent> {
#[inline(always)]
fn is_loggable_mongo_db_query(&self) -> bool {
self.settings.included_log_levels.contains("query") && {
fn is_loggable_query_event(&self) -> bool {
if self.settings.included_log_levels.contains("query") {
if let Some(target) = self.value.attributes.get("target") {
if let Some(val) = target.as_str() {
return val == "mongodb_query_connector::query";
return (val == "quaint::connector::metrics" && self.value.attributes.get("query").is_some())
|| val == "mongodb_query_connector::query";
}
}
false
}
false
}

#[inline(always)]
fn is_loggable_event(&self) -> bool {
self.settings.included_log_levels.contains(&self.value.level)
}

fn query_event(self) -> models::LogEvent {
let mut attributes = self.value.attributes;
let mut attrs = HashMap::new();
if let Some(dur) = attributes.get("duration_ms") {
attrs.insert("duration_ms".to_owned(), dur.clone());
}

let mut name = "uknown".to_owned();
if let Some(query) = attributes.remove("query") {
if let Some(str) = query.as_str() {
name = str.to_owned();
}
}
fn query_event(mut self) -> models::LogEvent {
self.value
.attributes
.retain(|key, _| (&VALID_QUERY_ATTRS).contains(&key.as_str()));

models::LogEvent {
name,
level: "query".to_string(),
attributes: attrs,
..self.value
}
}

#[inline(always)]
fn is_loggable_event(&self) -> bool {
self.settings.included_log_levels.contains(&self.value.level)
}
}

/// Capture provides mechanisms to transform a candidate into one of the enum variants.
Expand All @@ -259,7 +205,6 @@ impl Candidate<'_, models::LogEvent> {
enum Capture {
Span(models::TraceSpan),
LogEvent(models::LogEvent),
Multiple(Vec<Capture>),
Discarded,
}

Expand All @@ -274,11 +219,6 @@ impl Capture {
Capture::LogEvent(log) => {
logs.push(log);
}
Capture::Multiple(captures) => {
for capture in captures {
capture.add_to(traces, logs);
}
}
Capture::Discarded => {}
}
}
Expand All @@ -288,8 +228,7 @@ impl Capture {
/// be captured as-is if its log level is among the levels to capture, or be discarded.
impl From<Candidate<'_, models::Event>> for Capture {
fn from(candidate: Candidate<'_, models::Event>) -> Capture {
if candidate.is_loggable_mongo_db_query() {
// mongo events representing queries are transformed into a more meaningful log event
if candidate.is_loggable_query_event() {
Capture::LogEvent(candidate.query_event())
} else if candidate.is_loggable_event() {
Capture::LogEvent(candidate.value)
Expand All @@ -303,20 +242,10 @@ impl From<Candidate<'_, models::Event>> for Capture {
/// is enabled; captured as-is, if tracing is enabled; or be discarded.
impl From<Candidate<'_, models::TraceSpan>> for Capture {
fn from(candidate: Candidate<'_, models::TraceSpan>) -> Capture {
let mut captures = vec![];

if candidate.is_loggable_quaint_query() {
captures.push(Capture::LogEvent(candidate.query_event()));
}

if candidate.settings.traces_enabled() {
captures.push(Capture::Span(candidate.value));
}

match captures.len() {
0 => Capture::Discarded,
1 => captures.pop().unwrap(),
_ => Capture::Multiple(captures),
Capture::Span(candidate.value)
} else {
Capture::Discarded
}
}
}
Expand All @@ -327,146 +256,6 @@ mod tests {
use super::*;
use std::time::Duration;

#[test]
fn test_candidate_event_transformation() {
let event = models::LogEvent {
span_id: Some("00f067aa0ba902b7".to_owned()),
name: "foo bar".to_owned(),
level: "debug".to_owned(),
timestamp: [101, 0],
attributes: vec![
(
"target".to_owned(),
serde_json::Value::String("mongodb_query_connector::query".to_owned()),
),
(
"query".to_owned(),
serde_json::Value::String("db.Users.find()".to_owned()),
),
("duration_ms".to_owned(), serde_json::json!(100.0)),
]
.into_iter()
.collect(),
};

let only_query_log_events: Settings = "query".into();

let candidate = Candidate {
value: event.clone(),
settings: &only_query_log_events,
original_span_name: None,
};

let capture: Capture = candidate.into();
match capture {
Capture::LogEvent(event) => {
assert_eq!(event.level, "query");
assert_eq!(event.name.to_string().as_str(), "db.Users.find()");
assert_eq!(event.attributes.get("duration_ms").unwrap().to_string(), "100.0");
}
_ => unreachable!(),
};

let event = models::LogEvent {
attributes: vec![(
"target".to_owned(),
serde_json::Value::String("a different one".to_owned()),
)]
.into_iter()
.collect(),
..event
};
let candidate = Candidate {
value: event.clone(),
settings: &only_query_log_events,
original_span_name: None,
};

let capture: Capture = candidate.into();
match capture {
Capture::Discarded => {}
_ => unreachable!(),
}
}

#[test]
fn test_candidate_span_transformation() {
let trace_span = models::TraceSpan {
trace_id: "4bf92f3577b34da6a3ce929d0e0e4736".to_owned(),
span_id: "00f067aa0ba902b7".to_owned(),
parent_span_id: "00f067aa0ba902b5".to_owned(),
name: "prisma:engine:db_query".to_ascii_lowercase(),
start_time: [101, 0],
end_time: [101, 10000000],
attributes: vec![(
"db.statement".to_owned(),
serde_json::Value::String("SELECT 1".to_owned()),
)]
.into_iter()
.collect(),
events: Default::default(),
links: Default::default(),
};

// capturing query events
let only_query_log_events: Settings = "query".into();
let original_span_name = Some(Cow::Borrowed("quaint:query"));

let candidate = Candidate {
value: trace_span.clone(),
settings: &only_query_log_events,
original_span_name: original_span_name.clone(),
};

let capture: Capture = candidate.into();
match capture {
Capture::LogEvent(event) => {
assert_eq!(event.level, "query");
assert_eq!(event.name.to_string().as_str(), "SELECT 1");
assert_eq!(event.attributes.get("duration_ms").unwrap().to_string(), "10.0");
}
_ => unreachable!(),
};

// capturing query and tracing events
let query_logs_and_traces: Settings = "query, tracing".into();
let candidate = Candidate {
value: trace_span.clone(),
settings: &query_logs_and_traces,
original_span_name: original_span_name.clone(),
};

let capture: Capture = candidate.into();
match capture {
Capture::Multiple(captures) => {
match captures[0] {
Capture::LogEvent(_) => {}
_ => unreachable!(),
};

match captures[1] {
Capture::Span(_) => {}
_ => unreachable!(),
};
}
_ => unreachable!(),
};

// capturing nothing
let reject_all: Settings = "".into();
let candidate = Candidate {
value: trace_span.clone(),
settings: &reject_all,
original_span_name: original_span_name.clone(),
};

let capture: Capture = candidate.into();
match capture {
Capture::Discarded => {}
_ => unreachable!(),
};
}

#[tokio::test]
async fn test_garbage_collection() {
let exporter = Processor::new();
Expand Down
1 change: 0 additions & 1 deletion query-engine/core/src/telemetry/capturing/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,6 @@ pub fn install_capturing_layer(
let tracer = opentelemetry::trace::TracerProvider::tracer(&provider, "opentelemetry");
// set the provider as the global provider
global::set_tracer_provider(provider);

// create a layer that will filter initial events and spans based on the log level configuration
// from the environment and a specific filter to discard things that we are not interested in
// from a capturing perspective
Expand Down
1 change: 1 addition & 0 deletions query-engine/core/src/telemetry/models.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ impl From<SpanData> for TraceSpan {
#[derive(Serialize, Debug, Clone, PartialEq, Eq)]
pub struct Event {
pub(super) span_id: Option<String>,
#[serde(skip_serializing_if = "String::is_empty")]
pub(super) name: String,
pub(super) level: String,
pub(super) timestamp: HrTime,
Expand Down

0 comments on commit fe5e840

Please sign in to comment.