From 362ac71a5c639cb3fa6dd0940e8af6bcb1d3c331 Mon Sep 17 00:00:00 2001 From: Miguel Fernandez Date: Fri, 20 Jan 2023 11:48:59 +0100 Subject: [PATCH 01/12] Implement log/tracing capturing According to https://www.notion.so/prismaio/Data-Proxy-Logs-Scale-Team-Spec-184242eb7271476d82113c59c761bed9#4ec77c2aa9004015ac2d392c7458dfb5 --- Cargo.lock | 1 + Makefile | 2 +- .../query-tests-setup/src/runner/binary.rs | 8 +- query-engine/core/Cargo.toml | 6 +- .../src/interactive_transactions/error.rs | 29 ++ .../core/src/interactive_transactions/mod.rs | 86 ++- query-engine/core/src/lib.rs | 2 +- .../core/src/telemetry/capturing/capturer.rs | 490 ++++++++++++++++++ .../core/src/telemetry/capturing/mod.rs | 160 ++++++ .../core/src/telemetry/capturing/settings.rs | 88 ++++ .../core/src/telemetry/capturing/storage.rs | 19 + query-engine/core/src/telemetry/helpers.rs | 9 + query-engine/core/src/telemetry/mod.rs | 1 + query-engine/query-engine/src/logger.rs | 26 +- query-engine/query-engine/src/opt.rs | 4 + query-engine/query-engine/src/server/mod.rs | 210 +++++++- query-engine/query-engine/src/state.rs | 6 +- query-engine/query-engine/src/tests/dmmf.rs | 1 + .../request-handlers/src/graphql/response.rs | 14 + query-engine/request-handlers/src/lib.rs | 9 + 20 files changed, 1135 insertions(+), 36 deletions(-) create mode 100644 query-engine/core/src/telemetry/capturing/capturer.rs create mode 100644 query-engine/core/src/telemetry/capturing/mod.rs create mode 100644 query-engine/core/src/telemetry/capturing/settings.rs create mode 100644 query-engine/core/src/telemetry/capturing/storage.rs diff --git a/Cargo.lock b/Cargo.lock index d1de3b9ee128..0f66d9d155e9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2584,6 +2584,7 @@ dependencies = [ "percent-encoding", "pin-project", "rand 0.8.5", + "serde", "thiserror", "tokio", "tokio-stream", diff --git a/Makefile b/Makefile index c0f170ca940a..2c0ea71694b4 100644 --- a/Makefile +++ b/Makefile @@ -214,7 +214,7 @@ validate: cargo run --bin test-cli -- validate-datamodel dev_datamodel.prisma qe: - cargo run --bin query-engine -- --enable-playground --enable-raw-queries --enable-metrics --enable-open-telemetry + cargo run --bin query-engine -- --enable-playground --enable-raw-queries --enable-metrics --enable-open-telemetry --enable-telemetry-in-response qe-dmmf: cargo run --bin query-engine -- cli dmmf > dmmf.json diff --git a/query-engine/connector-test-kit-rs/query-tests-setup/src/runner/binary.rs b/query-engine/connector-test-kit-rs/query-tests-setup/src/runner/binary.rs index c11f08e5a0b9..1facd8f8527f 100644 --- a/query-engine/connector-test-kit-rs/query-tests-setup/src/runner/binary.rs +++ b/query-engine/connector-test-kit-rs/query-tests-setup/src/runner/binary.rs @@ -20,7 +20,13 @@ pub struct BinaryRunner { #[async_trait::async_trait] impl RunnerInterface for BinaryRunner { async fn load(datamodel: String, connector_tag: ConnectorTag, metrics: MetricRegistry) -> TestResult { - let opts = PrismaOpt::from_list(&["binary", "--enable-raw-queries", "--datamodel", &datamodel]); + let opts = PrismaOpt::from_list(&[ + "binary", + "--enable-raw-queries", + "--enable-telemetry-in-response", + "--datamodel", + &datamodel, + ]); let state = setup(&opts, false, Some(metrics)).await.unwrap(); let configuration = opts.configuration(true).unwrap(); diff --git a/query-engine/core/Cargo.toml b/query-engine/core/Cargo.toml index 51fccf4103b3..3bd082060f3c 100644 --- a/query-engine/core/Cargo.toml +++ b/query-engine/core/Cargo.toml @@ -22,11 +22,11 @@ futures = "0.3" indexmap = { 
version = "1.7", features = ["serde-1"] } itertools = "0.10" mongodb-connector = { path = "../connectors/mongodb-query-connector", package = "mongodb-query-connector", optional = true } -once_cell = "1.3" +once_cell = "1" petgraph = "0.4" prisma-models = { path = "../prisma-models", features = ["default_generators"] } prisma-value = { path = "../../libs/prisma-value" } -opentelemetry = { version = "0.17"} +opentelemetry = { version = "0.17.0", features = ["rt-tokio", "serialize"] } query-engine-metrics = {path = "../metrics"} serde.workspace = true serde_json.workspace = true @@ -35,7 +35,7 @@ thiserror = "1.0" tokio.workspace = true tracing = { version = "0.1", features = ["attributes"] } tracing-futures = "0.2" -tracing-subscriber = {version = "0.3", features = ["env-filter"]} +tracing-subscriber = { version = "0.3", features = ["env-filter"] } tracing-opentelemetry = "0.17.4" url = "2" user-facing-errors = { path = "../../libs/user-facing-errors" } diff --git a/query-engine/core/src/interactive_transactions/error.rs b/query-engine/core/src/interactive_transactions/error.rs index 146d69f103b5..8189e2ce7420 100644 --- a/query-engine/core/src/interactive_transactions/error.rs +++ b/query-engine/core/src/interactive_transactions/error.rs @@ -1,5 +1,10 @@ use thiserror::Error; +use crate::{ + response_ir::{Item, Map}, + CoreError, +}; + #[derive(Debug, Error, PartialEq)] pub enum TransactionError { #[error("Unable to start a transaction in the given time.")] @@ -17,3 +22,27 @@ pub enum TransactionError { #[error("Unexpected response: {reason}.")] Unknown { reason: String }, } + +#[derive(Debug, serde::Serialize, PartialEq)] +pub struct ExtendedTransactionUserFacingError { + #[serde(flatten)] + user_facing_error: user_facing_errors::Error, + + #[serde(skip_serializing_if = "indexmap::IndexMap::is_empty")] + extensions: Map, +} + +impl ExtendedTransactionUserFacingError { + pub fn set_extension(&mut self, key: String, val: serde_json::Value) { + self.extensions.entry(key).or_insert(Item::Json(val)); + } +} + +impl From for ExtendedTransactionUserFacingError { + fn from(error: CoreError) -> Self { + ExtendedTransactionUserFacingError { + user_facing_error: error.into(), + extensions: Default::default(), + } + } +} diff --git a/query-engine/core/src/interactive_transactions/mod.rs b/query-engine/core/src/interactive_transactions/mod.rs index def10bfc10ef..bdf11dc1e2b6 100644 --- a/query-engine/core/src/interactive_transactions/mod.rs +++ b/query-engine/core/src/interactive_transactions/mod.rs @@ -1,6 +1,6 @@ use crate::CoreError; use connector::{Connection, ConnectionLike, Transaction}; -use std::fmt::Display; +use std::{collections::HashMap, fmt::Display}; use tokio::{ task::JoinHandle, time::{Duration, Instant}, @@ -76,6 +76,63 @@ impl Display for TxId { } } +impl From for opentelemetry::Context { + // This is a bit of a hack, but it's the only way to have a default trace span for a whole + // transaction when no traceparent is propagated from the client. + // + // This is done so we can capture traces happening accross the different queries in a + // transaction. Otherwise, if a traceparent is not propagated from the client, each query in + // the transaction will run within a span that has already been generated at the begining of the + // transaction, and held active in the actor in charge of running the queries. Thus, making + // impossible to capture traces happening in the individual queries, as they won't be aware of + // the transaction they are part of. 
+ // + // By generating this "fake" traceparent based on the transaction id, we can have a common + // trace_id for all transaction operations. + fn from(id: TxId) -> Self { + let extractor: HashMap<String, String> = + HashMap::from_iter(vec![("traceparent".to_string(), id.as_traceparent())]); + opentelemetry::global::get_text_map_propagator(|propagator| propagator.extract(&extractor)) + } +} + +impl TxId { + pub fn as_traceparent(&self) -> String { + let trace_id = opentelemetry::trace::TraceId::from(self.clone()); + format!("00-{}-0000000000000001-01", trace_id) + } +} + +impl From<TxId> for opentelemetry::trace::TraceId { + // in order to convert a TxId (a 48-byte cuid) into a TraceId (16 bytes), we remove the first byte + // (always 'c') and get the next 16 bytes, which are random enough to be used as a trace id. + // this is a typical cuid: "c-lct0q6ma-0004-rb04-h6en1roa" + // + // - the first letter is always the same + // - the next 7-8 bytes are a timestamp. There's more entropy in the least significant bytes + // - the next 4 bytes are a counter since the server started + // - the next 4 bytes are a system fingerprint, invariant for the same server instance + // - the last 8 bytes are the least significant and totally random + // + // We want the most entropic slice of 16 bytes that's deterministically determined + fn from(id: TxId) -> Self { + let mut buffer = [0; 16]; + let tx_id_bytes = id.0.as_bytes(); + let len = tx_id_bytes.len(); + + // bytes [len-20 to len-12): least significant 4 bytes of the timestamp + 4-byte counter + for (i, source_idx) in (len - 20..len - 12).enumerate() { + buffer[i] = tx_id_bytes[source_idx]; + } + // bytes [len-8 to len): the random blocks + for (i, source_idx) in (len - 8..len).enumerate() { + buffer[i + 8] = tx_id_bytes[source_idx]; + } + + opentelemetry::trace::TraceId::from_bytes(buffer) + } +} + pub enum CachedTx { Open(OpenTx), Committed, @@ -152,3 +209,30 @@ pub enum ClosedTx { RolledBack, Expired { start_time: Instant, timeout: Duration }, } + +// tests for the TxId conversion traits above +#[cfg(test)] +mod test { + use super::*; + + #[test] + fn test_txid_into_traceid() { + let fixture = vec![ + ("clct0q6ma0000rb04768tiqbj", "71366d6130303030373638746971626a"), + // counter changed, trace id changed: + ("clct0q6ma0002rb04cpa6zkmx", "71366d6130303032637061367a6b6d78"), + // fingerprint changed, trace id did not change, as that chunk is ignored: + ("clct0q6ma00020000cpa6zkmx", "71366d6130303032637061367a6b6d78"), + // first 5 bytes changed, trace id did not change, as that chunk is ignored: + ("00000q6ma00020000cpa6zkmx", "71366d6130303032637061367a6b6d78"), + // 6th byte changed, trace id changed, as that chunk is part of the lsb of the timestamp + ("0000006ma00020000cpa6zkmx", "30366d6130303032637061367a6b6d78"), + ]; + + for (txid, expected_trace_id) in fixture { + let txid = TxId(txid.to_string()); + let trace_id: opentelemetry::trace::TraceId = txid.into(); + assert_eq!(trace_id.to_string(), expected_trace_id); + } + } +} diff --git a/query-engine/core/src/lib.rs index 3bf22ee2cfde..1645fa13e16f 100644 --- a/query-engine/core/src/lib.rs +++ b/query-engine/core/src/lib.rs @@ -16,7 +16,7 @@ pub mod telemetry; pub use self::{ error::{CoreError, FieldConversionError}, executor::{QueryExecutor, TransactionOptions}, - interactive_transactions::{TransactionError, TxId}, + interactive_transactions::{ExtendedTransactionUserFacingError, TransactionError, TxId}, query_document::*, telemetry::*, }; diff --git a/query-engine/core/src/telemetry/capturing/capturer.rs
b/query-engine/core/src/telemetry/capturing/capturer.rs new file mode 100644 index 000000000000..01dba750b50a --- /dev/null +++ b/query-engine/core/src/telemetry/capturing/capturer.rs @@ -0,0 +1,490 @@ +use super::{settings::Settings, storage::Storage}; +use crate::models; +use opentelemetry::{ + sdk::{ + export::trace::SpanData, + trace::{Span, SpanProcessor}, + }, + trace::{TraceId, TraceResult}, +}; +use std::{borrow::Cow, fmt}; +use std::{collections::HashMap, sync::Arc, sync::Mutex}; + +/// Capturer determines, based on a set of settings and a trace id, how capturing is going to be handled. +/// Generally, both the trace id and the settings will be derived from request headers. Thus, a new +/// value of this enum is created per request. +#[derive(Debug, Clone)] +pub enum Capturer { + Enabled(Inner), + Disabled, +} + +impl Capturer { + pub(super) fn new(processor: Processor, trace_id: TraceId, settings: Settings) -> Self { + if settings.is_enabled() { + return Self::Enabled(Inner { + processor, + trace_id, + settings, + }); + } + + Self::Disabled + } +} + +#[derive(Debug, Clone)] +pub struct Inner { + pub(super) processor: Processor, + pub(super) trace_id: TraceId, + pub(super) settings: Settings, +} + +impl Inner { + pub async fn start_capturing(&self) { + self.processor + .start_capturing(self.trace_id, self.settings.clone()) + .await + } + + pub async fn fetch_captures(&self) -> Option<Storage> { + self.processor.fetch_captures(self.trace_id).await + } } + +/// A [`SpanProcessor`] that captures and stores spans in memory in a synchronized dictionary for +/// later retrieval +#[derive(Debug, Clone)] +pub struct Processor { + pub(crate) storage: Arc<Mutex<HashMap<TraceId, Storage>>>, +} + +impl Processor { + pub fn new() -> Self { + Self { + storage: Default::default(), + } + } + + pub(self) async fn start_capturing(&self, trace_id: TraceId, settings: Settings) { + let mut locked_storage = self.storage.lock().unwrap(); + locked_storage.insert(trace_id, settings.clone().into()); + drop(locked_storage); + + let ttl = settings.ttl; + let storage = self.storage.clone(); + tokio::spawn(async move { + tokio::time::sleep(ttl).await; + let mut locked_traces = storage.lock().unwrap(); + if locked_traces.remove(&trace_id).is_some() { + warn!("Timeout waiting for telemetry to be captured. trace_id={}", trace_id) + } + }); + } + + pub(self) async fn fetch_captures(&self, trace_id: TraceId) -> Option<Storage> { + let mut traces = self.storage.lock().unwrap(); + + traces.remove(&trace_id) + } +} + +impl Default for Processor { + fn default() -> Self { + Self::new() + } +} + +impl SpanProcessor for Processor { + fn on_start(&self, _: &mut Span, _: &opentelemetry::Context) { + // no-op + } + + /// Exports a span containing zero or more events that might represent + /// logs in the prisma client logging categories (query, info, warn, error) + /// + /// There's an impedance mismatch between the client categories of logs and the server (standard) + /// hierarchical levels of logs (trace, debug, info, warn, error). + /// + /// The most prominent difference is the "query" type of events. In the client these model + /// database queries made by the engine through a connector. But ATM there's not a 1:1 mapping + /// between the client "query" level and one of the server levels. And depending on the database + /// (mongo / relational), the information to build this kind of log event is logged differently in + /// the server.
+ /// + /// In the case of relational databases --queried through sql_query_connector and eventually + /// through quaint, a trace span describes the query-- `TraceSpan::represents_query_event` + /// determines if a span represents a query event. + /// + /// In the case of mongo, an event represents the query, but it needs to be transformed before + /// capturing it. `Event::query_event` does that. + fn on_end(&self, span_data: SpanData) { + let trace_id = span_data.span_context.trace_id(); + + let mut locked_storage = self.storage.lock().unwrap(); + if let Some(storage) = locked_storage.get_mut(&trace_id) { + let settings = storage.settings.clone(); + let original_span_name = span_data.name.clone(); + + let (events, span) = models::TraceSpan::from(span_data).split_events(); + + let candidate_span = Candidate { + value: span, + settings: &settings, + original_span_name: Some(original_span_name), + }; + + let capture: Capture = candidate_span.into(); + capture.add_to(&mut storage.traces, &mut storage.logs); + + if storage.settings.logs_enabled() { + events.into_iter().for_each(|log| { + let candidate_event = Candidate { + value: log, + settings: &settings, + original_span_name: None, + }; + let capture: Capture = candidate_event.into(); + capture.add_to(&mut storage.traces, &mut storage.logs); + }); + } + } + } + + fn force_flush(&self) -> TraceResult<()> { + // no-op + Ok(()) + } + + fn shutdown(&mut self) -> TraceResult<()> { + // no-op + Ok(()) + } +} + +/// A Candidate represents either a span or an event that is being considered for capturing. +/// A Candidate can be converted into a [`Capture`]. +#[derive(Debug, Clone)] +struct Candidate<'batch_iter, T: Clone + fmt::Debug> { + value: T, + settings: &'batch_iter Settings, + original_span_name: Option<Cow<'static, str>>, +} + +impl Candidate<'_, models::TraceSpan> { + #[inline(always)] + fn is_loggable_quaint_query(&self) -> bool { + self.settings.included_log_levels.contains("query") + && self.original_span_name.is_some() + && matches!(self.original_span_name, Some(Cow::Borrowed("quaint:query"))) + } + + fn query_event(&self) -> models::Event { + let span = &self.value; + + let duration_ms = ((span.end_time[0] as f64 - span.start_time[0] as f64) * 1_000.0) + + ((span.end_time[1] as f64 - span.start_time[1] as f64) / 1_000_000.0); + + let statement = if let Some(q) = span.attributes.get("db.statement") { + match q { + serde_json::Value::String(s) => s.to_string(), + _ => "unknown".to_string(), + } + } else { + "unknown".to_string() + }; + + let attributes = vec![( + "duration_ms".to_owned(), + serde_json::Value::Number(serde_json::Number::from_f64(duration_ms).unwrap()), + )] + .into_iter() + .collect(); + + models::Event { + span_id: Some(span.span_id.to_owned()), + name: statement, + level: "query".to_string(), + timestamp: span.start_time, + attributes, + } + } +} + +impl Candidate<'_, models::LogEvent> { + #[inline(always)] + fn is_loggable_mongo_db_query(&self) -> bool { + self.settings.included_log_levels.contains("query") && { + if let Some(target) = self.value.attributes.get("target") { + if let Some(val) = target.as_str() { + return val == "mongodb_query_connector::query"; + } + } + false + } + } + + #[inline(always)] + fn is_loggable_event(&self) -> bool { + self.settings.included_log_levels.contains(&self.value.level) + } + + fn query_event(self) -> models::LogEvent { + let mut attributes = self.value.attributes; + let mut attrs = HashMap::new(); + if let Some(dur) = attributes.get("duration_ms") {
attrs.insert("duration_ms".to_owned(), dur.clone()); + } + + let mut name = "unknown".to_owned(); + if let Some(query) = attributes.remove("query") { + if let Some(str) = query.as_str() { + name = str.to_owned(); + } + } + + models::LogEvent { + name, + level: "query".to_string(), + attributes: attrs, + ..self.value + } + } +} + +/// Capture provides mechanisms to transform a candidate into one of the enum variants. +/// This is necessary because a candidate span might also be transformed into a log event +/// (for quaint queries), or log events need to be transformed to a slightly different format +/// (for mongo queries). In addition, some spans and events are discarded. +enum Capture { + Span(models::TraceSpan), + LogEvent(models::LogEvent), + Multiple(Vec<Capture>), + Discarded, +} + +impl Capture { + /// Add the capture to the traces and logs vectors. We pass the vectors in to allow for + /// a recursive implementation for the case of a candidate transforming into a Capture::Multiple + fn add_to(self, traces: &mut Vec<models::TraceSpan>, logs: &mut Vec<models::LogEvent>) { + match self { + Capture::Span(span) => { + traces.push(span); + } + Capture::LogEvent(log) => { + logs.push(log); + } + Capture::Multiple(captures) => { + for capture in captures { + capture.add_to(traces, logs); + } + } + Capture::Discarded => {} + } + } +} + +/// A Candidate Event can be transformed into either a slightly different LogEvent (for mongo queries), +/// be captured as-is if its log level is among the levels to capture, or be discarded. +impl From<Candidate<'_, models::Event>> for Capture { + fn from(candidate: Candidate<'_, models::Event>) -> Capture { + if candidate.is_loggable_mongo_db_query() { + // mongo events representing queries are transformed into a more meaningful log event + Capture::LogEvent(candidate.query_event()) + } else if candidate.is_loggable_event() { + Capture::LogEvent(candidate.value) + } else { + Capture::Discarded + } + } +} + +/// A Candidate TraceSpan can be transformed into a LogEvent (for quaint queries) if query logging +/// is enabled; captured as-is, if tracing is enabled; or be discarded.
+impl From<Candidate<'_, models::TraceSpan>> for Capture { + fn from(candidate: Candidate<'_, models::TraceSpan>) -> Capture { + let mut captures = vec![]; + + if candidate.is_loggable_quaint_query() { + captures.push(Capture::LogEvent(candidate.query_event())); + } + + if candidate.settings.traces_enabled() { + captures.push(Capture::Span(candidate.value)); + } + + match captures.len() { + 0 => Capture::Discarded, + 1 => captures.pop().unwrap(), + _ => Capture::Multiple(captures), + } + } +} + +/// tests for capture exporter +#[cfg(test)] +mod tests { + use super::*; + use std::time::Duration; + + #[test] + fn test_candidate_event_transformation() { + let event = models::LogEvent { + span_id: Some("00f067aa0ba902b7".to_owned()), + name: "foo bar".to_owned(), + level: "debug".to_owned(), + timestamp: [101, 0], + attributes: vec![ + ( + "target".to_owned(), + serde_json::Value::String("mongodb_query_connector::query".to_owned()), + ), + ( + "query".to_owned(), + serde_json::Value::String("db.Users.find()".to_owned()), + ), + ("duration_ms".to_owned(), serde_json::json!(100.0)), + ] + .into_iter() + .collect(), + }; + + let only_query_log_events: Settings = "query".into(); + + let candidate = Candidate { + value: event.clone(), + settings: &only_query_log_events, + original_span_name: None, + }; + + let capture: Capture = candidate.into(); + match capture { + Capture::LogEvent(event) => { + assert_eq!(event.level, "query"); + assert_eq!(event.name.to_string().as_str(), "db.Users.find()"); + assert_eq!(event.attributes.get("duration_ms").unwrap().to_string(), "100.0"); + } + _ => unreachable!(), + }; + + let event = models::LogEvent { + attributes: vec![( + "target".to_owned(), + serde_json::Value::String("a different one".to_owned()), + )] + .into_iter() + .collect(), + ..event + }; + let candidate = Candidate { + value: event.clone(), + settings: &only_query_log_events, + original_span_name: None, + }; + + let capture: Capture = candidate.into(); + match capture { + Capture::Discarded => {} + _ => unreachable!(), + } + } + + #[test] + fn test_candidate_span_transformation() { + let trace_span = models::TraceSpan { + trace_id: "4bf92f3577b34da6a3ce929d0e0e4736".to_owned(), + span_id: "00f067aa0ba902b7".to_owned(), + parent_span_id: "00f067aa0ba902b5".to_owned(), + name: "prisma:engine:db_query".to_ascii_lowercase(), + start_time: [101, 0], + end_time: [101, 10000000], + attributes: vec![( + "db.statement".to_owned(), + serde_json::Value::String("SELECT 1".to_owned()), + )] + .into_iter() + .collect(), + events: Default::default(), + links: Default::default(), + }; + + // capturing query events + let only_query_log_events: Settings = "query".into(); + let original_span_name = Some(Cow::Borrowed("quaint:query")); + + let candidate = Candidate { + value: trace_span.clone(), + settings: &only_query_log_events, + original_span_name: original_span_name.clone(), + }; + + let capture: Capture = candidate.into(); + match capture { + Capture::LogEvent(event) => { + assert_eq!(event.level, "query"); + assert_eq!(event.name.to_string().as_str(), "SELECT 1"); + assert_eq!(event.attributes.get("duration_ms").unwrap().to_string(), "10.0"); + } + _ => unreachable!(), + }; + + // capturing query and tracing events + let query_logs_and_traces: Settings = "query, tracing".into(); + let candidate = Candidate { + value: trace_span.clone(), + settings: &query_logs_and_traces, + original_span_name: original_span_name.clone(), + }; + + let capture: Capture = candidate.into(); + match capture { + Capture::Multiple(captures) => { + match captures[0]
{ + Capture::LogEvent(_) => {} + _ => unreachable!(), + }; + + match captures[1] { + Capture::Span(_) => {} + _ => unreachable!(), + }; + } + _ => unreachable!(), + }; + + // capturing nothing + let reject_all: Settings = "".into(); + let candidate = Candidate { + value: trace_span.clone(), + settings: &reject_all, + original_span_name: original_span_name.clone(), + }; + + let capture: Capture = candidate.into(); + match capture { + Capture::Discarded => {} + _ => unreachable!(), + }; + } + + #[tokio::test] + async fn test_garbage_collection() { + let exporter = Processor::new(); + + let trace_id = TraceId::from_hex("1").unwrap(); + let one_ms = Duration::from_millis(1); + + let mut settings = Settings::default(); + settings.ttl = one_ms; + + exporter.start_capturing(trace_id, settings).await; + let storage = exporter.storage.lock().unwrap(); + assert!(storage.get(&trace_id).is_some()); + drop(storage); + + tokio::time::sleep(10 * one_ms).await; + + let storage = exporter.storage.lock().unwrap(); + assert!(storage.get(&trace_id).is_none()); + } +} diff --git a/query-engine/core/src/telemetry/capturing/mod.rs new file mode 100644 index 000000000000..438e3eb5d3e5 --- /dev/null +++ b/query-engine/core/src/telemetry/capturing/mod.rs @@ -0,0 +1,160 @@ +//! Telemetry Capturing is the process of recording the logs and traces happening during a request +//! to the binary engine, and rendering them in the response. +//! +//! The interaction diagram below (sorry for the width!) shows the different roles at play during telemetry +//! capturing. A textual explanation follows it. For the sake of example a server environment +//! --the query-engine crate-- is assumed. +//! # ╔═══════════════════════╗ ╔═══════════════╗ +//! # ║<>║ ║ Storage ║ ╔═══════════════════╗ +//! # ┌───────────────────┐ ║ PROCESSOR ║ ║ ║ ║ TRACER ║ +//! # │ Server │ ╚═══════════╦═══════════╝ ╚═══════╦═══════╝ ╚═════════╦═════════╝ +//! # └─────────┬─────────┘ │ │ │ +//! # │ │ │ │ +//! # │ │ │ │ +//! # POST │ │ │ │ +//! # (body, headers)│ │ │ │ +//! # ──────────▶┌┴┐ │ │ │ +//! # ┌─┐ │ │new(headers)╔════════════╗ │ │ │ +//! # │1│ │ ├───────────▶║s: Settings ║ │ │ │ +//! # └─┘ │ │ ╚════════════╝ │ │ │ +//! # │ │ │ │ │ +//! # │ │ ╔═══════════════════╗ │ │ │ +//! # │ │ ║ Capturer::Enabled ║ │ │ │ ┌────────────┐ +//! # │ │ ╚═══════════════════╝ │ │ │ │<│ +//! # │ │ │ │ │ │ └──────┬─────┘ +//! # │ │ ┌─┐ new(trace_id, s) │ │ │ │ │ +//! # │ ├───┤2├───────────────────────▶│ │ │ │ │ +//! # │ │ └─┘ │ │ │ │ │ +//! # │ │ │ │ │ │ │ +//! # │ │ ┌─┐ start_capturing() │ start_capturing │ │ │ │ +//! # │ ├───┤3├───────────────────────▶│ (trace_id, s) │ │ │ │ +//! # │ │ └─┘ │ │ │ │ │ +//! # │ │ ├─────────────────────▶│ │ │ │ +//! # │ │ │ │ │ │ │ +//! # │ │ │ │ ┌─┐insert(trace_id, s) │ │ │ +//! # │ │ │ ├─┤4├────────────────────▶│ │ │ +//! # │ │ │ │ └─┘ │ │ │ +//! # │ │ │ │ │ ┌─┐ │ process_query │ +//! # │ ├──────────────────────────────┼──────────────────────┼─────────────────────────┼────────────┤5├──────┼──────────────────────────▶┌┴┐ +//! # │ │ │ │ │ └─┘ │ │ │ +//! # │ │ │ │ │ │ │ │ +//! # │ │ │ │ │ │ │ │ ┌─────────────────────┐ +//! # │ │ │ │ │ │ log! / span! ┌─┐ │ │ │ res: PrismaResponse │ +//! # │ │ │ │ │ │◀─────────────────────┤6├──│ │ └──────────┬──────────┘ +//! # │ │ │ │ on_end(span_data)│ ┌─┐ │ └─┘ │ │ new │ +//! # │ │ │ │◀────────────────────────┼────────────┤7├──────┤ │ │────────────▶│ +//! # │ │ │ │ │ └─┘ │ │ │ │ +//! # │ │ │ │ append(trace_id, logs, │ │ │ │ │ +//!
# │ │ │ │ ┌─┐ traces) │ │ │ │ │ +//! # │ │ │ ├──┤8├───────────────────▶│ │ │ │ │ +//! # │ │ │ │ └─┘ │ │ │ │ │ +//! # │ │ res: PrismaResponse │ ┌─┐ │ │ │ │ │ │ +//! # │ │◁ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┼ ─ ─ ┤9├ ─ ─ ─ ─ ─ ─ ─│─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─│─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─│─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─└┬┘ │ +//! # │ │ ┌────┐ fetch_captures() │ └─┘ │ │ │ │ │ +//! # │ ├─┤ 10 ├──────────────────────▶│ fetch_captures │ │ │ │ │ +//! # │ │ └────┘ │ (trace_id) │ │ │ │ │ +//! # │ │ ├─────────────────────▶│ │ │ x │ +//! # │ │ │ │ │ │ │ +//! # │ │ │ │┌────┐ get logs/traces │ │ │ +//! # │ │ │ ├┤ 11 ├───────────────────▶ │ │ +//! # │ │ │ │└────┘ │ │ │ +//! # │ │ │ │◁ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─│ │ │ +//! # │ │ │ │ │ │ │ +//! # │ │ ◁ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─│ │ │ │ +//! # │ │ logs, traces │ │ │ │ │ +//! # │ │◁─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─│ │ │ │ │ +//! # │ │ x ┌────┐ │ │ │ res.set_extension(logs) │ +//! # │ ├───────────────────────────────────────┤ 12 ├────────┼─────────────────────────┼─────────────────────┼──────────────────────────────────────────▶│ +//! # │ │ └────┘ │ │ │ res.set_extension(traces) │ +//! # │ ├─────────────────────────────────────────────────────┼─────────────────────────┼─────────────────────┼──────────────────────────────────────────▶│ +//! # ◀ ─ ─ ─└┬┘ │ │ │ x +//! # json!(res) │ +//! # ┌────┐ │ +//! # │ 13 │ │ +//! # └────┘ +//! # +//! # ◁─ ─ ─ ─ return +//! # +//! # ◁─────── call (pseudo-signatures) +//! # +//! +//! In the diagram, you will see objects whose lifetime is static. The boxes for those have a double +//! width margin. These are: +//! +//! - The `server` itself +//! - The global `TRACER`, which handles `log!` and `span!` and uses the global `PROCESSOR` to +//! process the data constituting a trace: `Span`s and log `Event`s +//! - The global `PROCESSOR`, which manages the `Storage` set of data structures, holding logs, +//! traces (and capture settings) per request. +//! +//! Then, through the request lifecycle, different objects are created and dropped: +//! +//! - When a request comes in, its headers are processed and a [`Settings`] object is built; this +//! object determines, for the request, how logging and tracing are going to be captured: if only +//! traces, logs, or both, and which log levels are going to be captured. +//! - Based on the settings, a new `Capturer` is created; a capturer is nothing but an exporter +//! wrapped to start capturing / fetch the captures for this particular request. +//! +//! Then the capturing process works in this way: +//! +//! - The server receives a query **[1]** +//! - It grabs the HTTP headers and builds a `Capturer` object **[2]**, which is configured with the settings +//! denoted by the `X-capture-telemetry` header +//! - Now the server tells the `Capturer` to start capturing all the logs and traces occurring on +//! the request **[3]** (denoted by a `trace_id`). The `trace_id` is either carried on the `traceparent` +//! header or implicitly created on the first span of the request. To _start capturing_ implies +//! creating entries for the `trace_id` in two different data structures: `logs` and `traces`; and storing +//! the settings for selecting the Spans and Events to capture **[4]**. +//! - The server dispatches the request, and _somewhere_ else in the code it is processed **[5]**. +//! - There the code logs events and emits traces asynchronously, as part of the processing **[6]** +//! - Traces and Logs arrive at the `TRACER`, and get hydrated in the `PROCESSOR` **[7]**, +//!
which writes them in the shard corresponding to the current `trace_id`, into the +//! `logs` and `traces` storage **[8]**. The settings previously stored under the `trace_id` +//! key are used to pick which events and spans are going to be captured based on their level. +//! - When the code that dispatches the request is done, it returns a `PrismaResponse` to the +//! server **[9]**. +//! - Then the server asks the `PROCESSOR` to fetch the captures **[10]** +//! - And right after that, it fetches the captures from the `Storage` **[11]**. At that time, although +//! that's not represented in the diagram, the captures are deleted from the storage, thus +//! freeing any memory used for capturing during the request +//! - Finally, the server sets the `logs` and `traces` extensions in the `PrismaResponse` **[12]**, +//! it serializes the extended response in json format and returns it as an HTTP Response +//! blob **[13]**. +//! +pub use self::capturer::Capturer; +pub use self::settings::Settings; + +use self::capturer::Processor; +use once_cell::sync::Lazy; +use opentelemetry::{global, sdk, trace}; + +static PROCESSOR: Lazy<Processor> = Lazy::new(Processor::default); +static TRACER: Lazy<sdk::trace::Tracer> = Lazy::new(setup_and_install_tracer_globally); + +/// Creates a new capturer, which is configured to export traces and log events happening during a +/// particular request +pub fn capturer(trace_id: trace::TraceId, settings: Settings) -> Capturer { + Capturer::new(PROCESSOR.to_owned(), trace_id, settings) +} + +/// Returns a reference to the global tracer used when capturing telemetry in the response +pub fn tracer() -> &'static sdk::trace::Tracer { + &TRACER +} + +/// Installs an opentelemetry tracer globally, which is configured to process +/// spans and export them to the global exporter. +fn setup_and_install_tracer_globally() -> sdk::trace::Tracer { + global::set_text_map_propagator(sdk::propagation::TraceContextPropagator::new()); + + let provider_builder = sdk::trace::TracerProvider::builder().with_span_processor(PROCESSOR.to_owned()); + let provider = provider_builder.build(); + let tracer = opentelemetry::trace::TracerProvider::tracer(&provider, "opentelemetry"); + + global::set_tracer_provider(provider); + tracer +} + +mod capturer; +mod settings; +pub mod storage; diff --git a/query-engine/core/src/telemetry/capturing/settings.rs new file mode 100644 index 000000000000..a039212b96d1 --- /dev/null +++ b/query-engine/core/src/telemetry/capturing/settings.rs @@ -0,0 +1,88 @@ +use std::{collections::HashSet, time}; + +static VALID_VALUES: &[&str] = &["error", "warn", "info", "debug", "trace", "query", "tracing"]; +static DEFAULT_TTL: time::Duration = time::Duration::from_secs(1800); // 30 minutes + +#[derive(Debug, Clone, Default)] +pub struct Settings { + // only capture log events from the specified log levels; the special level "query", which does not + // exist in the engine logging infrastructure, is shimmed from any event describing a query, regardless + // of its level. + pub(super) included_log_levels: HashSet<String>, + // whether to include trace spans when capturing + pub(super) include_traces: bool, + // how long to keep captured traces in memory; if by any chance the capturing breaks, a tokio task + // will clean up captured traces after this duration. Defaults to [`DEFAULT_TTL`].
+ pub(super) ttl: time::Duration, +} + +impl Settings { + pub(super) fn new(included_log_levels: HashSet<String>, include_traces: bool, ttl: time::Duration) -> Self { + Self { + include_traces, + included_log_levels, + ttl, + } + } +} + +// As the test below shows, settings can be constructed from a comma-separated string. +// Examples: valid: `"error, warn, query, tracing"`; invalid: `"foo, bar baz"`. The chunks +// passed in are trimmed and converted to lowercase. Chunks that don't correspond to one of +// the values in VALID_VALUES are ignored. +// +// The ttl is always the same (DEFAULT_TTL) but is there to allow for easier unit-testing of the +// capturing logic +impl From<&str> for Settings { + fn from(s: &str) -> Self { + let chunks = s.split(','); + let mut set = HashSet::from_iter( + chunks + .into_iter() + .map(str::trim) + .map(str::to_lowercase) + .filter(|s| VALID_VALUES.contains(&s.as_str())), + ); + + let include_traces = set.remove("tracing"); + let included_log_levels = set; + + Self::new(included_log_levels, include_traces, DEFAULT_TTL) + } +} + +impl Settings { + #[inline(always)] + pub fn is_enabled(&self) -> bool { + self.traces_enabled() || self.logs_enabled() + } + + #[inline(always)] + pub fn traces_enabled(&self) -> bool { + self.include_traces + } + + #[inline(always)] + pub fn logs_enabled(&self) -> bool { + !self.included_log_levels.is_empty() + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_options_from() { + let options = Settings::from("error, warn, query, tracing"); + assert_eq!(options.included_log_levels.len(), 3); + assert!(options.included_log_levels.contains("error")); + assert!(options.included_log_levels.contains("warn")); + assert!(options.included_log_levels.contains("query")); + assert!(options.include_traces); + assert!(options.is_enabled()); + + let options = Settings::from("foo, bar baz"); + assert!(!options.is_enabled()); + } +} diff --git a/query-engine/core/src/telemetry/capturing/storage.rs new file mode 100644 index 000000000000..9c2767169112 --- /dev/null +++ b/query-engine/core/src/telemetry/capturing/storage.rs @@ -0,0 +1,19 @@ +use super::settings::Settings; +use crate::telemetry::models; + +#[derive(Debug, Default)] +pub struct Storage { + pub traces: Vec<models::TraceSpan>, + pub logs: Vec<models::LogEvent>, + pub settings: Settings, +} + +impl From<Settings> for Storage { + fn from(settings: Settings) -> Self { + Self { + traces: Default::default(), + logs: Default::default(), + settings, + } + } +} diff --git a/query-engine/core/src/telemetry/helpers.rs index b62d0b72235f..b65484234e98 100644 --- a/query-engine/core/src/telemetry/helpers.rs +++ b/query-engine/core/src/telemetry/helpers.rs @@ -41,6 +41,15 @@ pub fn set_span_link_from_traceparent(span: &Span, traceparent: Option) } } +pub fn get_trace_id_from_traceparent(traceparent: Option<&str>) -> TraceId { + traceparent + .unwrap_or("0-0-0-0") + .split('-') + .nth(1) + .map(|id| TraceId::from_hex(id).unwrap_or(TraceId::INVALID)) + .unwrap() +} + pub fn get_trace_id_from_context(context: &Context) -> TraceId { let context_span = context.span(); context_span.span_context().trace_id() diff --git a/query-engine/core/src/telemetry/mod.rs index 29dd6738caf2..56aab06e731f 100644 --- a/query-engine/core/src/telemetry/mod.rs +++ b/query-engine/core/src/telemetry/mod.rs @@ -1,2 +1,3 @@ +pub mod capturing; pub mod helpers; pub mod models; diff --git
a/query-engine/query-engine/src/logger.rs b/query-engine/query-engine/src/logger.rs index 728b7c27a1a6..98176683a501 100644 --- a/query-engine/query-engine/src/logger.rs +++ b/query-engine/query-engine/src/logger.rs @@ -27,6 +27,9 @@ pub struct Logger { enum TracingConfig { // exposed means tracing will be exposed through an HTTP endpoint in a jaeger-compatible format Http(String), + // captured means that traces will be captured in memory and exposed in the graphql response; + // logs will also be exposed in the response when capturing is enabled + Captured, // stdout means that traces will be printed to standard output Stdout, // disabled means that tracing will be disabled @@ -59,16 +62,17 @@ impl Logger { self.metrics = Some(metrics); } - pub fn setup_telemetry(&mut self, enable_telemetry: bool, endpoint: &str) { + pub fn setup_telemetry(&mut self, enable_telemetry: bool, enable_capturing: bool, endpoint: &str) { let endpoint = if endpoint.is_empty() { None } else { Some(endpoint.to_owned()) }; - self.tracing_config = match (enable_telemetry, endpoint) { - (true, Some(endpoint)) => TracingConfig::Http(endpoint), - (true, None) => TracingConfig::Stdout, + self.tracing_config = match (enable_telemetry, enable_capturing, endpoint) { + (_, true, _) => TracingConfig::Captured, + (true, _, Some(endpoint)) => TracingConfig::Http(endpoint), + (true, _, None) => TracingConfig::Stdout, _ => TracingConfig::Disabled, }; } @@ -83,6 +87,7 @@ impl Logger { pub fn install(&self) -> LoggerResult<()> { let filter = telemetry::helpers::env_filter(self.log_queries, telemetry::helpers::QueryEngineLogLevel::FromEnv); let is_user_trace = filter_fn(telemetry::helpers::user_facing_span_only_filter); + let is_user_trace_or_event = filter_fn(telemetry::helpers::user_facing_filter); let fmt_layer = match self.log_format { LogFormat::Text => { @@ -100,6 +105,19 @@ impl Logger { .with(self.metrics.clone()); match self.tracing_config { + TracingConfig::Captured => { + // Capturing is enabled, it overrides otel exporting. + let tracer = telemetry::capturing::tracer().to_owned(); + let telemetry_layer = tracing_opentelemetry::layer() + .with_tracer(tracer) + .with_filter(is_user_trace_or_event) + .with_filter(telemetry::helpers::env_filter( + self.log_queries, + telemetry::helpers::QueryEngineLogLevel::FromEnv, + )); + let subscriber = subscriber.with(telemetry_layer); + subscriber::set_global_default(subscriber)?; + } TracingConfig::Http(ref endpoint) => { // Opentelemetry is enabled, but capturing is disabled, there's an endpoint to export // the traces to. diff --git a/query-engine/query-engine/src/opt.rs index 2e5ef81e7c61..09d782ff807e 100644 --- a/query-engine/query-engine/src/opt.rs +++ b/query-engine/query-engine/src/opt.rs @@ -103,6 +103,10 @@ pub struct PrismaOpt { #[structopt(long)] pub enable_open_telemetry: bool, + #[structopt(long)] + /// Enable the tracer to capture logs and traces and return them in the response + pub enable_telemetry_in_response: bool, + /// The url to the OpenTelemetry collector.
/// Enabling this will send the OpenTelemetry tracing to a collector /// and not via our custom stdout tracer diff --git a/query-engine/query-engine/src/server/mod.rs index 1f688e8c0d35..b4322f1d8800 100644 --- a/query-engine/query-engine/src/server/mod.rs +++ b/query-engine/query-engine/src/server/mod.rs @@ -2,9 +2,12 @@ use crate::state::State; use crate::{opt::PrismaOpt, PrismaResult}; use hyper::service::{make_service_fn, service_fn}; use hyper::{header::CONTENT_TYPE, Body, HeaderMap, Method, Request, Response, Server, StatusCode}; -use opentelemetry::global; -use opentelemetry::propagation::Extractor; -use query_core::{schema::QuerySchemaRenderer, TransactionOptions, TxId}; +use opentelemetry::trace::TraceContextExt; +use opentelemetry::{global, propagation::Extractor}; +use query_core::helpers::get_trace_id_from_traceparent; +use query_core::{ + schema::QuerySchemaRenderer, telemetry, ExtendedTransactionUserFacingError, TransactionOptions, TxId, +}; use query_engine_metrics::MetricFormat; use request_handlers::{dmmf, GraphQLSchemaRenderer, GraphQlHandler}; use serde_json::json; @@ -113,7 +116,8 @@ async fn graphql_handler(state: State, req: Request) -> Result) -> Result) -> Result { let handler = GraphQlHandler::new(&*state.cx.executor, state.cx.query_schema()); - let result = handler.handle(body, tx_id, traceparent).instrument(span).await; + let mut result = handler.handle(body, tx_id, traceparent).instrument(span).await; + + if let telemetry::capturing::Capturer::Enabled(capturer) = &capture_config { + let telemetry = capturer.fetch_captures().await; + if let Some(telemetry) = telemetry { + result.set_extension("traces".to_owned(), json!(telemetry.traces)); + result.set_extension("logs".to_owned(), json!(telemetry.logs)); + } + } let result_bytes = serde_json::to_vec(&result).unwrap(); @@ -231,11 +277,11 @@ async fn transaction_handler(state: State, req: Request) -> Result) -> Result) -> Result, hyper::Error> { - let cx = get_parent_span_context(req.headers()); + let headers = req.headers().to_owned(); + let body_start = req.into_body(); - // block and buffer request until the request has completed let full_body = hyper::body::to_bytes(body_start).await?; - let tx_opts: TransactionOptions = serde_json::from_slice(full_body.as_ref()).unwrap(); + let mut tx_opts: TransactionOptions = serde_json::from_slice(full_body.as_ref()).unwrap(); + let tx_id = tx_opts.with_new_transaction_id(); + // This is the span we use to instrument the execution of a transaction. This span will be open + // during the tx execution, and held in the ITXServer for that transaction (see ITXServer) let span = info_span!("prisma:engine:itx_runner", user_facing = true, itx_id = field::Empty); - span.set_parent(cx); + + // If telemetry needs to be captured, we use the span trace_id to correlate the logs happening + // during the different operations within a transaction. The trace_id is propagated in the + // traceparent header, but if it's not present, we need to synthetically create one for the + // transaction. This is needed in case the client is interested in capturing logs and not + // traces, because: + // - The client won't send a traceparent header + // - A transaction initial span is created here (prisma:engine:itx_runner) and stored in the + // ITXServer for that transaction + // - When a query comes in, the graphql handler processes it, but we need to tell the capturer + // to start capturing logs, and for that we need a trace_id.
There are two places where we + // could get that information from: + // - First, it's the traceparent, but the client didn't send it, because they are only + // interested in logs. + // - Second, it's the root span for the transaction, but it's not in scope; it's instead + // stored in the ITXServer, in a different tokio task. + // + // For the above reasons, we need to create a trace_id that we can predict and use across the + // different operations happening within a transaction. So we do it by converting the tx_id + // into a trace_id, leaning on the fact that the tx_id has more entropy, and there's no + // information loss. + let capture_settings = capture_settings(&headers); + let traceparent = traceparent(&headers); + if traceparent.is_none() && capture_settings.logs_enabled() { + span.set_parent(tx_id.into()) + } else { + span.set_parent(get_parent_span_context(&headers)) + } + let trace_id = span.context().span().span_context().trace_id(); + let capture_config = telemetry::capturing::capturer(trace_id, capture_settings); + + if let telemetry::capturing::Capturer::Enabled(capturer) = &capture_config { + capturer.start_capturing().await; + } let result = state .cx @@ -262,9 +344,19 @@ async fn transaction_start_handler(state: State, req: Request) -> Result { + let telemetry = if let telemetry::capturing::Capturer::Enabled(capturer) = &capture_config { + capturer.fetch_captures().await + } else { + None + }; match result { Ok(tx_id) => { - let result = json!({ "id": tx_id.to_string() }); + let result = if let Some(telemetry) = telemetry { + json!({ "id": tx_id.to_string(), "extensions": { "logs": json!(telemetry.logs), "traces": json!(telemetry.traces) } }) + } else { + json!({ "id": tx_id.to_string() }) + }; let result_bytes = serde_json::to_vec(&result).unwrap(); let res = Response::builder() @@ -274,23 +366,57 @@ async fn transaction_start_handler(state: State, req: Request) -> Result Ok(err_to_http_resp(err)), + Err(err) => Ok(err_to_http_resp(err, telemetry)), } } -async fn transaction_commit_handler(state: State, tx_id: TxId) -> Result<Response<Body>, hyper::Error> { +async fn transaction_commit_handler( + state: State, + req: Request<Body>, + tx_id: TxId, +) -> Result<Response<Body>, hyper::Error> { + let capture_config = capture_config(req.headers(), tx_id.clone()); + + if let telemetry::capturing::Capturer::Enabled(capturer) = &capture_config { + capturer.start_capturing().await; + } + let result = state.cx.executor.commit_tx(tx_id).await; + + let telemetry = if let telemetry::capturing::Capturer::Enabled(capturer) = &capture_config { + capturer.fetch_captures().await + } else { + None + }; + match result { - Ok(_) => Ok(empty_json_to_http_resp()), - Err(err) => Ok(err_to_http_resp(err)), + Ok(_) => Ok(empty_json_to_http_resp(telemetry)), + Err(err) => Ok(err_to_http_resp(err, telemetry)), } } -async fn transaction_rollback_handler(state: State, tx_id: TxId) -> Result<Response<Body>, hyper::Error> { +async fn transaction_rollback_handler( + state: State, + req: Request<Body>, + tx_id: TxId, +) -> Result<Response<Body>, hyper::Error> { + let capture_config = capture_config(req.headers(), tx_id.clone()); + + if let telemetry::capturing::Capturer::Enabled(capturer) = &capture_config { + capturer.start_capturing().await; + } + let result = state.cx.executor.rollback_tx(tx_id).await; + + let telemetry = if let telemetry::capturing::Capturer::Enabled(capturer) = &capture_config { + capturer.fetch_captures().await + } else { + None + }; + match result { - Ok(_) => Ok(empty_json_to_http_resp()), - Err(err) => Ok(err_to_http_resp(err)), + Ok(_) => Ok(empty_json_to_http_resp(telemetry)), + Err(err) => Ok(err_to_http_resp(err, telemetry)), } } @@ -329,8 +455,12 @@ impl<'a> Extractor for HeaderExtractor<'a> { } } -fn
empty_json_to_http_resp() -> Response<Body> { - let result = json!({}); +fn empty_json_to_http_resp(captured_telemetry: Option<telemetry::capturing::storage::Storage>) -> Response<Body> { + let result = if let Some(telemetry) = captured_telemetry { + json!({ "extensions": { "logs": json!(telemetry.logs), "traces": json!(telemetry.traces) } }) + } else { + json!({}) + }; let result_bytes = serde_json::to_vec(&result).unwrap(); Response::builder() @@ -340,7 +470,10 @@ fn empty_json_to_http_resp() -> Response { .unwrap() } -fn err_to_http_resp(err: query_core::CoreError) -> Response<Body> { +fn err_to_http_resp( + err: query_core::CoreError, + captured_telemetry: Option<telemetry::capturing::storage::Storage>, +) -> Response<Body> { let status = match err { query_core::CoreError::TransactionError(ref err) => match err { query_core::TransactionError::AcquisitionTimeout => 504, @@ -354,11 +487,40 @@ fn err_to_http_resp(err: query_core::CoreError) -> Response { _ => 500, }; - let user_error: user_facing_errors::Error = err.into(); - let body = Body::from(serde_json::to_vec(&user_error).unwrap()); + let mut err: ExtendedTransactionUserFacingError = err.into(); + if let Some(telemetry) = captured_telemetry { + err.set_extension("traces".to_owned(), json!(telemetry.traces)); + err.set_extension("logs".to_owned(), json!(telemetry.logs)); + } + let body = Body::from(serde_json::to_vec(&err).unwrap()); Response::builder().status(status).body(body).unwrap() } +fn capture_config(headers: &HeaderMap, tx_id: TxId) -> telemetry::capturing::Capturer { + let capture_settings = capture_settings(headers); + let mut traceparent = traceparent(headers); + + if traceparent.is_none() && capture_settings.is_enabled() { + traceparent = Some(tx_id.as_traceparent()) + } + + let trace_id = get_trace_id_from_traceparent(traceparent.as_deref()); + + telemetry::capturing::capturer(trace_id, capture_settings) +} + +#[allow(clippy::bind_instead_of_map)] +fn capture_settings(headers: &HeaderMap) -> telemetry::capturing::Settings { + const CAPTURE_TELEMETRY_HEADER: &str = "X-capture-telemetry"; + let s = if let Some(hv) = headers.get(CAPTURE_TELEMETRY_HEADER) { + hv.to_str().unwrap_or("") + } else { + "" + }; + + telemetry::capturing::Settings::from(s) +} + fn traceparent(headers: &HeaderMap) -> Option<String> { const TRACEPARENT_HEADER: &str = "traceparent"; diff --git a/query-engine/query-engine/src/state.rs index e380e3f5d05c..892edbd9f548 100644 --- a/query-engine/query-engine/src/state.rs +++ b/query-engine/query-engine/src/state.rs @@ -41,7 +41,11 @@ pub async fn setup(opts: &PrismaOpt, install_logger: bool, metrics: Option PrismaResult<()> { subcommand: Some(Subcommand::Cli(CliOpt::Dmmf)), enable_open_telemetry: false, open_telemetry_endpoint: String::new(), + enable_telemetry_in_response: false, dataproxy_metric_override: false, }; diff --git a/query-engine/request-handlers/src/graphql/response.rs index 1847e4bc0656..8a6c75622c10 100644 --- a/query-engine/request-handlers/src/graphql/response.rs +++ b/query-engine/request-handlers/src/graphql/response.rs @@ -12,6 +12,9 @@ pub struct GQLResponse { #[serde(skip_serializing_if = "Vec::is_empty")] errors: Vec<GQLError>, + + #[serde(skip_serializing_if = "IndexMap::is_empty")] + extensions: Map, } #[derive(Debug, serde::Serialize, Default, PartialEq)] @@ -22,6 +25,9 @@ pub struct GQLBatchResponse { #[serde(skip_serializing_if = "Vec::is_empty")] errors: Vec<GQLError>, + + #[serde(skip_serializing_if = "IndexMap::is_empty")] + extensions: Map, } #[derive(Debug, serde::Serialize, serde::Deserialize,
PartialEq)] @@ -67,6 +73,10 @@ impl GQLResponse { pub fn errors(&self) -> impl Iterator { self.errors.iter() } + + pub fn set_extension(&mut self, key: String, val: serde_json::Value) { + self.extensions.entry(key).or_insert(Item::Json(val)); + } } impl From for GQLResponse { @@ -153,6 +163,10 @@ impl GQLBatchResponse { .iter() .chain(self.batch_result.iter().flat_map(|res| res.errors())) } + + pub fn set_extension(&mut self, key: String, val: serde_json::Value) { + self.extensions.entry(key).or_insert(Item::Json(val)); + } } impl From for GQLBatchResponse { diff --git a/query-engine/request-handlers/src/lib.rs b/query-engine/request-handlers/src/lib.rs index 7d370e6f613b..e19be13f0eb8 100644 --- a/query-engine/request-handlers/src/lib.rs +++ b/query-engine/request-handlers/src/lib.rs @@ -16,3 +16,12 @@ pub enum PrismaResponse { Single(GQLResponse), Multi(GQLBatchResponse), } + +impl PrismaResponse { + pub fn set_extension(&mut self, key: String, val: serde_json::Value) { + match self { + Self::Single(r) => r.set_extension(key, val), + Self::Multi(r) => r.set_extension(key, val), + } + } +} From babfc04dce832bf77cc9a1e4eb198d8e82007383 Mon Sep 17 00:00:00 2001 From: Miguel Fernandez Date: Tue, 24 Jan 2023 13:29:56 +0100 Subject: [PATCH 02/12] Set proper traceparent in the case of root spans --- query-engine/core/src/telemetry/helpers.rs | 23 +++++++-- query-engine/query-engine/src/server/mod.rs | 56 ++++++++++++--------- 2 files changed, 49 insertions(+), 30 deletions(-) diff --git a/query-engine/core/src/telemetry/helpers.rs b/query-engine/core/src/telemetry/helpers.rs index b65484234e98..418a1d907a75 100644 --- a/query-engine/core/src/telemetry/helpers.rs +++ b/query-engine/core/src/telemetry/helpers.rs @@ -41,6 +41,24 @@ pub fn set_span_link_from_traceparent(span: &Span, traceparent: Option) } } +pub fn get_trace_parent_from_span(span: &Span) -> String { + let cx = span.context(); + let binding = cx.span(); + let span_context = binding.span_context(); + + format!("00-{}-{}-01", span_context.trace_id(), span_context.span_id()) +} + +pub fn get_trace_id_from_span(span: &Span) -> TraceId { + let cx = span.context(); + get_trace_id_from_context(&cx) +} + +pub fn get_trace_id_from_context(context: &Context) -> TraceId { + let context_span = context.span(); + context_span.span_context().trace_id() +} + pub fn get_trace_id_from_traceparent(traceparent: Option<&str>) -> TraceId { traceparent .unwrap_or("0-0-0-0") @@ -50,11 +68,6 @@ pub fn get_trace_id_from_traceparent(traceparent: Option<&str>) -> TraceId { .unwrap() } -pub fn get_trace_id_from_context(context: &Context) -> TraceId { - let context_span = context.span(); - context_span.span_context().trace_id() -} - pub enum QueryEngineLogLevel { FromEnv, Override(String), diff --git a/query-engine/query-engine/src/server/mod.rs b/query-engine/query-engine/src/server/mod.rs index b4322f1d8800..cd4554c43f42 100644 --- a/query-engine/query-engine/src/server/mod.rs +++ b/query-engine/query-engine/src/server/mod.rs @@ -4,7 +4,7 @@ use hyper::service::{make_service_fn, service_fn}; use hyper::{header::CONTENT_TYPE, Body, HeaderMap, Method, Request, Response, Server, StatusCode}; use opentelemetry::trace::TraceContextExt; use opentelemetry::{global, propagation::Extractor}; -use query_core::helpers::get_trace_id_from_traceparent; +use query_core::helpers::*; use query_core::{ schema::QuerySchemaRenderer, telemetry, ExtendedTransactionUserFacingError, TransactionOptions, TxId, }; @@ -132,30 +132,36 @@ async fn graphql_handler(state: State, req: 
Request) -> Result Date: Wed, 25 Jan 2023 13:42:06 +0100 Subject: [PATCH 03/12] Refactoring, move layer creation to telemetry::capturing --- .../core/src/telemetry/capturing/helpers.rs | 12 +++++ .../core/src/telemetry/capturing/mod.rs | 49 +++++++++++++------ query-engine/core/src/telemetry/helpers.rs | 6 +-- query-engine/query-engine/src/logger.rs | 14 +----- 4 files changed, 51 insertions(+), 30 deletions(-) create mode 100644 query-engine/core/src/telemetry/capturing/helpers.rs diff --git a/query-engine/core/src/telemetry/capturing/helpers.rs new file mode 100644 index 000000000000..ac5fce77f652 --- /dev/null +++ b/query-engine/core/src/telemetry/capturing/helpers.rs @@ -0,0 +1,12 @@ +use tracing::Metadata; + +/// Filters in spans and events that are statically determined to be relevant for capturing. +/// Dynamic filtering will be done by the [`crate::capturer::Processor`] +pub fn span_and_event_filter(meta: &Metadata) -> bool { + if meta.fields().iter().any(|f| f.name() == "user_facing") { + return true; + } + + // relevant quaint connector spans and events + meta.target() == "quaint::connector::metrics" +} diff --git a/query-engine/core/src/telemetry/capturing/mod.rs index 438e3eb5d3e5..4dd5a5fdc1f2 100644 --- a/query-engine/core/src/telemetry/capturing/mod.rs +++ b/query-engine/core/src/telemetry/capturing/mod.rs @@ -127,9 +127,13 @@ pub use self::settings::Settings; use self::capturer::Processor; use once_cell::sync::Lazy; use opentelemetry::{global, sdk, trace}; +use query_engine_metrics::MetricRegistry; +use tracing::subscriber; +use tracing_subscriber::{ + filter::filter_fn, layer::Layered, prelude::__tracing_subscriber_SubscriberExt, Layer, Registry, +}; static PROCESSOR: Lazy<Processor> = Lazy::new(Processor::default); -static TRACER: Lazy<sdk::trace::Tracer> = Lazy::new(setup_and_install_tracer_globally); /// Creates a new capturer, which is configured to export traces and log events happening during a /// particular request @@ -137,24 +141,41 @@ pub fn capturer(trace_id: trace::TraceId, settings: Settings) -> Capturer { Capturer::new(PROCESSOR.to_owned(), trace_id, settings) } -/// Returns a reference to the global tracer used when capturing telemetry in the response -pub fn tracer() -> &'static sdk::trace::Tracer { - &TRACER -} - -/// Installs an opentelemetry tracer globally, which is configured to process -/// spans and export them to the global exporter.
-fn setup_and_install_tracer_globally() -> sdk::trace::Tracer { +/// Adds a capturing layer to the given subscriber and installs the transformed subscriber as the +/// global, default subscriber +pub fn install_capturing_layer( + subscriber: Layered<Option<MetricRegistry>, Layered<Box<dyn Layer<Registry> + Send + Sync>, Registry>>, + log_queries: bool, +) { + // set a trace context propagator, so that the trace context is propagated via the + // `traceparent` header from other systems global::set_text_map_propagator(sdk::propagation::TraceContextPropagator::new()); - - let provider_builder = sdk::trace::TracerProvider::builder().with_span_processor(PROCESSOR.to_owned()); - let provider = provider_builder.build(); + // create a tracer provider that is configured to use our custom processor to process spans + let provider = sdk::trace::TracerProvider::builder() + .with_span_processor(PROCESSOR.to_owned()) + .build(); + // create a tracer out of the provider let tracer = opentelemetry::trace::TracerProvider::tracer(&provider, "opentelemetry"); + // set the provider as the global provider global::set_tracer_provider(provider); - tracer + + // create a layer that will filter initial events and spans based on the log level configuration + // from the environment and a specific filter to discard things that we are not interested in + // from a capturing perspective + let telemetry_layer = tracing_opentelemetry::layer() + .with_tracer(tracer) + .with_filter(crate::helpers::env_filter( + log_queries, + crate::helpers::QueryEngineLogLevel::FromEnv, + )) + .with_filter(filter_fn(helpers::span_and_event_filter)); + // decorate the given subscriber (more layers were added before this one) with the telemetry layer + let subscriber = subscriber.with(telemetry_layer); + // and finally set the subscriber as the global, default subscriber + subscriber::set_global_default(subscriber).unwrap(); } mod capturer; +mod helpers; mod settings; pub mod storage; diff --git a/query-engine/core/src/telemetry/helpers.rs index 418a1d907a75..b7ad24811f7c 100644 --- a/query-engine/core/src/telemetry/helpers.rs +++ b/query-engine/core/src/telemetry/helpers.rs @@ -114,10 +114,6 @@ pub fn user_facing_span_only_filter(meta: &Metadata) -> bool { return false; } - user_facing_filter(meta) -} - -pub fn user_facing_filter(meta: &Metadata) -> bool { if *SHOW_ALL_TRACES { return true; } @@ -126,5 +122,7 @@ pub fn user_facing_filter(meta: &Metadata) -> bool { return true; } + // spans describing a quaint query. + // TODO: should this span be made user_facing in quaint? meta.target() == "quaint::connector::metrics" && meta.name() == "quaint:query" } diff --git a/query-engine/query-engine/src/logger.rs index 98176683a501..32d91b725f8b 100644 --- a/query-engine/query-engine/src/logger.rs +++ b/query-engine/query-engine/src/logger.rs @@ -87,7 +87,6 @@ impl Logger { pub fn install(&self) -> LoggerResult<()> { let filter = telemetry::helpers::env_filter(self.log_queries, telemetry::helpers::QueryEngineLogLevel::FromEnv); let is_user_trace = filter_fn(telemetry::helpers::user_facing_span_only_filter); - let is_user_trace_or_event = filter_fn(telemetry::helpers::user_facing_filter); let fmt_layer = match self.log_format { LogFormat::Text => { @@ -106,17 +105,8 @@ impl Logger { match self.tracing_config { TracingConfig::Captured => { - // Capturing is enabled, it overrides otel exporting.
- let tracer = telemetry::capturing::tracer().to_owned(); - let telemetry_layer = tracing_opentelemetry::layer() - .with_tracer(tracer) - .with_filter(is_user_trace_or_event) - .with_filter(telemetry::helpers::env_filter( - self.log_queries, - telemetry::helpers::QueryEngineLogLevel::FromEnv, - )); - let subscriber = subscriber.with(telemetry_layer); - subscriber::set_global_default(subscriber)?; + let log_queries = self.log_queries; + telemetry::capturing::install_capturing_layer(subscriber, log_queries) } TracingConfig::Http(ref endpoint) => { // Opentelemetry is enabled, but capturing is disabled, there's an endpoint to export From 45681bb3d2f60a0bb4bd3de6fe71152e17d54f13 Mon Sep 17 00:00:00 2001 From: Miguel Fernandez Date: Wed, 25 Jan 2023 15:46:14 +0100 Subject: [PATCH 04/12] Simplify capturer design --- .../core/src/telemetry/capturing/capturer.rs | 251 ++---------------- .../core/src/telemetry/capturing/mod.rs | 1 - query-engine/core/src/telemetry/models.rs | 1 + 3 files changed, 21 insertions(+), 232 deletions(-) diff --git a/query-engine/core/src/telemetry/capturing/capturer.rs b/query-engine/core/src/telemetry/capturing/capturer.rs index 01dba750b50a..52b4cbebb59a 100644 --- a/query-engine/core/src/telemetry/capturing/capturer.rs +++ b/query-engine/core/src/telemetry/capturing/capturer.rs @@ -7,7 +7,7 @@ use opentelemetry::{ }, trace::{TraceId, TraceResult}, }; -use std::{borrow::Cow, fmt}; +use std::fmt; use std::{collections::HashMap, sync::Arc, sync::Mutex}; /// Capturer determines, based on a set of settings and a trace id, how capturing is going to be handled. @@ -124,14 +124,12 @@ impl SpanProcessor for Processor { let mut locked_storage = self.storage.lock().unwrap(); if let Some(storage) = locked_storage.get_mut(&trace_id) { let settings = storage.settings.clone(); - let original_span_name = span_data.name.clone(); let (events, span) = models::TraceSpan::from(span_data).split_events(); let candidate_span = Candidate { value: span, settings: &settings, - original_span_name: Some(original_span_name), }; let capture: Capture = candidate_span.into(); @@ -142,7 +140,6 @@ impl SpanProcessor for Processor { let candidate_event = Candidate { value: log, settings: &settings, - original_span_name: None, }; let capture: Capture = candidate_event.into(); capture.add_to(&mut storage.traces, &mut storage.logs); @@ -161,95 +158,44 @@ impl SpanProcessor for Processor { Ok(()) } } - +const VALID_QUERY_ATTRS: [&str; 3] = ["query", "params", "duration_ms"]; /// A Candidate represents either a span or an event that is being considered for capturing. /// A Candidate can be converted into a [`Capture`]. 
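The pivotal call that patch 04 keeps is `split_events()`: a finished span arrives with its events attached, and capturing wants spans and events routed to different buffers. A simplified, std-only sketch of that ownership split; `TraceSpan` and `Event` here are reduced stand-ins for the types in `telemetry::models`:

```rust
#[derive(Debug, Default)]
struct Event {
    name: String,
}

#[derive(Debug, Default)]
struct TraceSpan {
    name: String,
    events: Vec<Event>,
}

impl TraceSpan {
    // Consumes the span and hands back (events, span-without-events), so the
    // caller can push them into separate `logs` and `traces` collections.
    fn split_events(mut self) -> (Vec<Event>, TraceSpan) {
        let events = std::mem::take(&mut self.events);
        (events, self)
    }
}

fn main() {
    let span = TraceSpan {
        name: "prisma:engine:db_query".into(),
        events: vec![Event { name: "quaint:query".into() }],
    };

    let (events, span) = span.split_events();
    assert_eq!(events.len(), 1);
    assert!(span.events.is_empty());
    println!("span = {span:?}, events = {events:?}");
}
```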
#[derive(Debug, Clone)] struct Candidate<'batch_iter, T: Clone + fmt::Debug> { value: T, settings: &'batch_iter Settings, - original_span_name: Option>, -} - -impl Candidate<'_, models::TraceSpan> { - #[inline(always)] - fn is_loggable_quaint_query(&self) -> bool { - self.settings.included_log_levels.contains("query") - && self.original_span_name.is_some() - && matches!(self.original_span_name, Some(Cow::Borrowed("quaint:query"))) - } - - fn query_event(&self) -> models::Event { - let span = &self.value; - - let duration_ms = ((span.end_time[0] as f64 - span.start_time[0] as f64) * 1_000.0) - + ((span.end_time[1] as f64 - span.start_time[1] as f64) / 1_000_000.0); - - let statement = if let Some(q) = span.attributes.get("db.statement") { - match q { - serde_json::Value::String(s) => s.to_string(), - _ => "unknown".to_string(), - } - } else { - "unknown".to_string() - }; - - let attributes = vec![( - "duration_ms".to_owned(), - serde_json::Value::Number(serde_json::Number::from_f64(duration_ms).unwrap()), - )] - .into_iter() - .collect(); - - models::Event { - span_id: Some(span.span_id.to_owned()), - name: statement, - level: "query".to_string(), - timestamp: span.start_time, - attributes, - } - } } impl Candidate<'_, models::LogEvent> { #[inline(always)] - fn is_loggable_mongo_db_query(&self) -> bool { - self.settings.included_log_levels.contains("query") && { + fn is_loggable_query_event(&self) -> bool { + if self.settings.included_log_levels.contains("query") { if let Some(target) = self.value.attributes.get("target") { if let Some(val) = target.as_str() { - return val == "mongodb_query_connector::query"; + return (val == "quaint::connector::metrics" && self.value.attributes.get("query").is_some()) + || val == "mongodb_query_connector::query"; } } - false } + false } - #[inline(always)] - fn is_loggable_event(&self) -> bool { - self.settings.included_log_levels.contains(&self.value.level) - } - - fn query_event(self) -> models::LogEvent { - let mut attributes = self.value.attributes; - let mut attrs = HashMap::new(); - if let Some(dur) = attributes.get("duration_ms") { - attrs.insert("duration_ms".to_owned(), dur.clone()); - } - - let mut name = "uknown".to_owned(); - if let Some(query) = attributes.remove("query") { - if let Some(str) = query.as_str() { - name = str.to_owned(); - } - } + fn query_event(mut self) -> models::LogEvent { + self.value + .attributes + .retain(|key, _| (&VALID_QUERY_ATTRS).contains(&key.as_str())); models::LogEvent { - name, level: "query".to_string(), - attributes: attrs, ..self.value } } + + #[inline(always)] + fn is_loggable_event(&self) -> bool { + self.settings.included_log_levels.contains(&self.value.level) + } } /// Capture provides mechanisms to transform a candidate into one of the enum variants. @@ -259,7 +205,6 @@ impl Candidate<'_, models::LogEvent> { enum Capture { Span(models::TraceSpan), LogEvent(models::LogEvent), - Multiple(Vec), Discarded, } @@ -274,11 +219,6 @@ impl Capture { Capture::LogEvent(log) => { logs.push(log); } - Capture::Multiple(captures) => { - for capture in captures { - capture.add_to(traces, logs); - } - } Capture::Discarded => {} } } @@ -288,8 +228,7 @@ impl Capture { /// be captrured as-is if its log level is among the levels to capture, or be discarded. 
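The reworked `query_event` above is now a pure allow-list transformation: relabel the level to `query` and drop every attribute not in `VALID_QUERY_ATTRS`. A runnable sketch of the `retain` step, using string values where the real code stores `serde_json::Value`:

```rust
use std::collections::HashMap;

// Allow-list as introduced in this patch; patch 11 later adds "target".
const VALID_QUERY_ATTRS: [&str; 3] = ["query", "params", "duration_ms"];

fn main() {
    let mut attributes: HashMap<String, String> = HashMap::from([
        ("query".into(), "SELECT 1".into()),
        ("duration_ms".into(), "10.0".into()),
        ("target".into(), "quaint::connector::metrics".into()),
        ("some_internal_detail".into(), "dropped".into()),
    ]);

    // retain keeps exactly the entries the closure approves of.
    attributes.retain(|key, _| VALID_QUERY_ATTRS.contains(&key.as_str()));

    assert_eq!(attributes.len(), 2);
    assert!(attributes.contains_key("query"));
    assert!(!attributes.contains_key("target")); // not in the list until patch 11
}
```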
impl From> for Capture { fn from(candidate: Candidate<'_, models::Event>) -> Capture { - if candidate.is_loggable_mongo_db_query() { - // mongo events representing queries are transformed into a more meaningful log event + if candidate.is_loggable_query_event() { Capture::LogEvent(candidate.query_event()) } else if candidate.is_loggable_event() { Capture::LogEvent(candidate.value) @@ -303,20 +242,10 @@ impl From> for Capture { /// is enabled; captured as-is, if tracing is enabled; or be discarded. impl From> for Capture { fn from(candidate: Candidate<'_, models::TraceSpan>) -> Capture { - let mut captures = vec![]; - - if candidate.is_loggable_quaint_query() { - captures.push(Capture::LogEvent(candidate.query_event())); - } - if candidate.settings.traces_enabled() { - captures.push(Capture::Span(candidate.value)); - } - - match captures.len() { - 0 => Capture::Discarded, - 1 => captures.pop().unwrap(), - _ => Capture::Multiple(captures), + Capture::Span(candidate.value) + } else { + Capture::Discarded } } } @@ -327,146 +256,6 @@ mod tests { use super::*; use std::time::Duration; - #[test] - fn test_candidate_event_transformation() { - let event = models::LogEvent { - span_id: Some("00f067aa0ba902b7".to_owned()), - name: "foo bar".to_owned(), - level: "debug".to_owned(), - timestamp: [101, 0], - attributes: vec![ - ( - "target".to_owned(), - serde_json::Value::String("mongodb_query_connector::query".to_owned()), - ), - ( - "query".to_owned(), - serde_json::Value::String("db.Users.find()".to_owned()), - ), - ("duration_ms".to_owned(), serde_json::json!(100.0)), - ] - .into_iter() - .collect(), - }; - - let only_query_log_events: Settings = "query".into(); - - let candidate = Candidate { - value: event.clone(), - settings: &only_query_log_events, - original_span_name: None, - }; - - let capture: Capture = candidate.into(); - match capture { - Capture::LogEvent(event) => { - assert_eq!(event.level, "query"); - assert_eq!(event.name.to_string().as_str(), "db.Users.find()"); - assert_eq!(event.attributes.get("duration_ms").unwrap().to_string(), "100.0"); - } - _ => unreachable!(), - }; - - let event = models::LogEvent { - attributes: vec![( - "target".to_owned(), - serde_json::Value::String("a different one".to_owned()), - )] - .into_iter() - .collect(), - ..event - }; - let candidate = Candidate { - value: event.clone(), - settings: &only_query_log_events, - original_span_name: None, - }; - - let capture: Capture = candidate.into(); - match capture { - Capture::Discarded => {} - _ => unreachable!(), - } - } - - #[test] - fn test_candidate_span_transformation() { - let trace_span = models::TraceSpan { - trace_id: "4bf92f3577b34da6a3ce929d0e0e4736".to_owned(), - span_id: "00f067aa0ba902b7".to_owned(), - parent_span_id: "00f067aa0ba902b5".to_owned(), - name: "prisma:engine:db_query".to_ascii_lowercase(), - start_time: [101, 0], - end_time: [101, 10000000], - attributes: vec![( - "db.statement".to_owned(), - serde_json::Value::String("SELECT 1".to_owned()), - )] - .into_iter() - .collect(), - events: Default::default(), - links: Default::default(), - }; - - // capturing query events - let only_query_log_events: Settings = "query".into(); - let original_span_name = Some(Cow::Borrowed("quaint:query")); - - let candidate = Candidate { - value: trace_span.clone(), - settings: &only_query_log_events, - original_span_name: original_span_name.clone(), - }; - - let capture: Capture = candidate.into(); - match capture { - Capture::LogEvent(event) => { - assert_eq!(event.level, "query"); - 
assert_eq!(event.name.to_string().as_str(), "SELECT 1"); - assert_eq!(event.attributes.get("duration_ms").unwrap().to_string(), "10.0"); - } - _ => unreachable!(), - }; - - // capturing query and tracing events - let query_logs_and_traces: Settings = "query, tracing".into(); - let candidate = Candidate { - value: trace_span.clone(), - settings: &query_logs_and_traces, - original_span_name: original_span_name.clone(), - }; - - let capture: Capture = candidate.into(); - match capture { - Capture::Multiple(captures) => { - match captures[0] { - Capture::LogEvent(_) => {} - _ => unreachable!(), - }; - - match captures[1] { - Capture::Span(_) => {} - _ => unreachable!(), - }; - } - _ => unreachable!(), - }; - - // capturing nothing - let reject_all: Settings = "".into(); - let candidate = Candidate { - value: trace_span.clone(), - settings: &reject_all, - original_span_name: original_span_name.clone(), - }; - - let capture: Capture = candidate.into(); - match capture { - Capture::Discarded => {} - _ => unreachable!(), - }; - } - #[tokio::test] async fn test_garbage_collection() { let exporter = Processor::new(); diff --git a/query-engine/core/src/telemetry/capturing/mod.rs b/query-engine/core/src/telemetry/capturing/mod.rs index 4dd5a5fdc1f2..4ba7437738e4 100644 --- a/query-engine/core/src/telemetry/capturing/mod.rs +++ b/query-engine/core/src/telemetry/capturing/mod.rs @@ -158,7 +158,6 @@ pub fn install_capturing_layer( let tracer = opentelemetry::trace::TracerProvider::tracer(&provider, "opentelemetry"); // set the provider as the global provider global::set_tracer_provider(provider); - // create a layer that will filter initial events and spans based on the log level configuration // from the environment and a specific filter to discard things that we are not interested in // from a capturiong perspective diff --git a/query-engine/core/src/telemetry/models.rs b/query-engine/core/src/telemetry/models.rs index 07cf755ad2e7..7f6650f78d69 100644 --- a/query-engine/core/src/telemetry/models.rs +++ b/query-engine/core/src/telemetry/models.rs @@ -112,6 +112,7 @@ impl From for TraceSpan { #[derive(Serialize, Debug, Clone, PartialEq, Eq)] pub struct Event { pub(super) span_id: Option, + #[serde(skip_serializing_if = "String::is_empty")] pub(super) name: String, pub(super) level: String, pub(super) timestamp: HrTime, From b9fb001dff19b6d992b369dc94a6f00870e3d1de Mon Sep 17 00:00:00 2001 From: Miguel Fernandez Date: Wed, 25 Jan 2023 18:17:11 +0100 Subject: [PATCH 05/12] Remove Generic candidates and captures --- .../core/src/telemetry/capturing/capturer.rs | 78 +++---------------- 1 file changed, 12 insertions(+), 66 deletions(-) diff --git a/query-engine/core/src/telemetry/capturing/capturer.rs b/query-engine/core/src/telemetry/capturing/capturer.rs index 52b4cbebb59a..5db1ecc6eee9 100644 --- a/query-engine/core/src/telemetry/capturing/capturer.rs +++ b/query-engine/core/src/telemetry/capturing/capturer.rs @@ -7,7 +7,6 @@ use opentelemetry::{ }, trace::{TraceId, TraceResult}, }; -use std::fmt; use std::{collections::HashMap, sync::Arc, sync::Mutex}; /// Capturer determines, based on a set of settings and a trace id, how capturing is going to be handled. 
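One detail worth calling out from the end of patch 04: the `#[serde(skip_serializing_if = "String::is_empty")]` attribute on `Event::name` means an event with an empty name serializes with no `name` key at all, rather than `"name": ""`. A minimal demonstration, assuming serde with the derive feature plus serde_json:

```rust
use serde::Serialize;

#[derive(Serialize)]
struct Event {
    #[serde(skip_serializing_if = "String::is_empty")]
    name: String,
    level: String,
}

fn main() {
    let unnamed = Event { name: String::new(), level: "query".into() };
    let named = Event { name: "quaint:query".into(), level: "query".into() };

    // The empty name is omitted from the payload entirely.
    assert_eq!(serde_json::to_string(&unnamed).unwrap(), r#"{"level":"query"}"#);
    assert_eq!(
        serde_json::to_string(&named).unwrap(),
        r#"{"name":"quaint:query","level":"query"}"#
    );
}
```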
@@ -127,22 +126,21 @@ impl SpanProcessor for Processor { let (events, span) = models::TraceSpan::from(span_data).split_events(); - let candidate_span = Candidate { - value: span, - settings: &settings, - }; - - let capture: Capture = candidate_span.into(); - capture.add_to(&mut storage.traces, &mut storage.logs); + if settings.traces_enabled() { + storage.traces.push(span); + } if storage.settings.logs_enabled() { events.into_iter().for_each(|log| { - let candidate_event = Candidate { + let candidate = Candidate { value: log, settings: &settings, }; - let capture: Capture = candidate_event.into(); - capture.add_to(&mut storage.traces, &mut storage.logs); + if candidate.is_loggable_query_event() { + storage.logs.push(candidate.query_event()) + } else if candidate.is_loggable_event() { + storage.logs.push(candidate.value) + } }); } } @@ -162,12 +160,12 @@ const VALID_QUERY_ATTRS: [&str; 3] = ["query", "params", "duration_ms"]; /// A Candidate represents either a span or an event that is being considered for capturing. /// A Candidate can be converted into a [`Capture`]. #[derive(Debug, Clone)] -struct Candidate<'batch_iter, T: Clone + fmt::Debug> { - value: T, +struct Candidate<'batch_iter> { + value: models::LogEvent, settings: &'batch_iter Settings, } -impl Candidate<'_, models::LogEvent> { +impl Candidate<'_> { #[inline(always)] fn is_loggable_query_event(&self) -> bool { if self.settings.included_log_levels.contains("query") { @@ -198,58 +196,6 @@ impl Candidate<'_, models::LogEvent> { } } -/// Capture provides mechanisms to transform a candidate into one of the enum variants. -/// This is necessary because a candidate span might also be transformed into a log event -/// (for quaint queries), or log events need to be transformed to a slightly different format -/// (for mongo queries). In addition some span and events are discarded. -enum Capture { - Span(models::TraceSpan), - LogEvent(models::LogEvent), - Discarded, -} - -impl Capture { - /// Add the capture to the traces and logs vectors. We pass the vectors in to allow for - /// a recursive implementation for the case of a candidate transforming into a Capture::Multiple - fn add_to(self, traces: &mut Vec, logs: &mut Vec) { - match self { - Capture::Span(span) => { - traces.push(span); - } - Capture::LogEvent(log) => { - logs.push(log); - } - Capture::Discarded => {} - } - } -} - -/// A Candidate Event can be transformed into either a slightly different LogEvent (for mongo queries) -/// be captrured as-is if its log level is among the levels to capture, or be discarded. -impl From> for Capture { - fn from(candidate: Candidate<'_, models::Event>) -> Capture { - if candidate.is_loggable_query_event() { - Capture::LogEvent(candidate.query_event()) - } else if candidate.is_loggable_event() { - Capture::LogEvent(candidate.value) - } else { - Capture::Discarded - } - } -} - -/// A Candidate TraceSpan can be transformed into a LogEvent (for quaint queries) if query logging -/// is enabled; captured as-is, if tracing is enabled; or be discarded. 
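With the `Capture` indirection removed, `on_end` becomes a lookup-and-push: only trace ids registered by `start_capturing` have an entry in storage, and the per-trace settings decide which buffer each item lands in. A std-only sketch of that flow, eliding the per-event `Candidate` checks and using `u128` as a stand-in for `TraceId`:

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

#[derive(Debug)]
struct Span;
#[derive(Debug)]
struct LogEvent;

#[derive(Default)]
struct Storage {
    traces_enabled: bool,
    logs_enabled: bool,
    traces: Vec<Span>,
    logs: Vec<LogEvent>,
}

// Mirrors the Arc<Mutex<HashMap<TraceId, Storage>>> kept by the Processor.
fn on_end(storage: &Arc<Mutex<HashMap<u128, Storage>>>, trace_id: u128, span: Span, events: Vec<LogEvent>) {
    let mut locked = storage.lock().unwrap();
    // Traces nobody registered interest in are dropped on the floor.
    if let Some(entry) = locked.get_mut(&trace_id) {
        if entry.traces_enabled {
            entry.traces.push(span);
        }
        if entry.logs_enabled {
            entry.logs.extend(events);
        }
    }
}

fn main() {
    let storage = Arc::new(Mutex::new(HashMap::new()));
    storage.lock().unwrap().insert(
        1,
        Storage { traces_enabled: true, logs_enabled: true, ..Default::default() },
    );

    on_end(&storage, 1, Span, vec![LogEvent]); // captured
    on_end(&storage, 2, Span, vec![LogEvent]); // no entry: discarded

    let locked = storage.lock().unwrap();
    assert_eq!(locked[&1].traces.len(), 1);
    assert_eq!(locked[&1].logs.len(), 1);
    assert!(locked.get(&2).is_none());
}
```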
-impl From> for Capture { - fn from(candidate: Candidate<'_, models::TraceSpan>) -> Capture { - if candidate.settings.traces_enabled() { - Capture::Span(candidate.value) - } else { - Capture::Discarded - } - } -} - /// tests for capture exporter #[cfg(test)] mod tests { From 19095406b3dd7d28f5f98f849b6983f72c66a9bb Mon Sep 17 00:00:00 2001 From: Miguel Fernandez Date: Wed, 25 Jan 2023 18:48:02 +0100 Subject: [PATCH 06/12] Allow complex type in method signature --- query-engine/core/src/telemetry/capturing/mod.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/query-engine/core/src/telemetry/capturing/mod.rs b/query-engine/core/src/telemetry/capturing/mod.rs index 4ba7437738e4..a2f174db7c1b 100644 --- a/query-engine/core/src/telemetry/capturing/mod.rs +++ b/query-engine/core/src/telemetry/capturing/mod.rs @@ -143,6 +143,7 @@ pub fn capturer(trace_id: trace::TraceId, settings: Settings) -> Capturer { /// Adds a capturing layer to the given subscriber and installs the transformed subscriber as the /// global, default subscriber +#[allow(clippy::type_complexity)] pub fn install_capturing_layer( subscriber: Layered, Layered + Send + Sync>, Registry>>, log_queries: bool, From 8307eb7d7413893e7a7766b1b596e373b23c4a82 Mon Sep 17 00:00:00 2001 From: Miguel Fernandez Date: Thu, 26 Jan 2023 12:08:17 +0100 Subject: [PATCH 07/12] Filter in mongo db events --- query-engine/core/src/telemetry/capturing/helpers.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/query-engine/core/src/telemetry/capturing/helpers.rs b/query-engine/core/src/telemetry/capturing/helpers.rs index ac5fce77f652..967a1311bf1b 100644 --- a/query-engine/core/src/telemetry/capturing/helpers.rs +++ b/query-engine/core/src/telemetry/capturing/helpers.rs @@ -7,6 +7,6 @@ pub fn span_and_event_filter(meta: &Metadata) -> bool { return true; } - // relevant quaint connector spans and events - meta.target() == "quaint::connector::metrics" + // relevant quaint connector or mongodb connector spans and events + meta.target() == "quaint::connector::metrics" || meta.target() == "mongodb_query_connector::query" } From d2e2abecad16d1d55cbab751449f0c20bb6b431f Mon Sep 17 00:00:00 2001 From: Miguel Fernandez Date: Fri, 27 Jan 2023 16:09:51 +0100 Subject: [PATCH 08/12] Use specific fork of quaint until the related PR is merged https://github.com/prisma/quaint/pull/431 --- Cargo.lock | 64 ++++++++++++++++++++++++++++++++++++++++++++---------- Cargo.toml | 3 ++- 2 files changed, 54 insertions(+), 13 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 0f66d9d155e9..f9adc7abf9f8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1536,7 +1536,7 @@ dependencies = [ "migration-connector", "pretty_assertions", "psl", - "quaint", + "quaint 0.2.0-alpha.13 (git+https://github.com/miguelff/quaint?branch=fix-traces)", "serde_json", "sql-introspection-connector", "sql-migration-connector", @@ -2015,7 +2015,7 @@ dependencies = [ "json-rpc-stdio", "migration-connector", "migration-core", - "quaint", + "quaint 0.2.0-alpha.13 (git+https://github.com/miguelff/quaint?branch=fix-traces)", "serde_json", "structopt", "tempfile", @@ -2046,7 +2046,7 @@ dependencies = [ "pretty_assertions", "prisma-value", "psl", - "quaint", + "quaint 0.2.0-alpha.13 (git+https://github.com/miguelff/quaint?branch=fix-traces)", "serde", "serde_json", "sql-migration-connector", @@ -3151,12 +3151,52 @@ dependencies = [ "mongodb-client", "parking_lot 0.12.1", "psl", - "quaint", + "quaint 0.2.0-alpha.13 
(git+https://github.com/miguelff/quaint?branch=fix-traces)", "tempfile", "test-setup", "url", ] +[[package]] +name = "quaint" +version = "0.2.0-alpha.13" +source = "git+https://github.com/miguelff/quaint?branch=fix-traces#76a56f5940bd44d9f51cd73b89b274c65ff2a081" +dependencies = [ + "async-trait", + "base64 0.12.3", + "bigdecimal", + "bit-vec", + "byteorder", + "bytes", + "chrono", + "connection-string", + "either", + "futures", + "hex", + "libsqlite3-sys", + "lru-cache", + "metrics 0.18.1", + "mobc", + "mysql_async", + "native-tls", + "num_cpus", + "percent-encoding", + "postgres-native-tls", + "postgres-types", + "rusqlite", + "serde_json", + "sqlformat", + "thiserror", + "tiberius", + "tokio", + "tokio-postgres", + "tokio-util 0.6.10", + "tracing", + "tracing-core", + "url", + "uuid 1.1.2", +] + [[package]] name = "quaint" version = "0.2.0-alpha.13" @@ -3292,7 +3332,7 @@ dependencies = [ "opentelemetry-otlp", "prisma-models", "psl", - "quaint", + "quaint 0.2.0-alpha.13 (git+https://github.com/miguelff/quaint?branch=fix-traces)", "query-connector", "query-core", "query-engine-metrics", @@ -3416,7 +3456,7 @@ dependencies = [ "prisma-models", "psl", "qe-setup", - "quaint", + "quaint 0.2.0-alpha.13 (git+https://github.com/prisma/quaint)", "query-core", "query-engine", "query-engine-metrics", @@ -4096,7 +4136,7 @@ dependencies = [ "pretty_assertions", "prisma-value", "psl", - "quaint", + "quaint 0.2.0-alpha.13 (git+https://github.com/miguelff/quaint?branch=fix-traces)", "regex", "serde", "serde_json", @@ -4119,7 +4159,7 @@ dependencies = [ "migration-connector", "once_cell", "psl", - "quaint", + "quaint 0.2.0-alpha.13 (git+https://github.com/miguelff/quaint?branch=fix-traces)", "regex", "serde_json", "sql-ddl", @@ -4149,7 +4189,7 @@ dependencies = [ "prisma-models", "prisma-value", "psl", - "quaint", + "quaint 0.2.0-alpha.13 (git+https://github.com/miguelff/quaint?branch=fix-traces)", "query-connector", "rand 0.7.3", "serde", @@ -4178,7 +4218,7 @@ dependencies = [ "pretty_assertions", "prisma-value", "psl", - "quaint", + "quaint 0.2.0-alpha.13 (git+https://github.com/miguelff/quaint?branch=fix-traces)", "regex", "serde", "test-macros", @@ -4362,7 +4402,7 @@ dependencies = [ "dissimilar", "enumflags2", "once_cell", - "quaint", + "quaint 0.2.0-alpha.13 (git+https://github.com/miguelff/quaint?branch=fix-traces)", "tokio", "tracing", "tracing-error", @@ -4998,7 +5038,7 @@ version = "0.1.0" dependencies = [ "backtrace", "indoc", - "quaint", + "quaint 0.2.0-alpha.13 (git+https://github.com/miguelff/quaint?branch=fix-traces)", "serde", "serde_json", "tracing", diff --git a/Cargo.toml b/Cargo.toml index 76806c557ad7..5be08ed883e4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -39,7 +39,8 @@ user-facing-errors = { path = "./libs/user-facing-errors" } uuid = { version = "1", features = ["serde"] } [workspace.dependencies.quaint] -git = "https://github.com/prisma/quaint" +git = "https://github.com/miguelff/quaint" +branch = "fix-traces" features = [ "bigdecimal", "chrono", From a457f104157bb33287ea3c0ac248ace542151c35 Mon Sep 17 00:00:00 2001 From: Miguel Fernandez Date: Fri, 27 Jan 2023 16:40:53 +0100 Subject: [PATCH 09/12] Refactor make trace conversion more explicit And move them to the telemetry package as an extension trait --- .../core/src/interactive_transactions/mod.rs | 86 +----------------- .../core/src/telemetry/capturing/mod.rs | 2 + .../core/src/telemetry/capturing/tx_ext.rs | 88 
+++++++++++++++++++ query-engine/query-engine/src/server/mod.rs | 5 +- 4 files changed, 94 insertions(+), 87 deletions(-) create mode 100644 query-engine/core/src/telemetry/capturing/tx_ext.rs diff --git a/query-engine/core/src/interactive_transactions/mod.rs b/query-engine/core/src/interactive_transactions/mod.rs index bdf11dc1e2b6..def10bfc10ef 100644 --- a/query-engine/core/src/interactive_transactions/mod.rs +++ b/query-engine/core/src/interactive_transactions/mod.rs @@ -1,6 +1,6 @@ use crate::CoreError; use connector::{Connection, ConnectionLike, Transaction}; -use std::{collections::HashMap, fmt::Display}; +use std::fmt::Display; use tokio::{ task::JoinHandle, time::{Duration, Instant}, @@ -76,63 +76,6 @@ impl Display for TxId { } } -impl From for opentelemetry::Context { - // This is a bit of a hack, but it's the only way to have a default trace span for a whole - // transaction when no traceparent is propagated from the client. - // - // This is done so we can capture traces happening accross the different queries in a - // transaction. Otherwise, if a traceparent is not propagated from the client, each query in - // the transaction will run within a span that has already been generated at the begining of the - // transaction, and held active in the actor in charge of running the queries. Thus, making - // impossible to capture traces happening in the individual queries, as they won't be aware of - // the transaction they are part of. - // - // By generating this "fake" traceparent based on the transaction id, we can have a common - // trace_id for all transaction operations. - fn from(id: TxId) -> Self { - let extractor: HashMap = - HashMap::from_iter(vec![("traceparent".to_string(), id.as_traceparent())]); - opentelemetry::global::get_text_map_propagator(|propagator| propagator.extract(&extractor)) - } -} - -impl TxId { - pub fn as_traceparent(&self) -> String { - let trace_id = opentelemetry::trace::TraceId::from(self.clone()); - format!("00-{}-0000000000000001-01", trace_id) - } -} - -impl From for opentelemetry::trace::TraceId { - // in order to convert a TxId (a 48 bytes cuid) into a TraceId (16 bytes), we remove the first byte, - // (always 'c') and get the next 16 bytes, which are random enough to be used as a trace id. - // this is a typical cuid: "c-lct0q6ma-0004-rb04-h6en1roa" - // - // - first letter is always the same - // - next 7-8 byte are random a timestamp. There's more entropy in the least significative bytes - // - next 4 bytes are a counter since the server started - // - next 4 bytes are a system fingerprint, invariant for the same server instance - // - least significative 8 bytes. Totally random. 
-    //
-    // We want the most entropic slice of 16 bytes that's deterministicly determined
-    fn from(id: TxId) -> Self {
-        let mut buffer = [0; 16];
-        let tx_id_bytes = id.0.as_bytes();
-        let len = tx_id_bytes.len();
-
-        // bytes [len-20 to len-12): least significative 4 bytes of the timestamp + 4 bytes counter
-        for (i, source_idx) in (len - 20..len - 12).enumerate() {
-            buffer[i] = tx_id_bytes[source_idx];
-        }
-        // bytes [len-8 to len): the random blocks
-        for (i, source_idx) in (len - 8..len).enumerate() {
-            buffer[i + 8] = tx_id_bytes[source_idx];
-        }
-
-        opentelemetry::trace::TraceId::from_bytes(buffer)
-    }
-}
-
 pub enum CachedTx {
     Open(OpenTx),
     Committed,
@@ -209,30 +152,3 @@ pub enum ClosedTx {
     RolledBack,
     Expired { start_time: Instant, timeout: Duration },
 }
-
-// tests for txid into traits
-#[cfg(test)]
-mod test {
-    use super::*;
-
-    #[test]
-    fn test_txid_into_traceid() {
-        let fixture = vec![
-            ("clct0q6ma0000rb04768tiqbj", "71366d6130303030373638746971626a"),
-            // counter changed, trace id changed:
-            ("clct0q6ma0002rb04cpa6zkmx", "71366d6130303032637061367a6b6d78"),
-            // fingerprint changed, trace id did not change, as that chunk is ignored:
-            ("clct0q6ma00020000cpa6zkmx", "71366d6130303032637061367a6b6d78"),
-            // first 5 bytes changed, trace id did not change, as that chunk is ignored:
-            ("00000q6ma00020000cpa6zkmx", "71366d6130303032637061367a6b6d78"),
-            // 6 th byte changed, trace id changed, as that chunk is part of the lsb of the timestamp
-            ("0000006ma00020000cpa6zkmx", "30366d6130303032637061367a6b6d78"),
-        ];
-
-        for (txid, expected_trace_id) in fixture {
-            let txid = TxId(txid.to_string());
-            let trace_id: opentelemetry::trace::TraceId = txid.into();
-            assert_eq!(trace_id.to_string(), expected_trace_id);
-        }
-    }
-}
diff --git a/query-engine/core/src/telemetry/capturing/mod.rs b/query-engine/core/src/telemetry/capturing/mod.rs
index a2f174db7c1b..82171f3156ee 100644
--- a/query-engine/core/src/telemetry/capturing/mod.rs
+++ b/query-engine/core/src/telemetry/capturing/mod.rs
@@ -123,6 +123,7 @@
 //!
 pub use self::capturer::Capturer;
 pub use self::settings::Settings;
+pub use tx_ext::TxTraceExt;
 
 use self::capturer::Processor;
 use once_cell::sync::Lazy;
@@ -179,3 +180,4 @@ mod capturer;
 mod helpers;
 mod settings;
 pub mod storage;
+mod tx_ext;
diff --git a/query-engine/core/src/telemetry/capturing/tx_ext.rs b/query-engine/core/src/telemetry/capturing/tx_ext.rs
new file mode 100644
index 000000000000..5c0203613fe6
--- /dev/null
+++ b/query-engine/core/src/telemetry/capturing/tx_ext.rs
@@ -0,0 +1,88 @@
+use std::collections::HashMap;
+
+pub trait TxTraceExt {
+    fn into_trace_id(self) -> opentelemetry::trace::TraceId;
+    fn into_trace_context(self) -> opentelemetry::Context;
+    fn as_traceparent(&self) -> String;
+}
+
+impl TxTraceExt for crate::TxId {
+    // in order to convert a TxId (a 48-byte cuid) into a TraceId (16 bytes), we remove the first byte
+    // (always 'c') and take the next 16 bytes, which are random enough to be used as a trace id.
+    // this is a typical cuid: "c-lct0q6ma-0004-rb04-h6en1roa"
+    //
+    // - first letter is always the same
+    // - next 7-8 bytes are a timestamp. There's more entropy in the least significant bytes
+    // - next 4 bytes are a counter since the server started
+    // - next 4 bytes are a system fingerprint, invariant for the same server instance
+    // - least significant 8 bytes. Totally random.
+    //
+    // We want the most entropic slice of 16 bytes that's deterministically determined
+    fn into_trace_id(self) -> opentelemetry::trace::TraceId {
+        let mut buffer = [0; 16];
+        let str = self.to_string();
+        let tx_id_bytes = str.as_bytes();
+        let len = tx_id_bytes.len();
+
+        // bytes [len-20 to len-12): least significant 4 bytes of the timestamp + 4 bytes counter
+        for (i, source_idx) in (len - 20..len - 12).enumerate() {
+            buffer[i] = tx_id_bytes[source_idx];
+        }
+        // bytes [len-8 to len): the random blocks
+        for (i, source_idx) in (len - 8..len).enumerate() {
+            buffer[i + 8] = tx_id_bytes[source_idx];
+        }
+
+        opentelemetry::trace::TraceId::from_bytes(buffer)
+    }
+    // This is a bit of a hack, but it's the only way to have a default trace span for a whole
+    // transaction when no traceparent is propagated from the client.
+    //
+    // This is done so we can capture traces happening across the different queries in a
+    // transaction. Otherwise, if a traceparent is not propagated from the client, each query in
+    // the transaction will run within a span that has already been generated at the beginning of the
+    // transaction, and held active in the actor in charge of running the queries. Thus, making it
+    // impossible to capture traces happening in the individual queries, as they won't be aware of
+    // the transaction they are part of.
+    //
+    // By generating this "fake" traceparent based on the transaction id, we can have a common
+    // trace_id for all transaction operations.
+    fn into_trace_context(self) -> opentelemetry::Context {
+        let extractor: HashMap<String, String> =
+            HashMap::from_iter(vec![("traceparent".to_string(), self.as_traceparent())]);
+        opentelemetry::global::get_text_map_propagator(|propagator| propagator.extract(&extractor))
+    }
+
+    fn as_traceparent(&self) -> String {
+        let trace_id = self.clone().into_trace_id();
+        format!("00-{}-0000000000000001-01", trace_id)
+    }
+}
+
+// tests for TxId trace conversions
+#[cfg(test)]
+mod test {
+    use super::*;
+    use crate::TxId;
+
+    #[test]
+    fn test_txid_into_traceid() {
+        let fixture = vec![
+            ("clct0q6ma0000rb04768tiqbj", "71366d6130303030373638746971626a"),
+            // counter changed, trace id changed:
+            ("clct0q6ma0002rb04cpa6zkmx", "71366d6130303032637061367a6b6d78"),
+            // fingerprint changed, trace id did not change, as that chunk is ignored:
+            ("clct0q6ma00020000cpa6zkmx", "71366d6130303032637061367a6b6d78"),
+            // first 5 bytes changed, trace id did not change, as that chunk is ignored:
+            ("00000q6ma00020000cpa6zkmx", "71366d6130303032637061367a6b6d78"),
+            // 6th byte changed, trace id changed, as that chunk is part of the lsb of the timestamp
+            ("0000006ma00020000cpa6zkmx", "30366d6130303032637061367a6b6d78"),
+        ];
+
+        for (txid, expected_trace_id) in fixture {
+            let txid: TxId = txid.into();
+            let trace_id: opentelemetry::trace::TraceId = txid.into_trace_id();
+            assert_eq!(trace_id.to_string(), expected_trace_id);
+        }
+    }
+}
diff --git a/query-engine/query-engine/src/server/mod.rs b/query-engine/query-engine/src/server/mod.rs
index cd4554c43f42..0269c6238075 100644
--- a/query-engine/query-engine/src/server/mod.rs
+++ b/query-engine/query-engine/src/server/mod.rs
@@ -5,6 +5,7 @@ use hyper::{header::CONTENT_TYPE, Body, HeaderMap, Method, Request, Response, Se
 use opentelemetry::trace::TraceContextExt;
 use opentelemetry::{global, propagation::Extractor};
 use query_core::helpers::*;
+use query_core::telemetry::capturing::TxTraceExt;
 use query_core::{
     schema::QuerySchemaRenderer, telemetry, ExtendedTransactionUserFacingError, TransactionOptions, TxId,
}; @@ -156,7 +157,7 @@ async fn graphql_handler(state: State, req: Request) -> Result) -> Result Date: Fri, 27 Jan 2023 16:43:57 +0100 Subject: [PATCH 10/12] Cosmetic changes --- .../core/src/telemetry/capturing/capturer.rs | 4 +-- .../core/src/telemetry/capturing/settings.rs | 29 +++++++++---------- 2 files changed, 15 insertions(+), 18 deletions(-) diff --git a/query-engine/core/src/telemetry/capturing/capturer.rs b/query-engine/core/src/telemetry/capturing/capturer.rs index 5db1ecc6eee9..3c9d15b23db6 100644 --- a/query-engine/core/src/telemetry/capturing/capturer.rs +++ b/query-engine/core/src/telemetry/capturing/capturer.rs @@ -65,7 +65,7 @@ impl Processor { } } - pub(self) async fn start_capturing(&self, trace_id: TraceId, settings: Settings) { + async fn start_capturing(&self, trace_id: TraceId, settings: Settings) { let mut locked_storage = self.storage.lock().unwrap(); locked_storage.insert(trace_id, settings.clone().into()); drop(locked_storage); @@ -81,7 +81,7 @@ impl Processor { }); } - pub(self) async fn fetch_captures(&self, trace_id: TraceId) -> Option { + async fn fetch_captures(&self, trace_id: TraceId) -> Option { let mut traces = self.storage.lock().unwrap(); traces.remove(&trace_id) diff --git a/query-engine/core/src/telemetry/capturing/settings.rs b/query-engine/core/src/telemetry/capturing/settings.rs index a039212b96d1..771586940fc5 100644 --- a/query-engine/core/src/telemetry/capturing/settings.rs +++ b/query-engine/core/src/telemetry/capturing/settings.rs @@ -5,14 +5,14 @@ static DEFAULT_TTL: time::Duration = time::Duration::from_secs(1800); // 30 minu #[derive(Debug, Clone, Default)] pub struct Settings { - // only capture log events from the specified log levels, the special level "query", which does not - // exist in the engine logging infrastructure, is shimed from any event describing a query, regardless - // of its level. + /// only capture log events from the specified log levels, the special level "query", which does not + /// exist in the engine logging infrastructure, is shimed from any event describing a query, regardless + /// of its level. pub(super) included_log_levels: HashSet, - // whether to include trace spans when capturing + /// whether to include trace spans when capturing pub(super) include_traces: bool, - // how long to keep captured traces in memory, if by any chance the capturing breaks, a tokio task - // will clean captured traces after this duration. Defaults to [`DEFAULT_TTL`]. + /// how long to keep captured traces in memory, if by any chance the capturing breaks, a tokio task + /// will clean captured traces after this duration. Defaults to [`DEFAULT_TTL`]. pub(super) ttl: time::Duration, } @@ -26,13 +26,13 @@ impl Settings { } } -// As the test below shows, settings can be constructed froma comma separated string. -// Examples: valid: `"error, warn, query, tracing"` invalid: "foo, bar baz". strings corresponding -// passed in are trimmed and converted to lowercase. chunks corresponding to levels different from -// those in VALID_LEVELS are ignored. -// -// The ttl is always the same (DEFAULT_TTL) but is there to allow for easier unit-testing of c -// apturing logic +/// As the test below shows, settings can be constructed from a comma separated string. +/// Examples: valid: `"error, warn, query, tracing"` invalid: "foo, bar baz". strings corresponding +/// passed in are trimmed and converted to lowercase. chunks corresponding to levels different from +/// those in VALID_LEVELS are ignored. 
+///
+/// The ttl is always the same (DEFAULT_TTL) but is there to allow for easier unit-testing of the
+/// capturing logic
 impl From<&str> for Settings {
     fn from(s: &str) -> Self {
         let chunks = s.split(',');
@@ -52,17 +52,14 @@ impl From<&str> for Settings {
 }
 
 impl Settings {
-    #[inline(always)]
     pub fn is_enabled(&self) -> bool {
         self.traces_enabled() || self.logs_enabled()
     }
 
-    #[inline(always)]
     pub fn traces_enabled(&self) -> bool {
         self.include_traces
     }
 
-    #[inline(always)]
     pub fn logs_enabled(&self) -> bool {
         !self.included_log_levels.is_empty()
     }

From 3aa0d7b5675f501f97951f338fcfc5078dde294e Mon Sep 17 00:00:00 2001
From: Miguel Fernandez
Date: Fri, 27 Jan 2023 16:53:23 +0100
Subject: [PATCH 11/12] Export target param

---
 query-engine/core/src/telemetry/capturing/capturer.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/query-engine/core/src/telemetry/capturing/capturer.rs b/query-engine/core/src/telemetry/capturing/capturer.rs
index 3c9d15b23db6..1b36626cd1fd 100644
--- a/query-engine/core/src/telemetry/capturing/capturer.rs
+++ b/query-engine/core/src/telemetry/capturing/capturer.rs
@@ -156,7 +156,7 @@ impl SpanProcessor for Processor {
         Ok(())
     }
 }
-const VALID_QUERY_ATTRS: [&str; 3] = ["query", "params", "duration_ms"];
+const VALID_QUERY_ATTRS: [&str; 4] = ["query", "params", "target", "duration_ms"];
 /// A Candidate represents either a span or an event that is being considered for capturing.
 /// A Candidate can be converted into a [`Capture`].
 #[derive(Debug, Clone)]

From fc1227cd91ecb8970b2c9c1385e018cc4a6fe514 Mon Sep 17 00:00:00 2001
From: Miguel Fernandez
Date: Mon, 30 Jan 2023 12:10:05 +0100
Subject: [PATCH 12/12] More clippy fixes

---
 query-engine/core/src/telemetry/capturing/tx_ext.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/query-engine/core/src/telemetry/capturing/tx_ext.rs b/query-engine/core/src/telemetry/capturing/tx_ext.rs
index 5c0203613fe6..6b1b4905ab57 100644
--- a/query-engine/core/src/telemetry/capturing/tx_ext.rs
+++ b/query-engine/core/src/telemetry/capturing/tx_ext.rs
@@ -55,7 +55,7 @@ impl TxTraceExt for crate::TxId {
 
     fn as_traceparent(&self) -> String {
         let trace_id = self.clone().into_trace_id();
-        format!("00-{}-0000000000000001-01", trace_id)
+        format!("00-{trace_id}-0000000000000001-01")
     }
 }
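To round off the settings documentation fixed above: the `From<&str>` conversion amounts to a trim, lowercase, and validate loop, where `"tracing"` toggles trace capture and unknown chunks are ignored. A sketch of equivalent parsing logic; the exact contents of `VALID_LEVELS` are not visible in these hunks, so the list below is an assumption:

```rust
use std::collections::HashSet;

// Assumed level set; the real constant lives in settings.rs.
const VALID_LEVELS: [&str; 5] = ["error", "warn", "info", "debug", "query"];

#[derive(Debug, Default)]
struct Settings {
    included_log_levels: HashSet<String>,
    include_traces: bool,
}

fn parse(s: &str) -> Settings {
    let mut settings = Settings::default();
    for chunk in s.split(',') {
        let level = chunk.trim().to_lowercase();
        if level == "tracing" {
            settings.include_traces = true;
        } else if VALID_LEVELS.contains(&level.as_str()) {
            settings.included_log_levels.insert(level);
        }
        // anything else ("foo", "bar baz") is silently ignored
    }
    settings
}

fn main() {
    let s = parse("error, warn, query, tracing, foo");
    assert!(s.include_traces);
    assert_eq!(s.included_log_levels.len(), 3);

    // An empty capture string enables nothing, matching is_enabled() == false.
    let empty = parse("");
    assert!(!empty.include_traces && empty.included_log_levels.is_empty());
}
```

Ignoring unrecognized chunks rather than erroring is a deliberate robustness choice: the string ultimately comes from a client-supplied capture header, and a malformed level should not fail the request.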