From 0d8abcc2e8e5a8fddf8e0cfe8fa5d0e12d1d9b46 Mon Sep 17 00:00:00 2001
From: Miguel Fernandez
Date: Fri, 20 Jan 2023 11:48:59 +0100
Subject: [PATCH] Implement log/tracing capturing

According to https://www.notion.so/prismaio/Data-Proxy-Logs-Scale-Team-Spec-184242eb7271476d82113c59c761bed9#4ec77c2aa9004015ac2d392c7458dfb5
---
 Cargo.lock                                    |   1 +
 Makefile                                      |   2 +-
 .../query-tests-setup/src/runner/binary.rs    |   8 +-
 query-engine/core/Cargo.toml                  |   6 +-
 .../src/interactive_transactions/error.rs     |  29 ++
 .../core/src/interactive_transactions/mod.rs  |  86 ++-
 query-engine/core/src/lib.rs                  |   2 +-
 .../core/src/telemetry/capturing/capturer.rs  | 490 ++++++++++++++++++
 .../core/src/telemetry/capturing/mod.rs       | 160 ++++++
 .../core/src/telemetry/capturing/settings.rs  |  88 ++++
 .../core/src/telemetry/capturing/storage.rs   |  19 +
 query-engine/core/src/telemetry/helpers.rs    |   9 +
 query-engine/core/src/telemetry/mod.rs        |   1 +
 query-engine/query-engine/src/logger.rs       |  26 +-
 query-engine/query-engine/src/opt.rs          |   4 +
 query-engine/query-engine/src/server/mod.rs   | 210 +++++++-
 query-engine/query-engine/src/state.rs        |   6 +-
 query-engine/query-engine/src/tests/dmmf.rs   |   1 +
 .../request-handlers/src/graphql/response.rs  |  14 +
 query-engine/request-handlers/src/lib.rs      |   9 +
 20 files changed, 1135 insertions(+), 36 deletions(-)
 create mode 100644 query-engine/core/src/telemetry/capturing/capturer.rs
 create mode 100644 query-engine/core/src/telemetry/capturing/mod.rs
 create mode 100644 query-engine/core/src/telemetry/capturing/settings.rs
 create mode 100644 query-engine/core/src/telemetry/capturing/storage.rs

diff --git a/Cargo.lock b/Cargo.lock
index 51a4c8c19a74..d63a233081fd 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2585,6 +2585,7 @@ dependencies = [
  "percent-encoding",
  "pin-project",
  "rand 0.8.5",
+ "serde",
  "thiserror",
  "tokio",
  "tokio-stream",
diff --git a/Makefile b/Makefile
index c0f170ca940a..2c0ea71694b4 100644
--- a/Makefile
+++ b/Makefile
@@ -214,7 +214,7 @@ validate:
 	cargo run --bin test-cli -- validate-datamodel dev_datamodel.prisma

 qe:
-	cargo run --bin query-engine -- --enable-playground --enable-raw-queries --enable-metrics --enable-open-telemetry
+	cargo run --bin query-engine -- --enable-playground --enable-raw-queries --enable-metrics --enable-open-telemetry --enable-telemetry-in-response

 qe-dmmf:
 	cargo run --bin query-engine -- cli dmmf > dmmf.json
diff --git a/query-engine/connector-test-kit-rs/query-tests-setup/src/runner/binary.rs b/query-engine/connector-test-kit-rs/query-tests-setup/src/runner/binary.rs
index faad759a589a..ee445d64d65d 100644
--- a/query-engine/connector-test-kit-rs/query-tests-setup/src/runner/binary.rs
+++ b/query-engine/connector-test-kit-rs/query-tests-setup/src/runner/binary.rs
@@ -20,7 +20,13 @@ pub struct BinaryRunner {
 #[async_trait::async_trait]
 impl RunnerInterface for BinaryRunner {
     async fn load(datamodel: String, connector_tag: ConnectorTag, metrics: MetricRegistry) -> TestResult<Self> {
-        let opts = PrismaOpt::from_list(&["binary", "--enable-raw-queries", "--datamodel", &datamodel]);
+        let opts = PrismaOpt::from_list(&[
+            "binary",
+            "--enable-raw-queries",
+            "--enable-telemetry-in-response",
+            "--datamodel",
+            &datamodel,
+        ]);
         let state = setup(&opts, false, Some(metrics)).await.unwrap();

         let configuration = opts.configuration(true).unwrap();
diff --git a/query-engine/core/Cargo.toml b/query-engine/core/Cargo.toml
index b676e948fc79..55ef8e3a5aa4 100644
--- a/query-engine/core/Cargo.toml
+++ b/query-engine/core/Cargo.toml
@@ -23,11 +23,11 @@ futures = "0.3"
version = "1.7", features = ["serde-1"] } itertools = "0.10" mongodb-connector = { path = "../connectors/mongodb-query-connector", package = "mongodb-query-connector", optional = true } -once_cell = "1.3" +once_cell = "1" petgraph = "0.4" prisma-models = { path = "../prisma-models" } prisma-value = { path = "../../libs/prisma-value" } -opentelemetry = { version = "0.17"} +opentelemetry = { version = "0.17.0", features = ["rt-tokio", "serialize"] } query-engine-metrics = {path = "../metrics"} serde.workspace = true serde_json.workspace = true @@ -36,7 +36,7 @@ thiserror = "1.0" tokio.workspace = true tracing = { version = "0.1", features = ["attributes"] } tracing-futures = "0.2" -tracing-subscriber = {version = "0.3", features = ["env-filter"]} +tracing-subscriber = { version = "0.3", features = ["env-filter"] } tracing-opentelemetry = "0.17.4" url = "2" user-facing-errors = { path = "../../libs/user-facing-errors" } diff --git a/query-engine/core/src/interactive_transactions/error.rs b/query-engine/core/src/interactive_transactions/error.rs index 146d69f103b5..8189e2ce7420 100644 --- a/query-engine/core/src/interactive_transactions/error.rs +++ b/query-engine/core/src/interactive_transactions/error.rs @@ -1,5 +1,10 @@ use thiserror::Error; +use crate::{ + response_ir::{Item, Map}, + CoreError, +}; + #[derive(Debug, Error, PartialEq)] pub enum TransactionError { #[error("Unable to start a transaction in the given time.")] @@ -17,3 +22,27 @@ pub enum TransactionError { #[error("Unexpected response: {reason}.")] Unknown { reason: String }, } + +#[derive(Debug, serde::Serialize, PartialEq)] +pub struct ExtendedTransactionUserFacingError { + #[serde(flatten)] + user_facing_error: user_facing_errors::Error, + + #[serde(skip_serializing_if = "indexmap::IndexMap::is_empty")] + extensions: Map, +} + +impl ExtendedTransactionUserFacingError { + pub fn set_extension(&mut self, key: String, val: serde_json::Value) { + self.extensions.entry(key).or_insert(Item::Json(val)); + } +} + +impl From for ExtendedTransactionUserFacingError { + fn from(error: CoreError) -> Self { + ExtendedTransactionUserFacingError { + user_facing_error: error.into(), + extensions: Default::default(), + } + } +} diff --git a/query-engine/core/src/interactive_transactions/mod.rs b/query-engine/core/src/interactive_transactions/mod.rs index d3c73f705123..7df5a2c33547 100644 --- a/query-engine/core/src/interactive_transactions/mod.rs +++ b/query-engine/core/src/interactive_transactions/mod.rs @@ -1,6 +1,6 @@ use crate::CoreError; use connector::{Connection, ConnectionLike, Transaction}; -use std::fmt::Display; +use std::{collections::HashMap, fmt::Display}; use tokio::{ task::JoinHandle, time::{Duration, Instant}, @@ -76,6 +76,63 @@ impl Display for TxId { } } +impl From for opentelemetry::Context { + // This is a bit of a hack, but it's the only way to have a default trace span for a whole + // transaction when no traceparent is propagated from the client. + // + // This is done so we can capture traces happening accross the different queries in a + // transaction. Otherwise, if a traceparent is not propagated from the client, each query in + // the transaction will run within a span that has already been generated at the begining of the + // transaction, and held active in the actor in charge of running the queries. Thus, making + // impossible to capture traces happening in the individual queries, as they won't be aware of + // the transaction they are part of. 
+    //
+    // By generating this "fake" traceparent based on the transaction id, we can have a common
+    // trace_id for all transaction operations.
+    fn from(id: TxId) -> Self {
+        let extractor: HashMap<String, String> =
+            HashMap::from_iter(vec![("traceparent".to_string(), id.as_traceparent())]);
+        opentelemetry::global::get_text_map_propagator(|propagator| propagator.extract(&extractor))
+    }
+}
+
+impl TxId {
+    pub fn as_traceparent(&self) -> String {
+        let trace_id = opentelemetry::trace::TraceId::from(self.clone());
+        format!("00-{}-0000000000000001-01", trace_id)
+    }
+}
+
+impl From<TxId> for opentelemetry::trace::TraceId {
+    // in order to convert a TxId (a 25-character cuid) into a TraceId (16 bytes), we take 16 of
+    // its bytes, chosen to be random enough to be used as a trace id.
+    // this is a typical cuid: "c-lct0q6ma-0004-rb04-h6en1roa" (dashes added here to delimit the chunks)
+    //
+    // - first letter is always the same
+    // - next 7-8 bytes are a timestamp; there's more entropy in the least significant bytes
+    // - next 4 bytes are a counter since the server started
+    // - next 4 bytes are a system fingerprint, invariant for the same server instance
+    // - least significant 8 bytes: totally random
+    //
+    // We want the most entropic slice of 16 bytes that's deterministically determined
+    fn from(id: TxId) -> Self {
+        let mut buffer = [0; 16];
+        let tx_id_bytes = id.0.as_bytes();
+        let len = tx_id_bytes.len();
+
+        // bytes [len-20 to len-12): least significant 4 bytes of the timestamp + 4 bytes counter
+        for (i, source_idx) in (len - 20..len - 12).enumerate() {
+            buffer[i] = tx_id_bytes[source_idx];
+        }
+        // bytes [len-8 to len): the random blocks
+        for (i, source_idx) in (len - 8..len).enumerate() {
+            buffer[i + 8] = tx_id_bytes[source_idx];
+        }
+
+        opentelemetry::trace::TraceId::from_bytes(buffer)
+    }
+}
+
 pub enum CachedTx {
     Open(OpenTx),
     Committed,
@@ -152,3 +209,30 @@ pub enum ClosedTx {
     RolledBack,
     Expired { start_time: Instant, timeout: Duration },
 }
+
+// tests for the TxId conversions above
+#[cfg(test)]
+mod test {
+    use super::*;
+
+    #[test]
+    fn test_txid_into_traceid() {
+        let fixture = vec![
+            ("clct0q6ma0000rb04768tiqbj", "71366d6130303030373638746971626a"),
+            // counter changed, trace id changed:
+            ("clct0q6ma0002rb04cpa6zkmx", "71366d6130303032637061367a6b6d78"),
+            // fingerprint changed, trace id did not change, as that chunk is ignored:
+            ("clct0q6ma00020000cpa6zkmx", "71366d6130303032637061367a6b6d78"),
+            // first 5 bytes changed, trace id did not change, as that chunk is ignored:
+            ("00000q6ma00020000cpa6zkmx", "71366d6130303032637061367a6b6d78"),
+            // 6th byte changed, trace id changed, as that chunk is part of the lsb of the timestamp
+            ("0000006ma00020000cpa6zkmx", "30366d6130303032637061367a6b6d78"),
+        ];
+
+        for (txid, expected_trace_id) in fixture {
+            let txid = TxId(txid.to_string());
+            let trace_id: opentelemetry::trace::TraceId = txid.into();
+            assert_eq!(trace_id.to_string(), expected_trace_id);
+        }
+    }
+}
diff --git a/query-engine/core/src/lib.rs b/query-engine/core/src/lib.rs
index 3bf22ee2cfde..1645fa13e16f 100644
--- a/query-engine/core/src/lib.rs
+++ b/query-engine/core/src/lib.rs
@@ -16,7 +16,7 @@ pub mod telemetry;
 pub use self::{
     error::{CoreError, FieldConversionError},
     executor::{QueryExecutor, TransactionOptions},
-    interactive_transactions::{TransactionError, TxId},
+    interactive_transactions::{ExtendedTransactionUserFacingError, TransactionError, TxId},
     query_document::*,
     telemetry::*,
 };
diff --git a/query-engine/core/src/telemetry/capturing/capturer.rs b/query-engine/core/src/telemetry/capturing/capturer.rs
new file mode 100644
index 000000000000..01dba750b50a
--- /dev/null
+++ b/query-engine/core/src/telemetry/capturing/capturer.rs
@@ -0,0 +1,490 @@
+use super::{settings::Settings, storage::Storage};
+use crate::models;
+use opentelemetry::{
+    sdk::{
+        export::trace::SpanData,
+        trace::{Span, SpanProcessor},
+    },
+    trace::{TraceId, TraceResult},
+};
+use std::{borrow::Cow, fmt};
+use std::{collections::HashMap, sync::Arc, sync::Mutex};
+
+/// Capturer determines, based on a set of settings and a trace id, how capturing is going to be handled.
+/// Generally, both the trace id and the settings will be derived from request headers. Thus, a new
+/// value of this enum is created per request.
+#[derive(Debug, Clone)]
+pub enum Capturer {
+    Enabled(Inner),
+    Disabled,
+}
+
+impl Capturer {
+    pub(super) fn new(processor: Processor, trace_id: TraceId, settings: Settings) -> Self {
+        if settings.is_enabled() {
+            return Self::Enabled(Inner {
+                processor,
+                trace_id,
+                settings,
+            });
+        }
+
+        Self::Disabled
+    }
+}
+
+#[derive(Debug, Clone)]
+pub struct Inner {
+    pub(super) processor: Processor,
+    pub(super) trace_id: TraceId,
+    pub(super) settings: Settings,
+}
+
+impl Inner {
+    pub async fn start_capturing(&self) {
+        self.processor
+            .start_capturing(self.trace_id, self.settings.clone())
+            .await
+    }
+
+    pub async fn fetch_captures(&self) -> Option<Storage> {
+        self.processor.fetch_captures(self.trace_id).await
+    }
+}
+
+/// A [`SpanProcessor`] that captures and stores spans in memory in a synchronized dictionary for
+/// later retrieval
+#[derive(Debug, Clone)]
+pub struct Processor {
+    pub(crate) storage: Arc<Mutex<HashMap<TraceId, Storage>>>,
+}
+
+impl Processor {
+    pub fn new() -> Self {
+        Self {
+            storage: Default::default(),
+        }
+    }
+
+    pub(self) async fn start_capturing(&self, trace_id: TraceId, settings: Settings) {
+        let mut locked_storage = self.storage.lock().unwrap();
+        locked_storage.insert(trace_id, settings.clone().into());
+        drop(locked_storage);
+
+        let ttl = settings.ttl;
+        let storage = self.storage.clone();
+        tokio::spawn(async move {
+            tokio::time::sleep(ttl).await;
+            let mut locked_traces = storage.lock().unwrap();
+            if locked_traces.remove(&trace_id).is_some() {
+                warn!("Timeout waiting for telemetry to be captured. trace_id={}", trace_id)
+            }
+        });
+    }
+
+    pub(self) async fn fetch_captures(&self, trace_id: TraceId) -> Option<Storage> {
+        let mut traces = self.storage.lock().unwrap();
+
+        traces.remove(&trace_id)
+    }
+}
+
+impl Default for Processor {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+impl SpanProcessor for Processor {
+    fn on_start(&self, _: &mut Span, _: &opentelemetry::Context) {
+        // no-op
+    }
+
+    /// Exports a span containing zero or more events that might represent
+    /// logs in the prisma client logging categories (query, info, warn, error)
+    ///
+    /// There's an impedance mismatch between the client categories of logs and the server
+    /// (standard) hierarchical levels of logs (trace, debug, info, warn, error).
+    ///
+    /// The most prominent difference is the "query" type of events. In the client these model
+    /// database queries made by the engine through a connector. But at the moment there's no 1:1
+    /// mapping between the client "query" level and one of the server levels. And depending on
+    /// the database (mongo / relational), the information to build this kind of log event is
+    /// logged differently in the server.
+    ///
+    /// In the case of relational databases --queried through sql_query_connector and eventually
+    /// through quaint-- a trace span describes the query; `TraceSpan::represents_query_event`
+    /// determines if a span represents a query event.
+    ///
+    /// In the case of mongo, an event represents the query, but it needs to be transformed before
+    /// capturing it. `Event::query_event` does that.
+    fn on_end(&self, span_data: SpanData) {
+        let trace_id = span_data.span_context.trace_id();
+
+        let mut locked_storage = self.storage.lock().unwrap();
+        if let Some(storage) = locked_storage.get_mut(&trace_id) {
+            let settings = storage.settings.clone();
+            let original_span_name = span_data.name.clone();
+
+            let (events, span) = models::TraceSpan::from(span_data).split_events();
+
+            let candidate_span = Candidate {
+                value: span,
+                settings: &settings,
+                original_span_name: Some(original_span_name),
+            };
+
+            let capture: Capture = candidate_span.into();
+            capture.add_to(&mut storage.traces, &mut storage.logs);
+
+            if storage.settings.logs_enabled() {
+                events.into_iter().for_each(|log| {
+                    let candidate_event = Candidate {
+                        value: log,
+                        settings: &settings,
+                        original_span_name: None,
+                    };
+                    let capture: Capture = candidate_event.into();
+                    capture.add_to(&mut storage.traces, &mut storage.logs);
+                });
+            }
+        }
+    }
+
+    fn force_flush(&self) -> TraceResult<()> {
+        // no-op
+        Ok(())
+    }
+
+    fn shutdown(&mut self) -> TraceResult<()> {
+        // no-op
+        Ok(())
+    }
+}
+
+/// A Candidate represents either a span or an event that is being considered for capturing.
+/// A Candidate can be converted into a [`Capture`].
+#[derive(Debug, Clone)]
+struct Candidate<'batch_iter, T: Clone + fmt::Debug> {
+    value: T,
+    settings: &'batch_iter Settings,
+    original_span_name: Option<Cow<'batch_iter, str>>,
+}
+
+impl Candidate<'_, models::TraceSpan> {
+    #[inline(always)]
+    fn is_loggable_quaint_query(&self) -> bool {
+        self.settings.included_log_levels.contains("query")
+            && self.original_span_name.is_some()
+            && matches!(self.original_span_name, Some(Cow::Borrowed("quaint:query")))
+    }
+
+    fn query_event(&self) -> models::Event {
+        let span = &self.value;
+
+        let duration_ms = ((span.end_time[0] as f64 - span.start_time[0] as f64) * 1_000.0)
+            + ((span.end_time[1] as f64 - span.start_time[1] as f64) / 1_000_000.0);
+
+        let statement = if let Some(q) = span.attributes.get("db.statement") {
+            match q {
+                serde_json::Value::String(s) => s.to_string(),
+                _ => "unknown".to_string(),
+            }
+        } else {
+            "unknown".to_string()
+        };
+
+        let attributes = vec![(
+            "duration_ms".to_owned(),
+            serde_json::Value::Number(serde_json::Number::from_f64(duration_ms).unwrap()),
+        )]
+        .into_iter()
+        .collect();
+
+        models::Event {
+            span_id: Some(span.span_id.to_owned()),
+            name: statement,
+            level: "query".to_string(),
+            timestamp: span.start_time,
+            attributes,
+        }
+    }
+}
+
+impl Candidate<'_, models::LogEvent> {
+    #[inline(always)]
+    fn is_loggable_mongo_db_query(&self) -> bool {
+        self.settings.included_log_levels.contains("query") && {
+            if let Some(target) = self.value.attributes.get("target") {
+                if let Some(val) = target.as_str() {
+                    return val == "mongodb_query_connector::query";
+                }
+            }
+            false
+        }
+    }
+
+    #[inline(always)]
+    fn is_loggable_event(&self) -> bool {
+        self.settings.included_log_levels.contains(&self.value.level)
+    }
+
+    fn query_event(self) -> models::LogEvent {
+        let mut attributes = self.value.attributes;
+        let mut attrs = HashMap::new();
+        if let Some(dur) = attributes.get("duration_ms") {
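+            // keep only the measured duration; the query text extracted below becomes the event name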
attrs.insert("duration_ms".to_owned(), dur.clone()); + } + + let mut name = "uknown".to_owned(); + if let Some(query) = attributes.remove("query") { + if let Some(str) = query.as_str() { + name = str.to_owned(); + } + } + + models::LogEvent { + name, + level: "query".to_string(), + attributes: attrs, + ..self.value + } + } +} + +/// Capture provides mechanisms to transform a candidate into one of the enum variants. +/// This is necessary because a candidate span might also be transformed into a log event +/// (for quaint queries), or log events need to be transformed to a slightly different format +/// (for mongo queries). In addition some span and events are discarded. +enum Capture { + Span(models::TraceSpan), + LogEvent(models::LogEvent), + Multiple(Vec), + Discarded, +} + +impl Capture { + /// Add the capture to the traces and logs vectors. We pass the vectors in to allow for + /// a recursive implementation for the case of a candidate transforming into a Capture::Multiple + fn add_to(self, traces: &mut Vec, logs: &mut Vec) { + match self { + Capture::Span(span) => { + traces.push(span); + } + Capture::LogEvent(log) => { + logs.push(log); + } + Capture::Multiple(captures) => { + for capture in captures { + capture.add_to(traces, logs); + } + } + Capture::Discarded => {} + } + } +} + +/// A Candidate Event can be transformed into either a slightly different LogEvent (for mongo queries) +/// be captrured as-is if its log level is among the levels to capture, or be discarded. +impl From> for Capture { + fn from(candidate: Candidate<'_, models::Event>) -> Capture { + if candidate.is_loggable_mongo_db_query() { + // mongo events representing queries are transformed into a more meaningful log event + Capture::LogEvent(candidate.query_event()) + } else if candidate.is_loggable_event() { + Capture::LogEvent(candidate.value) + } else { + Capture::Discarded + } + } +} + +/// A Candidate TraceSpan can be transformed into a LogEvent (for quaint queries) if query logging +/// is enabled; captured as-is, if tracing is enabled; or be discarded. 
+impl From<Candidate<'_, models::TraceSpan>> for Capture {
+    fn from(candidate: Candidate<'_, models::TraceSpan>) -> Capture {
+        let mut captures = vec![];
+
+        if candidate.is_loggable_quaint_query() {
+            captures.push(Capture::LogEvent(candidate.query_event()));
+        }
+
+        if candidate.settings.traces_enabled() {
+            captures.push(Capture::Span(candidate.value));
+        }
+
+        match captures.len() {
+            0 => Capture::Discarded,
+            1 => captures.pop().unwrap(),
+            _ => Capture::Multiple(captures),
+        }
+    }
+}
+
+/// tests for the capture exporter
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use std::time::Duration;
+
+    #[test]
+    fn test_candidate_event_transformation() {
+        let event = models::LogEvent {
+            span_id: Some("00f067aa0ba902b7".to_owned()),
+            name: "foo bar".to_owned(),
+            level: "debug".to_owned(),
+            timestamp: [101, 0],
+            attributes: vec![
+                (
+                    "target".to_owned(),
+                    serde_json::Value::String("mongodb_query_connector::query".to_owned()),
+                ),
+                (
+                    "query".to_owned(),
+                    serde_json::Value::String("db.Users.find()".to_owned()),
+                ),
+                ("duration_ms".to_owned(), serde_json::json!(100.0)),
+            ]
+            .into_iter()
+            .collect(),
+        };
+
+        let only_query_log_events: Settings = "query".into();
+
+        let candidate = Candidate {
+            value: event.clone(),
+            settings: &only_query_log_events,
+            original_span_name: None,
+        };
+
+        let capture: Capture = candidate.into();
+        match capture {
+            Capture::LogEvent(event) => {
+                assert_eq!(event.level, "query");
+                assert_eq!(event.name.to_string().as_str(), "db.Users.find()");
+                assert_eq!(event.attributes.get("duration_ms").unwrap().to_string(), "100.0");
+            }
+            _ => unreachable!(),
+        };
+
+        let event = models::LogEvent {
+            attributes: vec![(
+                "target".to_owned(),
+                serde_json::Value::String("a different one".to_owned()),
+            )]
+            .into_iter()
+            .collect(),
+            ..event
+        };
+        let candidate = Candidate {
+            value: event.clone(),
+            settings: &only_query_log_events,
+            original_span_name: None,
+        };
+
+        let capture: Capture = candidate.into();
+        match capture {
+            Capture::Discarded => {}
+            _ => unreachable!(),
+        }
+    }
+
+    #[test]
+    fn test_candidate_span_transformation() {
+        let trace_span = models::TraceSpan {
+            trace_id: "4bf92f3577b34da6a3ce929d0e0e4736".to_owned(),
+            span_id: "00f067aa0ba902b7".to_owned(),
+            parent_span_id: "00f067aa0ba902b5".to_owned(),
+            name: "prisma:engine:db_query".to_ascii_lowercase(),
+            start_time: [101, 0],
+            end_time: [101, 10000000],
+            attributes: vec![(
+                "db.statement".to_owned(),
+                serde_json::Value::String("SELECT 1".to_owned()),
+            )]
+            .into_iter()
+            .collect(),
+            events: Default::default(),
+            links: Default::default(),
+        };
+
+        // capturing query events
+        let only_query_log_events: Settings = "query".into();
+        let original_span_name = Some(Cow::Borrowed("quaint:query"));
+
+        let candidate = Candidate {
+            value: trace_span.clone(),
+            settings: &only_query_log_events,
+            original_span_name: original_span_name.clone(),
+        };
+
+        let capture: Capture = candidate.into();
+        match capture {
+            Capture::LogEvent(event) => {
+                assert_eq!(event.level, "query");
+                assert_eq!(event.name.to_string().as_str(), "SELECT 1");
+                assert_eq!(event.attributes.get("duration_ms").unwrap().to_string(), "10.0");
+            }
+            _ => unreachable!(),
+        };
+
+        // capturing query and tracing events
+        let query_logs_and_traces: Settings = "query, tracing".into();
+        let candidate = Candidate {
+            value: trace_span.clone(),
+            settings: &query_logs_and_traces,
+            original_span_name: original_span_name.clone(),
+        };
+
+        let capture: Capture = candidate.into();
+        match capture {
+            Capture::Multiple(captures) => {
+                match captures[0] {
+                    Capture::LogEvent(_) => {}
+                    _ => unreachable!(),
+                };
+
+                match captures[1] {
+                    Capture::Span(_) => {}
+                    _ => unreachable!(),
+                };
+            }
+            _ => unreachable!(),
+        };
+
+        // capturing nothing
+        let reject_all: Settings = "".into();
+        let candidate = Candidate {
+            value: trace_span.clone(),
+            settings: &reject_all,
+            original_span_name: original_span_name.clone(),
+        };
+
+        let capture: Capture = candidate.into();
+        match capture {
+            Capture::Discarded => {}
+            _ => unreachable!(),
+        };
+    }
+
+    #[tokio::test]
+    async fn test_garbage_collection() {
+        let exporter = Processor::new();
+
+        let trace_id = TraceId::from_hex("1").unwrap();
+        let one_ms = Duration::from_millis(1);
+
+        let mut settings = Settings::default();
+        settings.ttl = one_ms;
+
+        exporter.start_capturing(trace_id, settings).await;
+        let storage = exporter.storage.lock().unwrap();
+        assert!(storage.get(&trace_id).is_some());
+        drop(storage);
+
+        tokio::time::sleep(10 * one_ms).await;
+
+        let storage = exporter.storage.lock().unwrap();
+        assert!(storage.get(&trace_id).is_none());
+    }
+}
diff --git a/query-engine/core/src/telemetry/capturing/mod.rs b/query-engine/core/src/telemetry/capturing/mod.rs
new file mode 100644
index 000000000000..438e3eb5d3e5
--- /dev/null
+++ b/query-engine/core/src/telemetry/capturing/mod.rs
@@ -0,0 +1,160 @@
+//! Telemetry Capturing is the process of recording the logs and traces happening during a request
+//! to the binary engine, and rendering them in the response.
+//!
+//! The interaction diagram below (sorry about the width!) shows the different roles at play during
+//! telemetry capturing. A textual explanation follows it. For the sake of example, a server
+//! environment --the query-engine crate-- is assumed.
+//! #                                                           ╔═══════════════════════╗     ╔═══════════════╗
+//! #                                                           ║   <<SpanProcessor>>   ║     ║    Storage    ║     ╔═══════════════════╗
+//! #    ┌───────────────────┐                                  ║       PROCESSOR       ║     ║               ║     ║      TRACER       ║
+//! #    │      Server       │                                  ╚═══════════╦═══════════╝     ╚═══════╦═══════╝     ╚═════════╦═════════╝
+//! #    └─────────┬─────────┘                                              │                         │                       │
+//! #              │                                                        │                         │                       │
+//! #              │                                                        │                         │                       │
+//! #     POST     │                                                        │                         │                       │
+//! # (body, headers)│                                                      │                         │                       │
+//! # ──────────▶┌┴┐                                                        │                         │                       │
+//! #        ┌─┐ │ │new(headers)╔════════════╗                              │                         │                       │
+//! #        │1│ │ ├───────────▶║s: Settings ║                              │                         │                       │
+//! #        └─┘ │ │            ╚════════════╝                              │                         │                       │
+//! #            │ │                                                        │                         │                       │
+//! #            │ │     ╔═══════════════════╗                              │                         │                       │
+//! #            │ │     ║ Capturer::Enabled ║                              │                         │                       │     ┌────────────┐
+//! #            │ │     ╚═══════════════════╝                              │                         │                       │     │<Somewhere> │
+//! #            │ │               │                                        │                         │                       │     └──────┬─────┘
+//! #            │ │  ┌─┐ new(trace_id, s)    │                             │                         │                       │            │
+//! #            │ ├───┤2├───────────────────────▶│                         │                         │                       │            │
+//! #            │ │  └─┘                     │                             │                         │                       │            │
+//! #            │ │                          │                             │                         │                       │            │
+//! #            │ │  ┌─┐ start_capturing()   │    start_capturing          │                         │                       │            │
+//! #            │ ├───┤3├───────────────────────▶│   (trace_id, s)         │                         │                       │            │
+//! #            │ │  └─┘                     │                             │                         │                       │            │
+//! #            │ │                          ├─────────────────────▶│      │                         │                       │            │
+//! #            │ │                          │                      │      │                         │                       │            │
+//! #            │ │                          │                      │  ┌─┐insert(trace_id, s)        │                       │            │
+//! #            │ │                          │                      ├─┤4├────────────────────▶│      │                       │            │
+//! #            │ │                          │                      │ └─┘                     │      │                       │            │
+//! #            │ │                          │                      │      │  ┌─┐             │      process_query           │            │
+//! #            │ ├──────────────────────────────┼──────────────────────┼─────────────────────────┼────────────┤5├──────┼──────────────────────────▶┌┴┐
+//! #            │ │                          │                      │      │  └─┘             │                       │                            │ │
+//! #            │ │                          │                      │      │                   │                      │                            │ │
+//! #            │ │                          │                      │      │                   │                      │      ┌─────────────────────┐
+//! #            │ │                          │                      │      │   log! / span! ┌─┐│                      │      │ res: PrismaResponse │
+//! #            │ │                          │                      │      │◀─────────────────────┤6├──│              │      └──────────┬──────────┘
+//! #            │ │                          │   on_end(span_data)  │ ┌─┐  │                └─┘   │                   │       new       │
+//! #            │ │                          │◀────────────────────────┼────────────┤7├──────┤    │                   │────────────▶│
+//! #            │ │                          │                      │ └─┘  │                      │                   │             │
+//! #            │ │                          │  append(trace_id, logs,     │                      │                   │             │
+//! #            │ │                          │  ┌─┐    traces)      │      │                      │                   │             │
+//! #            │ │                          ├──┤8├───────────────────▶│   │                      │                   │             │
+//! #            │ │                          │  └─┘                 │      │                      │                   │             │
+//! #            │ │     res: PrismaResponse  │          ┌─┐         │      │                      │                   │             │
+//! #            │ │◁ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┼ ─ ─ ┤9├ ─ ─ ─ ─ ─ ─ ─│─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─│─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─│─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─└┬┘
+//! #            │ │  ┌────┐ fetch_captures() │          └─┘         │      │                      │                   │             │
+//! #            │ ├─┤ 10 ├──────────────────────▶│   fetch_captures │      │                      │                   │             │
+//! #            │ │  └────┘                  │     (trace_id)       │      │                      │                   │             │
+//! #            │ │                          ├─────────────────────▶│      │                      │                   x             │
+//! #            │ │                          │                      │      │                      │                                 │
+//! #            │ │                          │                      │┌────┐  get logs/traces      │                                 │
+//! #            │ │                          │                      ├┤ 11 ├───────────────────▶   │                                 │
+//! #            │ │                          │                      │└────┘                       │                                 │
+//! #            │ │                          │                      │◁ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─│   │                                 │
+//! #            │ │                          │                      │                             │                                 │
+//! #            │ │                ◁ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─│         │                             │                                 │
+//! #            │ │      logs, traces        │                      │                             │                                 │
+//! #            │ │◁─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─│                  │                             │                                 │
+//! #            │ │          x               ┌────┐                 │                             │       res.set_extension(logs)   │
+//! #            │ ├───────────────────────────────────────┤ 12 ├────────┼─────────────────────────┼─────────────────────┼──────────────────────────────────────────▶│
+//! #            │ │                          └────┘                 │                             │       res.set_extension(traces) │
+//! #            │ ├─────────────────────────────────────────────────────┼─────────────────────────┼─────────────────────┼──────────────────────────────────────────▶│
+//! #  ◀ ─ ─ ─└┬┘                                                    │                             │                                 x
+//! # json!(res) │
+//! #   ┌────┐  │
+//! #   │ 13 │  │
+//! #   └────┘
+//! #
+//! #  ◁─ ─ ─ ─  return
+//! #
+//! #  ◁───────  call (pseudo-signatures)
+//! #
+//!
+//! In the diagram, you will see objects whose lifetime is static. The boxes for those have a double
+//! width margin. These are:
+//!
+//!   - The `server` itself
+//!   - The global `TRACER`, which handles `log!` and `span!` and uses the global `PROCESSOR` to
+//!     process the data constituting trace `Span`s and log `Event`s
+//!   - The global `PROCESSOR`, which manages the `Storage` set of data structures, holding logs,
+//!     traces (and capture settings) per request.
+//!
+//! Then, through the request lifecycle, different objects are created and dropped:
+//!
+//!   - When a request comes in, its headers are processed and a [`Settings`] object is built;
+//!     this object determines, for the request, how logging and tracing are going to be captured:
+//!     only traces, only logs, or both, and which log levels are going to be captured.
+//!   - Based on the settings, a new `Capturer` is created; a capturer is nothing but an exporter
+//!     wrapped to start capturing / fetch the captures for this particular request.
+//!
+//! Then the capturing process works in this way:
+//!
+//!   - The server receives a query **[1]**
+//!   - It grabs the HTTP headers and builds a `Capturer` object **[2]**, which is configured with
+//!     the settings denoted by the `X-capture-telemetry` header
+//!   - Now the server tells the `Capturer` to start capturing all the logs and traces occurring on
+//!     the request **[3]** (denoted by a `trace_id`). The `trace_id` is either carried in the
+//!     `traceparent` header or implicitly created on the first span of the request. To _start
+//!     capturing_ implies creating entries for the `trace_id` in two different data structures:
+//!     `logs` and `traces`; and storing the settings for selecting the Spans and Events to
+//!     capture **[4]**.
+//!   - The server dispatches the request, and _Somewhere_ else in the code it is processed **[5]**.
+//!   - There the code logs events and emits traces asynchronously, as part of the processing **[6]**
+//!   - Traces and Logs arrive at the `TRACER`, and get hydrated in the `PROCESSOR` **[7]**,
+//!     which writes them in the shard corresponding to the current `trace_id`, into the
+//!     `logs` and `traces` storage **[8]**. The settings previously stored under the `trace_id`
+//!     key are used to pick which events and spans are going to be captured, based on their level.
+//!   - When the code that dispatches the request is done, it returns a `PrismaResponse` to the
+//!     server **[9]**.
+//!   - Then the server asks the `PROCESSOR` to fetch the captures **[10]**
+//!   - And right after that, it fetches the captures from the `Storage` **[11]**. At that time,
+//!     although that's not represented in the diagram, the captures are deleted from the storage,
+//!     thus freeing any memory used for capturing during the request
+//!   - Finally, the server sets the `logs` and `traces` extensions in the `PrismaResponse` **[12]**,
+//!     it serializes the extended response in json format and returns it as an HTTP Response
+//!     blob **[13]**.
+//!
+pub use self::capturer::Capturer;
+pub use self::settings::Settings;
+
+use self::capturer::Processor;
+use once_cell::sync::Lazy;
+use opentelemetry::{global, sdk, trace};
+
+static PROCESSOR: Lazy<Processor> = Lazy::new(Processor::default);
+static TRACER: Lazy<sdk::trace::Tracer> = Lazy::new(setup_and_install_tracer_globally);
+
+/// Creates a new capturer, which is configured to export traces and log events happening during a
+/// particular request
+pub fn capturer(trace_id: trace::TraceId, settings: Settings) -> Capturer {
+    Capturer::new(PROCESSOR.to_owned(), trace_id, settings)
+}
+
+/// Returns a reference to the global tracer used when capturing telemetry in the response
+pub fn tracer() -> &'static sdk::trace::Tracer {
+    &TRACER
+}
+
+/// Installs an opentelemetry tracer globally, which is configured to process
+/// spans and export them to the global exporter.
+fn setup_and_install_tracer_globally() -> sdk::trace::Tracer {
+    global::set_text_map_propagator(sdk::propagation::TraceContextPropagator::new());
+
+    let provider_builder = sdk::trace::TracerProvider::builder().with_span_processor(PROCESSOR.to_owned());
+    let provider = provider_builder.build();
+    let tracer = opentelemetry::trace::TracerProvider::tracer(&provider, "opentelemetry");
+
+    global::set_tracer_provider(provider);
+    tracer
+}
+
+mod capturer;
+mod settings;
+pub mod storage;
diff --git a/query-engine/core/src/telemetry/capturing/settings.rs b/query-engine/core/src/telemetry/capturing/settings.rs
new file mode 100644
index 000000000000..a039212b96d1
--- /dev/null
+++ b/query-engine/core/src/telemetry/capturing/settings.rs
@@ -0,0 +1,88 @@
+use std::{collections::HashSet, time};
+
+static VALID_VALUES: &[&str] = &["error", "warn", "info", "debug", "trace", "query", "tracing"];
+static DEFAULT_TTL: time::Duration = time::Duration::from_secs(1800); // 30 minutes
+
+#[derive(Debug, Clone, Default)]
+pub struct Settings {
+    // Only capture log events from the specified log levels. The special level "query", which does
+    // not exist in the engine logging infrastructure, is shimmed from any event describing a query,
+    // regardless of its level.
+    pub(super) included_log_levels: HashSet<String>,
+    // whether to include trace spans when capturing
+    pub(super) include_traces: bool,
+    // how long to keep captured traces in memory: if for any reason the capturing breaks, a tokio
+    // task will clean up captured traces after this duration. Defaults to [`DEFAULT_TTL`].
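+    // (the cleanup task spawned in `Processor::start_capturing` uses this value)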
+    pub(super) ttl: time::Duration,
+}
+
+impl Settings {
+    pub(super) fn new(included_log_levels: HashSet<String>, include_traces: bool, ttl: time::Duration) -> Self {
+        Self {
+            include_traces,
+            included_log_levels,
+            ttl,
+        }
+    }
+}
+
+// As the test below shows, settings can be constructed from a comma-separated string.
+// Examples: valid: `"error, warn, query, tracing"`; invalid: `"foo, bar baz"`. Chunks passed in
+// are trimmed and converted to lowercase. Chunks corresponding to levels different from those in
+// VALID_VALUES are ignored.
+//
+// The ttl is always the same (DEFAULT_TTL) but is there to allow for easier unit-testing of
+// capturing logic
+impl From<&str> for Settings {
+    fn from(s: &str) -> Self {
+        let chunks = s.split(',');
+        let mut set = HashSet::from_iter(
+            chunks
+                .into_iter()
+                .map(str::trim)
+                .map(str::to_lowercase)
+                .filter(|s| VALID_VALUES.contains(&s.as_str())),
+        );
+
+        let include_traces = set.remove("tracing");
+        let included_log_levels = set;
+
+        Self::new(included_log_levels, include_traces, DEFAULT_TTL)
+    }
+}
+
+impl Settings {
+    #[inline(always)]
+    pub fn is_enabled(&self) -> bool {
+        self.traces_enabled() || self.logs_enabled()
+    }
+
+    #[inline(always)]
+    pub fn traces_enabled(&self) -> bool {
+        self.include_traces
+    }
+
+    #[inline(always)]
+    pub fn logs_enabled(&self) -> bool {
+        !self.included_log_levels.is_empty()
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_options_from() {
+        let options = Settings::from("error, warn, query, tracing");
+        assert_eq!(options.included_log_levels.len(), 3);
+        assert!(options.included_log_levels.contains("error"));
+        assert!(options.included_log_levels.contains("warn"));
+        assert!(options.included_log_levels.contains("query"));
+        assert!(options.include_traces);
+        assert!(options.is_enabled());
+
+        let options = Settings::from("foo, bar baz");
+        assert!(!options.is_enabled());
+    }
+}
diff --git a/query-engine/core/src/telemetry/capturing/storage.rs b/query-engine/core/src/telemetry/capturing/storage.rs
new file mode 100644
index 000000000000..9c2767169112
--- /dev/null
+++ b/query-engine/core/src/telemetry/capturing/storage.rs
@@ -0,0 +1,19 @@
+use super::settings::Settings;
+use crate::telemetry::models;
+
+#[derive(Debug, Default)]
+pub struct Storage {
+    pub traces: Vec<models::TraceSpan>,
+    pub logs: Vec<models::LogEvent>,
+    pub settings: Settings,
+}
+
+impl From<Settings> for Storage {
+    fn from(settings: Settings) -> Self {
+        Self {
+            traces: Default::default(),
+            logs: Default::default(),
+            settings,
+        }
+    }
+}
diff --git a/query-engine/core/src/telemetry/helpers.rs b/query-engine/core/src/telemetry/helpers.rs
index b62d0b72235f..b65484234e98 100644
--- a/query-engine/core/src/telemetry/helpers.rs
+++ b/query-engine/core/src/telemetry/helpers.rs
@@ -41,6 +41,15 @@ pub fn set_span_link_from_traceparent(span: &Span, traceparent: Option<String>)
     }
 }

+pub fn get_trace_id_from_traceparent(traceparent: Option<&str>) -> TraceId {
+    traceparent
+        .unwrap_or("0-0-0-0")
+        .split('-')
+        .nth(1)
+        .map(|id| TraceId::from_hex(id).unwrap_or(TraceId::INVALID))
+        .unwrap()
+}
+
 pub fn get_trace_id_from_context(context: &Context) -> TraceId {
     let context_span = context.span();
     context_span.span_context().trace_id()
diff --git a/query-engine/core/src/telemetry/mod.rs b/query-engine/core/src/telemetry/mod.rs
index 29dd6738caf2..56aab06e731f 100644
--- a/query-engine/core/src/telemetry/mod.rs
+++ b/query-engine/core/src/telemetry/mod.rs
@@ -1,2 +1,3 @@
+pub mod capturing;
 pub mod helpers;
 pub mod models;
diff --git a/query-engine/query-engine/src/logger.rs b/query-engine/query-engine/src/logger.rs
index 728b7c27a1a6..98176683a501 100644
--- a/query-engine/query-engine/src/logger.rs
+++ b/query-engine/query-engine/src/logger.rs
@@ -27,6 +27,9 @@ pub struct Logger {
 enum TracingConfig {
     // exposed means tracing will be exposed through an HTTP endpoint in a jaeger-compatible format
     Http(String),
+    // captured means that traces will be captured in memory and exposed in the graphql response;
+    // logs will also be exposed in the response when capturing is enabled
+    Captured,
     // stdout means that traces will be printed to standard output
     Stdout,
     // disabled means that tracing will be disabled
@@ -59,16 +62,17 @@ impl Logger {
         self.metrics = Some(metrics);
     }

-    pub fn setup_telemetry(&mut self, enable_telemetry: bool, endpoint: &str) {
+    pub fn setup_telemetry(&mut self, enable_telemetry: bool, enable_capturing: bool, endpoint: &str) {
         let endpoint = if endpoint.is_empty() {
             None
         } else {
             Some(endpoint.to_owned())
         };

-        self.tracing_config = match (enable_telemetry, endpoint) {
-            (true, Some(endpoint)) => TracingConfig::Http(endpoint),
-            (true, None) => TracingConfig::Stdout,
+        self.tracing_config = match (enable_telemetry, enable_capturing, endpoint) {
+            (_, true, _) => TracingConfig::Captured,
+            (true, _, Some(endpoint)) => TracingConfig::Http(endpoint),
+            (true, _, None) => TracingConfig::Stdout,
             _ => TracingConfig::Disabled,
         };
     }
@@ -83,6 +87,7 @@ impl Logger {
     pub fn install(&self) -> LoggerResult<()> {
         let filter = telemetry::helpers::env_filter(self.log_queries, telemetry::helpers::QueryEngineLogLevel::FromEnv);
         let is_user_trace = filter_fn(telemetry::helpers::user_facing_span_only_filter);
+        let is_user_trace_or_event = filter_fn(telemetry::helpers::user_facing_filter);

         let fmt_layer = match self.log_format {
             LogFormat::Text => {
@@ -100,6 +105,19 @@ impl Logger {
             .with(self.metrics.clone());

         match self.tracing_config {
+            TracingConfig::Captured => {
+                // Capturing is enabled; it overrides otel exporting.
+                let tracer = telemetry::capturing::tracer().to_owned();
+                let telemetry_layer = tracing_opentelemetry::layer()
+                    .with_tracer(tracer)
+                    .with_filter(is_user_trace_or_event)
+                    .with_filter(telemetry::helpers::env_filter(
+                        self.log_queries,
+                        telemetry::helpers::QueryEngineLogLevel::FromEnv,
+                    ));
+                let subscriber = subscriber.with(telemetry_layer);
+                subscriber::set_global_default(subscriber)?;
+            }
             TracingConfig::Http(ref endpoint) => {
                 // Opentelemetry is enabled, but capturing is disabled; there's an endpoint to export
                 // the traces to.
diff --git a/query-engine/query-engine/src/opt.rs b/query-engine/query-engine/src/opt.rs
index a42a8e31b31f..0914fb5101e5 100644
--- a/query-engine/query-engine/src/opt.rs
+++ b/query-engine/query-engine/src/opt.rs
@@ -103,6 +103,10 @@ pub struct PrismaOpt {
     #[structopt(long)]
     pub enable_open_telemetry: bool,

+    #[structopt(long)]
+    /// Enable the tracer to capture logs and traces and return them in the response
+    pub enable_telemetry_in_response: bool,
+
     /// The url to the OpenTelemetry collector.
     /// Enabling this will send the OpenTelemetry tracing to a collector
     /// and not via our custom stdout tracer
diff --git a/query-engine/query-engine/src/server/mod.rs b/query-engine/query-engine/src/server/mod.rs
index 02d161832c72..72cc7b894f10 100644
--- a/query-engine/query-engine/src/server/mod.rs
+++ b/query-engine/query-engine/src/server/mod.rs
@@ -2,9 +2,12 @@ use crate::state::State;
 use crate::{opt::PrismaOpt, PrismaResult};
 use hyper::service::{make_service_fn, service_fn};
 use hyper::{header::CONTENT_TYPE, Body, HeaderMap, Method, Request, Response, Server, StatusCode};
-use opentelemetry::global;
-use opentelemetry::propagation::Extractor;
-use query_core::{schema::QuerySchemaRenderer, TransactionOptions, TxId};
+use opentelemetry::trace::TraceContextExt;
+use opentelemetry::{global, propagation::Extractor};
+use query_core::helpers::get_trace_id_from_traceparent;
+use query_core::{
+    schema::QuerySchemaRenderer, telemetry, ExtendedTransactionUserFacingError, TransactionOptions, TxId,
+};
 use query_engine_metrics::MetricFormat;
 use request_handlers::{dmmf, GraphQLSchemaRenderer, GraphQlHandler};
 use serde_json::json;
@@ -113,7 +116,8 @@ async fn graphql_handler(state: State, req: Request<Body>) -> Result<Response<Body>, hyper::Error> {
     let handler = GraphQlHandler::new(&*state.cx.executor, state.cx.query_schema());
-    let result = handler.handle(body, tx_id, traceparent).instrument(span).await;
+    let mut result = handler.handle(body, tx_id, traceparent).instrument(span).await;
+
+    if let telemetry::capturing::Capturer::Enabled(capturer) = &capture_config {
+        let telemetry = capturer.fetch_captures().await;
+        if let Some(telemetry) = telemetry {
+            result.set_extension("traces".to_owned(), json!(telemetry.traces));
+            result.set_extension("logs".to_owned(), json!(telemetry.logs));
+        }
+    }

     let result_bytes = serde_json::to_vec(&result).unwrap();
@@ -231,11 +277,11 @@ async fn transaction_handler(state: State, req: Request<Body>) -> Result<Response<Body>, hyper::Error> {

 async fn transaction_start_handler(state: State, req: Request<Body>) -> Result<Response<Body>, hyper::Error> {
-    let cx = get_parent_span_context(req.headers());
+    let headers = req.headers().to_owned();
+
     let body_start = req.into_body();
-    // block and buffer request until the request has completed
     let full_body = hyper::body::to_bytes(body_start).await?;
-    let tx_opts: TransactionOptions = serde_json::from_slice(full_body.as_ref()).unwrap();
+    let mut tx_opts: TransactionOptions = serde_json::from_slice(full_body.as_ref()).unwrap();
+    let tx_id = tx_opts.with_new_transaction_id();
+
+    // This is the span we use to instrument the execution of a transaction. This span will be open
+    // during the tx execution, and held in the ITXServer for that transaction (see `ITXServer`)
     let span = info_span!("prisma:engine:itx_runner", user_facing = true, itx_id = field::Empty);
-    span.set_parent(cx);
+
+    // If telemetry needs to be captured, we use the span trace_id to correlate the logs happening
+    // during the different operations within a transaction. The trace_id is propagated in the
+    // traceparent header, but if it's not present, we need to synthetically create one for the
+    // transaction. This is needed, in case the client is interested in capturing logs and not
+    // traces, because:
+    //   - The client won't send a traceparent header
+    //   - A transaction initial span is created here (prisma:engine:itx_runner) and stored in the
+    //     ITXServer for that transaction
+    //   - When a query comes in, the graphql handler processes it, but we need to tell the capturer
+    //     to start capturing logs, and for that we need a trace_id.
+    //     There are two places where we could get that information from:
+    //     - First, it's the traceparent, but the client didn't send it, because they are only
+    //       interested in logs.
+    //     - Second, it's the root span for the transaction, but it's not in scope; instead it's
+    //       stored in the ITXServer, in a different tokio task.
+    //
+    // For the above reasons, we need to create a trace_id that we can predict and use across the
+    // different operations happening within a transaction. So we do it by converting the tx_id
+    // into a trace_id, leaning on the fact that the tx_id has more entropy, and there's no
+    // information loss.
+    let capture_settings = capture_settings(&headers);
+    let traceparent = traceparent(&headers);
+    if traceparent.is_none() && capture_settings.logs_enabled() {
+        span.set_parent(tx_id.into())
+    } else {
+        span.set_parent(get_parent_span_context(&headers))
+    }
+    let trace_id = span.context().span().span_context().trace_id();
+    let capture_config = telemetry::capturing::capturer(trace_id, capture_settings);
+
+    if let telemetry::capturing::Capturer::Enabled(capturer) = &capture_config {
+        capturer.start_capturing().await;
+    }

     let result = state
         .cx
@@ -262,9 +344,19 @@ async fn transaction_start_handler(state: State, req: Request<Body>) -> Result<Response<Body>, hyper::Error> {
         Ok(tx_id) => {
-            let result = json!({ "id": tx_id.to_string() });
+            let result = if let Some(telemetry) = telemetry {
+                json!({ "id": tx_id.to_string(), "extensions": { "logs": json!(telemetry.logs), "traces": json!(telemetry.traces) } })
+            } else {
+                json!({ "id": tx_id.to_string() })
+            };
             let result_bytes = serde_json::to_vec(&result).unwrap();

             let res = Response::builder()
@@ -274,23 +366,57 @@ async fn transaction_start_handler(state: State, req: Request<Body>) -> Result<Response<Body>, hyper::Error> {
-        Err(err) => Ok(err_to_http_resp(err)),
+        Err(err) => Ok(err_to_http_resp(err, telemetry)),
     }
 }

-async fn transaction_commit_handler(state: State, tx_id: TxId) -> Result<Response<Body>, hyper::Error> {
+async fn transaction_commit_handler(
+    state: State,
+    req: Request<Body>,
+    tx_id: TxId,
+) -> Result<Response<Body>, hyper::Error> {
+    let capture_config = capture_config(req.headers(), tx_id.clone());
+
+    if let telemetry::capturing::Capturer::Enabled(capturer) = &capture_config {
+        capturer.start_capturing().await;
+    }
+
     let result = state.cx.executor.commit_tx(tx_id).await;
+
+    let telemetry = if let telemetry::capturing::Capturer::Enabled(capturer) = &capture_config {
+        capturer.fetch_captures().await
+    } else {
+        None
+    };
+
     match result {
-        Ok(_) => Ok(empty_json_to_http_resp()),
-        Err(err) => Ok(err_to_http_resp(err)),
+        Ok(_) => Ok(empty_json_to_http_resp(telemetry)),
+        Err(err) => Ok(err_to_http_resp(err, telemetry)),
     }
 }

-async fn transaction_rollback_handler(state: State, tx_id: TxId) -> Result<Response<Body>, hyper::Error> {
+async fn transaction_rollback_handler(
+    state: State,
+    req: Request<Body>,
+    tx_id: TxId,
+) -> Result<Response<Body>, hyper::Error> {
+    let capture_config = capture_config(req.headers(), tx_id.clone());
+
+    if let telemetry::capturing::Capturer::Enabled(capturer) = &capture_config {
+        capturer.start_capturing().await;
+    }
+
     let result = state.cx.executor.rollback_tx(tx_id).await;
+
+    let telemetry = if let telemetry::capturing::Capturer::Enabled(capturer) = &capture_config {
+        capturer.fetch_captures().await
+    } else {
+        None
+    };
+
     match result {
-        Ok(_) => Ok(empty_json_to_http_resp()),
-        Err(err) => Ok(err_to_http_resp(err)),
+        Ok(_) => Ok(empty_json_to_http_resp(telemetry)),
+        Err(err) => Ok(err_to_http_resp(err, telemetry)),
     }
 }

@@ -329,8 +455,12 @@ impl<'a> Extractor for HeaderExtractor<'a> {
     }
 }
-fn empty_json_to_http_resp() -> Response<Body> {
-    let result = json!({});
+fn empty_json_to_http_resp(captured_telemetry: Option<telemetry::capturing::storage::Storage>) -> Response<Body> {
+    let result = if let Some(telemetry) = captured_telemetry {
+        json!({ "extensions": { "logs": json!(telemetry.logs), "traces": json!(telemetry.traces) } })
+    } else {
+        json!({})
+    };
     let result_bytes = serde_json::to_vec(&result).unwrap();

     Response::builder()
@@ -340,7 +470,10 @@ fn empty_json_to_http_resp() -> Response<Body> {
         .unwrap()
 }

-fn err_to_http_resp(err: query_core::CoreError) -> Response<Body> {
+fn err_to_http_resp(
+    err: query_core::CoreError,
+    captured_telemetry: Option<telemetry::capturing::storage::Storage>,
+) -> Response<Body> {
     let status = match err {
         query_core::CoreError::TransactionError(ref err) => match err {
             query_core::TransactionError::AcquisitionTimeout => 504,
@@ -354,11 +487,40 @@ fn err_to_http_resp(err: query_core::CoreError) -> Response<Body> {
         _ => 500,
     };

-    let user_error: user_facing_errors::Error = err.into();
-    let body = Body::from(serde_json::to_vec(&user_error).unwrap());
+    let mut err: ExtendedTransactionUserFacingError = err.into();
+    if let Some(telemetry) = captured_telemetry {
+        err.set_extension("traces".to_owned(), json!(telemetry.traces));
+        err.set_extension("logs".to_owned(), json!(telemetry.logs));
+    }
+    let body = Body::from(serde_json::to_vec(&err).unwrap());

     Response::builder().status(status).body(body).unwrap()
 }

+fn capture_config(headers: &HeaderMap, tx_id: TxId) -> telemetry::capturing::Capturer {
+    let capture_settings = capture_settings(headers);
+    let mut traceparent = traceparent(headers);
+
+    if traceparent.is_none() && capture_settings.is_enabled() {
+        traceparent = Some(tx_id.as_traceparent())
+    }
+
+    let trace_id = get_trace_id_from_traceparent(traceparent.as_deref());
+
+    telemetry::capturing::capturer(trace_id, capture_settings)
+}
+
+#[allow(clippy::bind_instead_of_map)]
+fn capture_settings(headers: &HeaderMap) -> telemetry::capturing::Settings {
+    const CAPTURE_TELEMETRY_HEADER: &str = "X-capture-telemetry";
+    let s = if let Some(hv) = headers.get(CAPTURE_TELEMETRY_HEADER) {
+        hv.to_str().unwrap_or("")
+    } else {
+        ""
+    };
+
+    telemetry::capturing::Settings::from(s)
+}
+
 fn traceparent(headers: &HeaderMap) -> Option<String> {
     const TRACEPARENT_HEADER: &str = "traceparent";
diff --git a/query-engine/query-engine/src/state.rs b/query-engine/query-engine/src/state.rs
index e380e3f5d05c..892edbd9f548 100644
--- a/query-engine/query-engine/src/state.rs
+++ b/query-engine/query-engine/src/state.rs
@@ -41,7 +41,11 @@ pub async fn setup(opts: &PrismaOpt, install_logger: bool, metrics: Option<MetricRegistry>) -> PrismaResult<State> {
diff --git a/query-engine/query-engine/src/tests/dmmf.rs b/query-engine/query-engine/src/tests/dmmf.rs
-> PrismaResult<()> {
         subcommand: Some(Subcommand::Cli(CliOpt::Dmmf)),
         enable_open_telemetry: false,
         open_telemetry_endpoint: String::new(),
+        enable_telemetry_in_response: false,
         dataproxy_metric_override: false,
     };
diff --git a/query-engine/request-handlers/src/graphql/response.rs b/query-engine/request-handlers/src/graphql/response.rs
index 859d6261fecc..4f80031ab790 100644
--- a/query-engine/request-handlers/src/graphql/response.rs
+++ b/query-engine/request-handlers/src/graphql/response.rs
@@ -12,6 +12,9 @@ pub struct GQLResponse {

     #[serde(skip_serializing_if = "Vec::is_empty")]
     errors: Vec<GQLError>,
+
+    #[serde(skip_serializing_if = "IndexMap::is_empty")]
+    extensions: Map,
 }

 #[derive(Debug, serde::Serialize, Default, PartialEq)]
@@ -22,6 +25,9 @@ pub struct GQLBatchResponse {

     #[serde(skip_serializing_if = "Vec::is_empty")]
     errors: Vec<GQLError>,
+
+    #[serde(skip_serializing_if = "IndexMap::is_empty")]
+    extensions: Map,
 }
 #[derive(Debug, serde::Serialize, serde::Deserialize, PartialEq)]
@@ -67,6 +73,10 @@ impl GQLResponse {
     pub fn errors(&self) -> impl Iterator<Item = &GQLError> {
         self.errors.iter()
     }
+
+    pub fn set_extension(&mut self, key: String, val: serde_json::Value) {
+        self.extensions.entry(key).or_insert(Item::Json(val));
+    }
 }

 impl From for GQLResponse {
@@ -153,6 +163,10 @@ impl GQLBatchResponse {
             .iter()
             .chain(self.batch_result.iter().flat_map(|res| res.errors()))
     }
+
+    pub fn set_extension(&mut self, key: String, val: serde_json::Value) {
+        self.extensions.entry(key).or_insert(Item::Json(val));
+    }
 }

 impl From for GQLBatchResponse {
diff --git a/query-engine/request-handlers/src/lib.rs b/query-engine/request-handlers/src/lib.rs
index 79f9bcab50d9..10fb55af4ee1 100644
--- a/query-engine/request-handlers/src/lib.rs
+++ b/query-engine/request-handlers/src/lib.rs
@@ -19,3 +19,12 @@ pub enum PrismaResponse {
     Single(GQLResponse),
     Multi(GQLBatchResponse),
 }
+
+impl PrismaResponse {
+    pub fn set_extension(&mut self, key: String, val: serde_json::Value) {
+        match self {
+            Self::Single(r) => r.set_extension(key, val),
+            Self::Multi(r) => r.set_extension(key, val),
+        }
+    }
+}
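
For reference, here is a minimal sketch of how the capturing pieces introduced by this patch fit
together from a caller's perspective. This is hypothetical driver code, not part of the patch; it
only assumes the `query_core::telemetry::capturing` API added above:

    use opentelemetry::trace::TraceId;
    use query_core::telemetry::capturing;

    async fn capture_for_request(trace_id: TraceId) {
        // The settings string is what the server reads from the X-capture-telemetry header.
        let settings: capturing::Settings = "error, query, tracing".into();
        let capturer = capturing::capturer(trace_id, settings);
        if let capturing::Capturer::Enabled(inner) = &capturer {
            inner.start_capturing().await;
            // ... dispatch the query here; spans and events for this trace_id get recorded ...
            if let Some(storage) = inner.fetch_captures().await {
                // These vectors are what the server attaches as the "traces" and "logs" extensions.
                println!("captured {} spans and {} log events", storage.traces.len(), storage.logs.len());
            }
        }
    }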