diff --git a/Cargo.lock b/Cargo.lock index d1de3b9ee128..f9adc7abf9f8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1536,7 +1536,7 @@ dependencies = [ "migration-connector", "pretty_assertions", "psl", - "quaint", + "quaint 0.2.0-alpha.13 (git+https://github.com/miguelff/quaint?branch=fix-traces)", "serde_json", "sql-introspection-connector", "sql-migration-connector", @@ -2015,7 +2015,7 @@ dependencies = [ "json-rpc-stdio", "migration-connector", "migration-core", - "quaint", + "quaint 0.2.0-alpha.13 (git+https://github.com/miguelff/quaint?branch=fix-traces)", "serde_json", "structopt", "tempfile", @@ -2046,7 +2046,7 @@ dependencies = [ "pretty_assertions", "prisma-value", "psl", - "quaint", + "quaint 0.2.0-alpha.13 (git+https://github.com/miguelff/quaint?branch=fix-traces)", "serde", "serde_json", "sql-migration-connector", @@ -2584,6 +2584,7 @@ dependencies = [ "percent-encoding", "pin-project", "rand 0.8.5", + "serde", "thiserror", "tokio", "tokio-stream", @@ -3150,12 +3151,52 @@ dependencies = [ "mongodb-client", "parking_lot 0.12.1", "psl", - "quaint", + "quaint 0.2.0-alpha.13 (git+https://github.com/miguelff/quaint?branch=fix-traces)", "tempfile", "test-setup", "url", ] +[[package]] +name = "quaint" +version = "0.2.0-alpha.13" +source = "git+https://github.com/miguelff/quaint?branch=fix-traces#76a56f5940bd44d9f51cd73b89b274c65ff2a081" +dependencies = [ + "async-trait", + "base64 0.12.3", + "bigdecimal", + "bit-vec", + "byteorder", + "bytes", + "chrono", + "connection-string", + "either", + "futures", + "hex", + "libsqlite3-sys", + "lru-cache", + "metrics 0.18.1", + "mobc", + "mysql_async", + "native-tls", + "num_cpus", + "percent-encoding", + "postgres-native-tls", + "postgres-types", + "rusqlite", + "serde_json", + "sqlformat", + "thiserror", + "tiberius", + "tokio", + "tokio-postgres", + "tokio-util 0.6.10", + "tracing", + "tracing-core", + "url", + "uuid 1.1.2", +] + [[package]] name = "quaint" version = "0.2.0-alpha.13" @@ -3291,7 +3332,7 @@ dependencies = [ "opentelemetry-otlp", "prisma-models", "psl", - "quaint", + "quaint 0.2.0-alpha.13 (git+https://github.com/miguelff/quaint?branch=fix-traces)", "query-connector", "query-core", "query-engine-metrics", @@ -3415,7 +3456,7 @@ dependencies = [ "prisma-models", "psl", "qe-setup", - "quaint", + "quaint 0.2.0-alpha.13 (git+https://github.com/prisma/quaint)", "query-core", "query-engine", "query-engine-metrics", @@ -4095,7 +4136,7 @@ dependencies = [ "pretty_assertions", "prisma-value", "psl", - "quaint", + "quaint 0.2.0-alpha.13 (git+https://github.com/miguelff/quaint?branch=fix-traces)", "regex", "serde", "serde_json", @@ -4118,7 +4159,7 @@ dependencies = [ "migration-connector", "once_cell", "psl", - "quaint", + "quaint 0.2.0-alpha.13 (git+https://github.com/miguelff/quaint?branch=fix-traces)", "regex", "serde_json", "sql-ddl", @@ -4148,7 +4189,7 @@ dependencies = [ "prisma-models", "prisma-value", "psl", - "quaint", + "quaint 0.2.0-alpha.13 (git+https://github.com/miguelff/quaint?branch=fix-traces)", "query-connector", "rand 0.7.3", "serde", @@ -4177,7 +4218,7 @@ dependencies = [ "pretty_assertions", "prisma-value", "psl", - "quaint", + "quaint 0.2.0-alpha.13 (git+https://github.com/miguelff/quaint?branch=fix-traces)", "regex", "serde", "test-macros", @@ -4361,7 +4402,7 @@ dependencies = [ "dissimilar", "enumflags2", "once_cell", - "quaint", + "quaint 0.2.0-alpha.13 (git+https://github.com/miguelff/quaint?branch=fix-traces)", "tokio", 
"tracing", "tracing-error", @@ -4997,7 +5038,7 @@ version = "0.1.0" dependencies = [ "backtrace", "indoc", - "quaint", + "quaint 0.2.0-alpha.13 (git+https://github.com/miguelff/quaint?branch=fix-traces)", "serde", "serde_json", "tracing", diff --git a/Cargo.toml b/Cargo.toml index 76806c557ad7..5be08ed883e4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -39,7 +39,8 @@ user-facing-errors = { path = "./libs/user-facing-errors" } uuid = { version = "1", features = ["serde"] } [workspace.dependencies.quaint] -git = "https://github.com/prisma/quaint" +git = "https://github.com/miguelff/quaint" +branch = "fix-traces" features = [ "bigdecimal", "chrono", diff --git a/Makefile b/Makefile index c0f170ca940a..2c0ea71694b4 100644 --- a/Makefile +++ b/Makefile @@ -214,7 +214,7 @@ validate: cargo run --bin test-cli -- validate-datamodel dev_datamodel.prisma qe: - cargo run --bin query-engine -- --enable-playground --enable-raw-queries --enable-metrics --enable-open-telemetry + cargo run --bin query-engine -- --enable-playground --enable-raw-queries --enable-metrics --enable-open-telemetry --enable-telemetry-in-response qe-dmmf: cargo run --bin query-engine -- cli dmmf > dmmf.json diff --git a/query-engine/connector-test-kit-rs/query-tests-setup/src/runner/binary.rs b/query-engine/connector-test-kit-rs/query-tests-setup/src/runner/binary.rs index c11f08e5a0b9..1facd8f8527f 100644 --- a/query-engine/connector-test-kit-rs/query-tests-setup/src/runner/binary.rs +++ b/query-engine/connector-test-kit-rs/query-tests-setup/src/runner/binary.rs @@ -20,7 +20,13 @@ pub struct BinaryRunner { #[async_trait::async_trait] impl RunnerInterface for BinaryRunner { async fn load(datamodel: String, connector_tag: ConnectorTag, metrics: MetricRegistry) -> TestResult { - let opts = PrismaOpt::from_list(&["binary", "--enable-raw-queries", "--datamodel", &datamodel]); + let opts = PrismaOpt::from_list(&[ + "binary", + "--enable-raw-queries", + "--enable-telemetry-in-response", + "--datamodel", + &datamodel, + ]); let state = setup(&opts, false, Some(metrics)).await.unwrap(); let configuration = opts.configuration(true).unwrap(); diff --git a/query-engine/core/Cargo.toml b/query-engine/core/Cargo.toml index 51fccf4103b3..3bd082060f3c 100644 --- a/query-engine/core/Cargo.toml +++ b/query-engine/core/Cargo.toml @@ -22,11 +22,11 @@ futures = "0.3" indexmap = { version = "1.7", features = ["serde-1"] } itertools = "0.10" mongodb-connector = { path = "../connectors/mongodb-query-connector", package = "mongodb-query-connector", optional = true } -once_cell = "1.3" +once_cell = "1" petgraph = "0.4" prisma-models = { path = "../prisma-models", features = ["default_generators"] } prisma-value = { path = "../../libs/prisma-value" } -opentelemetry = { version = "0.17"} +opentelemetry = { version = "0.17.0", features = ["rt-tokio", "serialize"] } query-engine-metrics = {path = "../metrics"} serde.workspace = true serde_json.workspace = true @@ -35,7 +35,7 @@ thiserror = "1.0" tokio.workspace = true tracing = { version = "0.1", features = ["attributes"] } tracing-futures = "0.2" -tracing-subscriber = {version = "0.3", features = ["env-filter"]} +tracing-subscriber = { version = "0.3", features = ["env-filter"] } tracing-opentelemetry = "0.17.4" url = "2" user-facing-errors = { path = "../../libs/user-facing-errors" } diff --git a/query-engine/core/src/interactive_transactions/error.rs b/query-engine/core/src/interactive_transactions/error.rs index 146d69f103b5..8189e2ce7420 100644 --- 
a/query-engine/core/src/interactive_transactions/error.rs +++ b/query-engine/core/src/interactive_transactions/error.rs @@ -1,5 +1,10 @@ use thiserror::Error; +use crate::{ + response_ir::{Item, Map}, + CoreError, +}; + #[derive(Debug, Error, PartialEq)] pub enum TransactionError { #[error("Unable to start a transaction in the given time.")] @@ -17,3 +22,27 @@ pub enum TransactionError { #[error("Unexpected response: {reason}.")] Unknown { reason: String }, } + +#[derive(Debug, serde::Serialize, PartialEq)] +pub struct ExtendedTransactionUserFacingError { + #[serde(flatten)] + user_facing_error: user_facing_errors::Error, + + #[serde(skip_serializing_if = "indexmap::IndexMap::is_empty")] + extensions: Map, +} + +impl ExtendedTransactionUserFacingError { + pub fn set_extension(&mut self, key: String, val: serde_json::Value) { + self.extensions.entry(key).or_insert(Item::Json(val)); + } +} + +impl From<CoreError> for ExtendedTransactionUserFacingError { + fn from(error: CoreError) -> Self { + ExtendedTransactionUserFacingError { + user_facing_error: error.into(), + extensions: Default::default(), + } + } +} diff --git a/query-engine/core/src/lib.rs b/query-engine/core/src/lib.rs index 3bf22ee2cfde..1645fa13e16f 100644 --- a/query-engine/core/src/lib.rs +++ b/query-engine/core/src/lib.rs @@ -16,7 +16,7 @@ pub mod telemetry; pub use self::{ error::{CoreError, FieldConversionError}, executor::{QueryExecutor, TransactionOptions}, - interactive_transactions::{TransactionError, TxId}, + interactive_transactions::{ExtendedTransactionUserFacingError, TransactionError, TxId}, query_document::*, telemetry::*, }; diff --git a/query-engine/core/src/telemetry/capturing/capturer.rs b/query-engine/core/src/telemetry/capturing/capturer.rs new file mode 100644 index 000000000000..1b36626cd1fd --- /dev/null +++ b/query-engine/core/src/telemetry/capturing/capturer.rs @@ -0,0 +1,225 @@ +use super::{settings::Settings, storage::Storage}; +use crate::models; +use opentelemetry::{ + sdk::{ + export::trace::SpanData, + trace::{Span, SpanProcessor}, + }, + trace::{TraceId, TraceResult}, +}; +use std::{collections::HashMap, sync::Arc, sync::Mutex}; + +/// Capturer determines, based on a set of settings and a trace id, how capturing is going to be handled. +/// Generally, both the trace id and the settings will be derived from request headers. Thus, a new +/// value of this enum is created per request.
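For illustration, a minimal usage sketch of the per-request lifecycle described above (the header value and hex trace id below are invented; the API is the one introduced in this module):

```rust
use opentelemetry::trace::TraceId;
use query_core::telemetry::capturing::{self, Capturer, Settings};

async fn demo() {
    // Settings are parsed from the X-capture-telemetry header value.
    let settings = Settings::from("error, warn, query, tracing");
    // The trace id would normally come from the traceparent header (invented here).
    let trace_id = TraceId::from_hex("71366d6130303030373638746971626a").unwrap();

    // One Capturer per request: Enabled only if the settings select something.
    match capturing::capturer(trace_id, settings) {
        Capturer::Enabled(capturer) => {
            capturer.start_capturing().await;
            // ... dispatch the request, then collect what was captured:
            let _captured = capturer.fetch_captures().await; // Option<Storage>
        }
        Capturer::Disabled => { /* nothing to capture */ }
    }
}
```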
+#[derive(Debug, Clone)] +pub enum Capturer { + Enabled(Inner), + Disabled, +} + +impl Capturer { + pub(super) fn new(processor: Processor, trace_id: TraceId, settings: Settings) -> Self { + if settings.is_enabled() { + return Self::Enabled(Inner { + processor, + trace_id, + settings, + }); + } + + Self::Disabled + } +} + +#[derive(Debug, Clone)] +pub struct Inner { + pub(super) processor: Processor, + pub(super) trace_id: TraceId, + pub(super) settings: Settings, +} + +impl Inner { + pub async fn start_capturing(&self) { + self.processor + .start_capturing(self.trace_id, self.settings.clone()) + .await + } + + pub async fn fetch_captures(&self) -> Option<Storage> { + self.processor.fetch_captures(self.trace_id).await + } +} + +/// A [`SpanProcessor`] that captures and stores spans in memory in a synchronized dictionary for +/// later retrieval +#[derive(Debug, Clone)] +pub struct Processor { + pub(crate) storage: Arc<Mutex<HashMap<TraceId, Storage>>>, +} + +impl Processor { + pub fn new() -> Self { + Self { + storage: Default::default(), + } + } + + async fn start_capturing(&self, trace_id: TraceId, settings: Settings) { + let mut locked_storage = self.storage.lock().unwrap(); + locked_storage.insert(trace_id, settings.clone().into()); + drop(locked_storage); + + let ttl = settings.ttl; + let storage = self.storage.clone(); + tokio::spawn(async move { + tokio::time::sleep(ttl).await; + let mut locked_traces = storage.lock().unwrap(); + if locked_traces.remove(&trace_id).is_some() { + warn!("Timeout waiting for telemetry to be captured. trace_id={}", trace_id) + } + }); + } + + async fn fetch_captures(&self, trace_id: TraceId) -> Option<Storage> { + let mut traces = self.storage.lock().unwrap(); + + traces.remove(&trace_id) + } +} + +impl Default for Processor { + fn default() -> Self { + Self::new() + } +} + +impl SpanProcessor for Processor { + fn on_start(&self, _: &mut Span, _: &opentelemetry::Context) { + // no-op + } + + /// Exports a span containing zero or more events that might represent + /// logs in the prisma client logging categories (query, info, warn, error) + /// + /// There's an impedance mismatch between the client categories of logs and the server (standard) + /// hierarchical levels of logs (trace, debug, info, warn, error). + /// + /// The most prominent difference is the "query" type of events. In the client these model + /// database queries made by the engine through a connector. But at the moment there's not a 1:1 mapping + /// between the client "query" level and one of the server levels. And depending on the database + /// (mongo / relational), the information to build this kind of log event is logged differently in + /// the server. + /// + /// In the case of relational databases --queried through sql_query_connector and eventually + /// through quaint, a trace span describes the query-- `TraceSpan::represents_query_event` + /// determines if a span represents a query event. + /// + /// In the case of mongo, an event represents the query, but it needs to be transformed before + /// capturing it. `Event::query_event` does that.
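To make the impedance concrete, the classification can be thought of as follows (a simplified sketch mirroring the `Candidate` logic further down, not the exact code path):

```rust
/// Simplified sketch: classify an event into a Prisma client log category.
/// `target` and `level` come from the tracing metadata; the "query" category
/// is synthesized from connector-specific targets rather than from a level.
fn client_category(target: &str, level: &str, has_query_attr: bool) -> Option<&'static str> {
    match (target, level) {
        // relational: quaint emits an event carrying the SQL under a `query` attribute
        ("quaint::connector::metrics", _) if has_query_attr => Some("query"),
        // mongo: the connector emits a dedicated query event
        ("mongodb_query_connector::query", _) => Some("query"),
        // everything else maps 1:1 onto the standard hierarchical levels
        (_, "error") => Some("error"),
        (_, "warn") => Some("warn"),
        (_, "info") => Some("info"),
        (_, "debug") => Some("debug"),
        (_, "trace") => Some("trace"),
        _ => None,
    }
}
```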
+ fn on_end(&self, span_data: SpanData) { + let trace_id = span_data.span_context.trace_id(); + + let mut locked_storage = self.storage.lock().unwrap(); + if let Some(storage) = locked_storage.get_mut(&trace_id) { + let settings = storage.settings.clone(); + + let (events, span) = models::TraceSpan::from(span_data).split_events(); + + if settings.traces_enabled() { + storage.traces.push(span); + } + + if storage.settings.logs_enabled() { + events.into_iter().for_each(|log| { + let candidate = Candidate { + value: log, + settings: &settings, + }; + if candidate.is_loggable_query_event() { + storage.logs.push(candidate.query_event()) + } else if candidate.is_loggable_event() { + storage.logs.push(candidate.value) + } + }); + } + } + } + + fn force_flush(&self) -> TraceResult<()> { + // no-op + Ok(()) + } + + fn shutdown(&mut self) -> TraceResult<()> { + // no-op + Ok(()) + } +} +const VALID_QUERY_ATTRS: [&str; 4] = ["query", "params", "target", "duration_ms"]; +/// A Candidate represents either a span or an event that is being considered for capturing. +/// A Candidate can be converted into a [`Capture`]. +#[derive(Debug, Clone)] +struct Candidate<'batch_iter> { + value: models::LogEvent, + settings: &'batch_iter Settings, +} + +impl Candidate<'_> { + #[inline(always)] + fn is_loggable_query_event(&self) -> bool { + if self.settings.included_log_levels.contains("query") { + if let Some(target) = self.value.attributes.get("target") { + if let Some(val) = target.as_str() { + return (val == "quaint::connector::metrics" && self.value.attributes.get("query").is_some()) + || val == "mongodb_query_connector::query"; + } + } + } + false + } + + fn query_event(mut self) -> models::LogEvent { + self.value + .attributes + .retain(|key, _| (&VALID_QUERY_ATTRS).contains(&key.as_str())); + + models::LogEvent { + level: "query".to_string(), + ..self.value + } + } + + #[inline(always)] + fn is_loggable_event(&self) -> bool { + self.settings.included_log_levels.contains(&self.value.level) + } +} + +/// tests for capture exporter +#[cfg(test)] +mod tests { + use super::*; + use std::time::Duration; + + #[tokio::test] + async fn test_garbage_collection() { + let exporter = Processor::new(); + + let trace_id = TraceId::from_hex("1").unwrap(); + let one_ms = Duration::from_millis(1); + + let mut settings = Settings::default(); + settings.ttl = one_ms; + + exporter.start_capturing(trace_id, settings).await; + let storage = exporter.storage.lock().unwrap(); + assert!(storage.get(&trace_id).is_some()); + drop(storage); + + tokio::time::sleep(10 * one_ms).await; + + let storage = exporter.storage.lock().unwrap(); + assert!(storage.get(&trace_id).is_none()); + } +} diff --git a/query-engine/core/src/telemetry/capturing/helpers.rs b/query-engine/core/src/telemetry/capturing/helpers.rs new file mode 100644 index 000000000000..967a1311bf1b --- /dev/null +++ b/query-engine/core/src/telemetry/capturing/helpers.rs @@ -0,0 +1,12 @@ +use tracing::Metadata; + +/// Filters-in spans and events that are statically determined to be relevant for capturing +/// Dynamic filtering will be done by the [`crate::capturer::Processor`] +pub fn span_and_event_filter(meta: &Metadata) -> bool { + if meta.fields().iter().any(|f| f.name() == "user_facing") { + return true; + } + + // relevant quaint connector or mongodb connector spans and events + meta.target() == "quaint::connector::metrics" || meta.target() == "mongodb_query_connector::query" +} diff --git a/query-engine/core/src/telemetry/capturing/mod.rs 
b/query-engine/core/src/telemetry/capturing/mod.rs new file mode 100644 index 000000000000..82171f3156ee --- /dev/null +++ b/query-engine/core/src/telemetry/capturing/mod.rs @@ -0,0 +1,183 @@ +//! Telemetry Capturing is the process of recording the logs and traces happening during a request +//! to the binary engine, and rendering them in the response. +//! +//! The interaction diagram below (sorry for the width!) shows the different roles at play during telemetry +//! capturing. A textual explanation follows it. For the sake of example, a server environment +//! --the query-engine crate-- is assumed. +//! # ╔═══════════════════════╗ ╔═══════════════╗ +//! # ║<>║ ║ Storage ║ ╔═══════════════════╗ +//! # ┌───────────────────┐ ║ PROCESSOR ║ ║ ║ ║ TRACER ║ +//! # │ Server │ ╚═══════════╦═══════════╝ ╚═══════╦═══════╝ ╚═════════╦═════════╝ +//! # └─────────┬─────────┘ │ │ │ +//! # │ │ │ │ +//! # │ │ │ │ +//! # POST │ │ │ │ +//! # (body, headers)│ │ │ │ +//! # ──────────▶┌┴┐ │ │ │ +//! # ┌─┐ │ │new(headers)╔════════════╗ │ │ │ +//! # │1│ │ ├───────────▶║s: Settings ║ │ │ │ +//! # └─┘ │ │ ╚════════════╝ │ │ │ +//! # │ │ │ │ │ +//! # │ │ ╔═══════════════════╗ │ │ │ +//! # │ │ ║ Capturer::Enabled ║ │ │ │ ┌────────────┐ +//! # │ │ ╚═══════════════════╝ │ │ │ │<│ +//! # │ │ │ │ │ │ └──────┬─────┘ +//! # │ │ ┌─┐ new(trace_id, s) │ │ │ │ │ +//! # │ ├───┤2├───────────────────────▶│ │ │ │ │ +//! # │ │ └─┘ │ │ │ │ │ +//! # │ │ │ │ │ │ │ +//! # │ │ ┌─┐ start_capturing() │ start_capturing │ │ │ │ +//! # │ ├───┤3├───────────────────────▶│ (trace_id, s) │ │ │ │ +//! # │ │ └─┘ │ │ │ │ │ +//! # │ │ ├─────────────────────▶│ │ │ │ +//! # │ │ │ │ │ │ │ +//! # │ │ │ │ ┌─┐insert(trace_id, s) │ │ │ +//! # │ │ │ ├─┤4├────────────────────▶│ │ │ +//! # │ │ │ │ └─┘ │ │ │ +//! # │ │ │ │ │ ┌─┐ │ process_query │ +//! # │ ├──────────────────────────────┼──────────────────────┼─────────────────────────┼────────────┤5├──────┼──────────────────────────▶┌┴┐ +//! # │ │ │ │ │ └─┘ │ │ │ +//! # │ │ │ │ │ │ │ │ +//! # │ │ │ │ │ │ │ │ ┌─────────────────────┐ +//! # │ │ │ │ │ │ log! / span! ┌─┐ │ │ │ res: PrismaResponse │ +//! # │ │ │ │ │ │◀─────────────────────┤6├──│ │ └──────────┬──────────┘ +//! # │ │ │ │ on_end(span_data)│ ┌─┐ │ └─┘ │ │ new │ +//! # │ │ │ │◀────────────────────────┼────────────┤7├──────┤ │ │────────────▶│ +//! # │ │ │ │ │ └─┘ │ │ │ │ +//! # │ │ │ │ append(trace_id, logs, │ │ │ │ │ +//! # │ │ │ │ ┌─┐ traces) │ │ │ │ │ +//! # │ │ │ ├──┤8├───────────────────▶│ │ │ │ │ +//! # │ │ │ │ └─┘ │ │ │ │ │ +//! # │ │ res: PrismaResponse │ ┌─┐ │ │ │ │ │ │ +//! # │ │◁ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┼ ─ ─ ┤9├ ─ ─ ─ ─ ─ ─ ─│─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─│─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─│─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─└┬┘ │ +//! # │ │ ┌────┐ fetch_captures() │ └─┘ │ │ │ │ │ +//! # │ ├─┤ 10 ├──────────────────────▶│ fetch_captures │ │ │ │ │ +//! # │ │ └────┘ │ (trace_id) │ │ │ │ │ +//! # │ │ ├─────────────────────▶│ │ │ x │ +//! # │ │ │ │ │ │ │ +//! # │ │ │ │┌────┐ get logs/traces │ │ │ +//! # │ │ │ ├┤ 11 ├───────────────────▶ │ │ │ +//! # │ │ │ │└────┘ │ │ │ +//! # │ │ │ │◁ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─│ │ │ +//! # │ │ │ │ │ │ │ +//! # │ │ ◁ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─│ │ │ │ +//! # │ │ logs, traces │ │ │ │ │ +//! # │ │◁─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─│ │ │ │ │ +//! # │ │ x ┌────┐ │ │ │ res.set_extension(logs) │ +//! # │ ├───────────────────────────────────────┤ 12 ├────────┼─────────────────────────┼─────────────────────┼──────────────────────────────────────────▶│ +//! # │ │ └────┘ │ │ │ res.set_extension(traces) │ +//! # │ ├─────────────────────────────────────────────────────┼─────────────────────────┼─────────────────────┼──────────────────────────────────────────▶│ +//! # ◀ ─ ─ ─└┬┘ │ │ │ x +//! # json!(res) │ +//! # ┌────┐ │ +//! # │ 13 │ │ +//! # └────┘ +//! # +//! # ◁─ ─ ─ ─ return +//! # +//! # ◁─────── call (pseudo-signatures) +//! # +//! +//! In the diagram, you will see objects whose lifetime is static. The boxes for those have a double +//! width margin. These are: +//! +//! - The `server` itself +//! - The global `TRACER`, which handles `log!` and `span!` and uses the global `PROCESSOR` to +//! process the data constituting a trace: `Span`s and log `Event`s +//! - The global `PROCESSOR`, which manages the `Storage` set of data structures, holding logs, +//! traces (and capture settings) per request. +//! +//! Then, through the request lifecycle, different objects are created and dropped: +//! +//! - When a request comes in, its headers are processed and a [`Settings`] object is built; this +//! object determines, for the request, how logging and tracing are going to be captured: whether only +//! traces, only logs, or both, and which log levels are going to be captured. +//! - Based on the settings, a new `Capturer` is created; a capturer is nothing but an exporter +//! wrapped to start capturing / fetch the captures for this particular request. +//! +//! Then the capturing process works in this way: +//! +//! - The server receives a query **[1]** +//! - It grabs the HTTP headers and builds a `Capturer` object **[2]**, which is configured with the settings +//! denoted by the `X-capture-telemetry` header +//! - Now the server tells the `Capturer` to start capturing all the logs and traces occurring on +//! the request **[3]** (denoted by a `trace_id`). The `trace_id` is either carried on the `traceparent` +//! header or implicitly created on the first span of the request. To _start capturing_ implies +//! creating entries for the `trace_id` in two different data structures: `logs` and `traces`; and storing +//! the settings for selecting the spans and events to capture **[4]**. +//! - The server dispatches the request, and _somewhere_ else in the code it is processed **[5]**. +//! - There the code logs events and emits traces asynchronously, as part of the processing **[6]** +//! - Traces and logs arrive at the `TRACER`, and are handed over to the `PROCESSOR` **[7]**, +//! which writes them in the shard corresponding to the current `trace_id`, into the +//! `logs` and `traces` storage **[8]**. The settings previously stored under the `trace_id` +//! key are used to pick which events and spans are going to be captured based on their level. +//! - When the code that dispatches the request is done, it returns a `PrismaResponse` to the +//! server **[9]**. +//! - Then the server asks the `PROCESSOR` to fetch the captures **[10]** +//! - And right after that, it fetches the captures from the `Storage` **[11]**. At that time, although +//! that's not represented in the diagram, the captures are deleted from the storage, thus +//! freeing any memory used for capturing during the request +//! - Finally, the server sets the `logs` and `traces` extensions in the `PrismaResponse` **[12]**, +//! serializes the extended response in JSON format and returns it as an HTTP response +//! blob **[13]**. +//!
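Condensed into code, the thirteen steps look roughly like the following sketch (the handler shape and the `json!` placeholder response are illustrative; the capturing API is the one introduced in this module):

```rust
use opentelemetry::trace::TraceId;
use query_core::telemetry::capturing::{self, Capturer, Settings};
use serde_json::json;

/// Hypothetical, simplified request handler illustrating steps [1]..[13].
async fn handle_request(capture_header: &str, trace_id: TraceId) -> serde_json::Value {
    // [1]-[2]: derive Settings from the X-capture-telemetry header, build a Capturer
    let settings = Settings::from(capture_header);
    let capturer = capturing::capturer(trace_id, settings);

    // [3]-[4]: tell the processor to start capturing for this trace id
    if let Capturer::Enabled(inner) = &capturer {
        inner.start_capturing().await;
    }

    // [5]-[9]: dispatch the query; logs and spans flow into the processor meanwhile
    let mut response = json!({ "data": {} }); // placeholder for a PrismaResponse

    // [10]-[12]: fetch (and delete) the captures, attach them as extensions
    if let Capturer::Enabled(inner) = &capturer {
        if let Some(storage) = inner.fetch_captures().await {
            response["extensions"] = json!({
                "logs": storage.logs,
                "traces": storage.traces,
            });
        }
    }

    // [13]: serialize and return
    response
}
```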
+pub use self::capturer::Capturer; +pub use self::settings::Settings; +pub use tx_ext::TxTraceExt; + +use self::capturer::Processor; +use once_cell::sync::Lazy; +use opentelemetry::{global, sdk, trace}; +use query_engine_metrics::MetricRegistry; +use tracing::subscriber; +use tracing_subscriber::{ + filter::filter_fn, layer::Layered, prelude::__tracing_subscriber_SubscriberExt, Layer, Registry, +}; + +static PROCESSOR: Lazy<Processor> = Lazy::new(Processor::default); + +/// Creates a new capturer, which is configured to export traces and log events happening during a +/// particular request +pub fn capturer(trace_id: trace::TraceId, settings: Settings) -> Capturer { + Capturer::new(PROCESSOR.to_owned(), trace_id, settings) +} + +/// Adds a capturing layer to the given subscriber and installs the transformed subscriber as the +/// global, default subscriber +#[allow(clippy::type_complexity)] +pub fn install_capturing_layer( + subscriber: Layered<Option<MetricRegistry>, Layered<Box<dyn Layer<Registry> + Send + Sync>, Registry>>, + log_queries: bool, +) { + // set a trace context propagator, so that the trace context is propagated via the + // `traceparent` header from other systems + global::set_text_map_propagator(sdk::propagation::TraceContextPropagator::new()); + // create a tracer provider that is configured to use our custom processor to process spans + let provider = sdk::trace::TracerProvider::builder() + .with_span_processor(PROCESSOR.to_owned()) + .build(); + // create a tracer out of the provider + let tracer = opentelemetry::trace::TracerProvider::tracer(&provider, "opentelemetry"); + // set the provider as the global provider + global::set_tracer_provider(provider); + // create a layer that will filter initial events and spans based on the log level configuration + // from the environment and a specific filter to discard things that we are not interested in + // from a capturing perspective + let telemetry_layer = tracing_opentelemetry::layer() + .with_tracer(tracer) + .with_filter(crate::helpers::env_filter( + log_queries, + crate::helpers::QueryEngineLogLevel::FromEnv, + )) + .with_filter(filter_fn(helpers::span_and_event_filter)); + // decorate the given subscriber (more layers were added before this one) with the telemetry layer + let subscriber = subscriber.with(telemetry_layer); + // and finally set the subscriber as the global, default subscriber + subscriber::set_global_default(subscriber).unwrap(); +} + +mod capturer; +mod helpers; +mod settings; +pub mod storage; +mod tx_ext; diff --git a/query-engine/core/src/telemetry/capturing/settings.rs b/query-engine/core/src/telemetry/capturing/settings.rs new file mode 100644 index 000000000000..771586940fc5 --- /dev/null +++ b/query-engine/core/src/telemetry/capturing/settings.rs @@ -0,0 +1,85 @@ +use std::{collections::HashSet, time}; + +static VALID_VALUES: &[&str] = &["error", "warn", "info", "debug", "trace", "query", "tracing"]; +static DEFAULT_TTL: time::Duration = time::Duration::from_secs(1800); // 30 minutes + +#[derive(Debug, Clone, Default)] +pub struct Settings { + /// only capture log events from the specified log levels; the special level "query", which does not + /// exist in the engine logging infrastructure, is shimmed from any event describing a query, regardless + /// of its level. + pub(super) included_log_levels: HashSet<String>, + /// whether to include trace spans when capturing + pub(super) include_traces: bool, + /// how long to keep captured traces in memory; if by any chance the capturing breaks, a tokio task + /// will clean up captured traces after this duration.
Defaults to [`DEFAULT_TTL`]. + pub(super) ttl: time::Duration, +} + +impl Settings { + pub(super) fn new(included_log_levels: HashSet<String>, include_traces: bool, ttl: time::Duration) -> Self { + Self { + include_traces, + included_log_levels, + ttl, + } + } +} + +/// As the test below shows, settings can be constructed from a comma separated string. +/// Examples: valid: `"error, warn, query, tracing"`; invalid: `"foo, bar baz"`. Strings +/// passed in are trimmed and converted to lowercase. Chunks corresponding to levels different from +/// those in VALID_VALUES are ignored. +/// +/// The ttl is always the same (DEFAULT_TTL) but is kept as a field to allow for easier unit testing +/// of the capturing logic +impl From<&str> for Settings { + fn from(s: &str) -> Self { + let chunks = s.split(','); + let mut set = HashSet::from_iter( + chunks + .into_iter() + .map(str::trim) + .map(str::to_lowercase) + .filter(|s| VALID_VALUES.contains(&s.as_str())), + ); + + let include_traces = set.remove("tracing"); + let included_log_levels = set; + + Self::new(included_log_levels, include_traces, DEFAULT_TTL) + } +} + +impl Settings { + pub fn is_enabled(&self) -> bool { + self.traces_enabled() || self.logs_enabled() + } + + pub fn traces_enabled(&self) -> bool { + self.include_traces + } + + pub fn logs_enabled(&self) -> bool { + !self.included_log_levels.is_empty() + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_options_from() { + let options = Settings::from("error, warn, query, tracing"); + assert_eq!(options.included_log_levels.len(), 3); + assert!(options.included_log_levels.contains("error")); + assert!(options.included_log_levels.contains("warn")); + assert!(options.included_log_levels.contains("query")); + assert!(options.include_traces); + assert!(options.is_enabled()); + + let options = Settings::from("foo, bar baz"); + assert!(!options.is_enabled()); + } +} diff --git a/query-engine/core/src/telemetry/capturing/storage.rs b/query-engine/core/src/telemetry/capturing/storage.rs new file mode 100644 index 000000000000..9c2767169112 --- /dev/null +++ b/query-engine/core/src/telemetry/capturing/storage.rs @@ -0,0 +1,19 @@ +use super::settings::Settings; +use crate::telemetry::models; + +#[derive(Debug, Default)] +pub struct Storage { + pub traces: Vec<models::TraceSpan>, + pub logs: Vec<models::LogEvent>, + pub settings: Settings, +} + +impl From<Settings> for Storage { + fn from(settings: Settings) -> Self { + Self { + traces: Default::default(), + logs: Default::default(), + settings, + } + } +}
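A quick usage sketch of the parsing rules above, mirroring `test_options_from`: whitespace and case are normalized, `tracing` toggles span capture, and unknown chunks are silently dropped:

```rust
use query_core::telemetry::capturing::Settings;

fn main() {
    // "tracing" toggles span capture; the rest select log levels.
    let s = Settings::from("error, WARN , query, tracing");
    assert!(s.is_enabled());
    assert!(s.traces_enabled()); // "tracing" was present
    assert!(s.logs_enabled());   // "error", "warn" and "query" were kept

    // Unknown chunks are ignored, which can disable capturing entirely.
    let s = Settings::from("foo, bar baz");
    assert!(!s.is_enabled());
}
```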
diff --git a/query-engine/core/src/telemetry/capturing/tx_ext.rs b/query-engine/core/src/telemetry/capturing/tx_ext.rs new file mode 100644 index 000000000000..6b1b4905ab57 --- /dev/null +++ b/query-engine/core/src/telemetry/capturing/tx_ext.rs @@ -0,0 +1,88 @@ +use std::collections::HashMap; + +pub trait TxTraceExt { + fn into_trace_id(self) -> opentelemetry::trace::TraceId; + fn into_trace_context(self) -> opentelemetry::Context; + fn as_traceparent(&self) -> String; +} + +impl TxTraceExt for crate::TxId { + // in order to convert a TxId (a 48 bytes cuid) into a TraceId (16 bytes), we remove the first byte, + // (always 'c') and get the next 16 bytes, which are random enough to be used as a trace id. + // this is a typical cuid: "c-lct0q6ma-0004-rb04-h6en1roa" + // + // - first letter is always the same + // - next 7-8 bytes are a timestamp. There's more entropy in the least significant bytes + // - next 4 bytes are a counter since the server started + // - next 4 bytes are a system fingerprint, invariant for the same server instance + // - least significant 8 bytes: totally random. + // + // We want the most entropic slice of 16 bytes that's deterministically determined + fn into_trace_id(self) -> opentelemetry::trace::TraceId { + let mut buffer = [0; 16]; + let str = self.to_string(); + let tx_id_bytes = str.as_bytes(); + let len = tx_id_bytes.len(); + + // bytes [len-20 to len-12): least significant 4 bytes of the timestamp + 4 bytes counter + for (i, source_idx) in (len - 20..len - 12).enumerate() { + buffer[i] = tx_id_bytes[source_idx]; + } + // bytes [len-8 to len): the random blocks + for (i, source_idx) in (len - 8..len).enumerate() { + buffer[i + 8] = tx_id_bytes[source_idx]; + } + + opentelemetry::trace::TraceId::from_bytes(buffer) + } + // This is a bit of a hack, but it's the only way to have a default trace span for a whole + // transaction when no traceparent is propagated from the client. + // + // This is done so we can capture traces happening across the different queries in a + // transaction. Otherwise, if a traceparent is not propagated from the client, each query in + // the transaction will run within a span that has already been generated at the beginning of the + // transaction, and held active in the actor in charge of running the queries, thus making it + // impossible to capture traces happening in the individual queries, as they won't be aware of + // the transaction they are part of. + // + // By generating this "fake" traceparent based on the transaction id, we can have a common + // trace_id for all transaction operations. + fn into_trace_context(self) -> opentelemetry::Context { + let extractor: HashMap<String, String> = + HashMap::from_iter(vec![("traceparent".to_string(), self.as_traceparent())]); + opentelemetry::global::get_text_map_propagator(|propagator| propagator.extract(&extractor)) + } + + fn as_traceparent(&self) -> String { + let trace_id = self.clone().into_trace_id(); + format!("00-{trace_id}-0000000000000001-01") + } +} + +// tests for txid into traits +#[cfg(test)] +mod test { + use super::*; + use crate::TxId; + + #[test] + fn test_txid_into_traceid() { + let fixture = vec![ + ("clct0q6ma0000rb04768tiqbj", "71366d6130303030373638746971626a"), + // counter changed, trace id changed: + ("clct0q6ma0002rb04cpa6zkmx", "71366d6130303032637061367a6b6d78"), + // fingerprint changed, trace id did not change, as that chunk is ignored: + ("clct0q6ma00020000cpa6zkmx", "71366d6130303032637061367a6b6d78"), + // first 5 bytes changed, trace id did not change, as that chunk is ignored: + ("00000q6ma00020000cpa6zkmx", "71366d6130303032637061367a6b6d78"), + // 6th byte changed, trace id changed, as that chunk is part of the lsb of the timestamp + ("0000006ma00020000cpa6zkmx", "30366d6130303032637061367a6b6d78"), + ]; + + for (txid, expected_trace_id) in fixture { + let txid: TxId = txid.into(); + let trace_id: opentelemetry::trace::TraceId = txid.into_trace_id(); + assert_eq!(trace_id.to_string(), expected_trace_id); + } + } }
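Using the fixture values from the test above, the derived trace id can then be embedded into the synthetic traceparent (a usage sketch):

```rust
use query_core::telemetry::capturing::TxTraceExt;
use query_core::TxId;

fn main() {
    let tx_id: TxId = "clct0q6ma0000rb04768tiqbj".into();

    // Deterministic: the 16 bytes come from the timestamp/counter and random blocks.
    let traceparent = tx_id.as_traceparent();
    assert_eq!(
        traceparent,
        "00-71366d6130303030373638746971626a-0000000000000001-01"
    );
}
```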
diff --git a/query-engine/core/src/telemetry/helpers.rs b/query-engine/core/src/telemetry/helpers.rs index b62d0b72235f..b7ad24811f7c 100644 --- a/query-engine/core/src/telemetry/helpers.rs +++ b/query-engine/core/src/telemetry/helpers.rs @@ -41,11 +41,33 @@ pub fn set_span_link_from_traceparent(span: &Span, traceparent: Option<String>) } } +pub fn get_trace_parent_from_span(span: &Span) -> String { + let cx = span.context(); + let binding = cx.span(); + let span_context = binding.span_context(); + + format!("00-{}-{}-01", span_context.trace_id(), span_context.span_id()) +} + +pub fn get_trace_id_from_span(span: &Span) -> TraceId { + let cx = span.context(); + get_trace_id_from_context(&cx) +} + pub fn get_trace_id_from_context(context: &Context) -> TraceId { let context_span = context.span(); context_span.span_context().trace_id() } +pub fn get_trace_id_from_traceparent(traceparent: Option<&str>) -> TraceId { + traceparent + .unwrap_or("0-0-0-0") + .split('-') + .nth(1) + .map(|id| TraceId::from_hex(id).unwrap_or(TraceId::INVALID)) + .unwrap() +} + pub enum QueryEngineLogLevel { FromEnv, Override(String), @@ -92,10 +114,6 @@ pub fn user_facing_span_only_filter(meta: &Metadata) -> bool { return false; } - user_facing_filter(meta) -} - -pub fn user_facing_filter(meta: &Metadata) -> bool { if *SHOW_ALL_TRACES { return true; } @@ -104,5 +122,7 @@ return true; } + // spans describing a quaint query. + // TODO: should this span be made user_facing in quaint? meta.target() == "quaint::connector::metrics" && meta.name() == "quaint:query" } diff --git a/query-engine/core/src/telemetry/mod.rs b/query-engine/core/src/telemetry/mod.rs index 29dd6738caf2..56aab06e731f 100644 --- a/query-engine/core/src/telemetry/mod.rs +++ b/query-engine/core/src/telemetry/mod.rs @@ -1,2 +1,3 @@ +pub mod capturing; pub mod helpers; pub mod models; diff --git a/query-engine/core/src/telemetry/models.rs b/query-engine/core/src/telemetry/models.rs index 07cf755ad2e7..7f6650f78d69 100644 --- a/query-engine/core/src/telemetry/models.rs +++ b/query-engine/core/src/telemetry/models.rs @@ -112,6 +112,7 @@ impl From<SpanData> for TraceSpan { #[derive(Serialize, Debug, Clone, PartialEq, Eq)] pub struct Event { pub(super) span_id: Option<String>, + #[serde(skip_serializing_if = "String::is_empty")] pub(super) name: String, pub(super) level: String, pub(super) timestamp: HrTime,
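A sketch of how the lenient `get_trace_id_from_traceparent` parsing behaves (the hex value is taken from the tx_ext test fixtures; a missing header degrades to the zero, invalid trace id):

```rust
use opentelemetry::trace::TraceId;
use query_core::helpers::get_trace_id_from_traceparent;

fn main() {
    // A well-formed W3C traceparent: version-traceid-spanid-flags
    let tp = "00-71366d6130303030373638746971626a-0000000000000001-01";
    let trace_id = get_trace_id_from_traceparent(Some(tp));
    assert_eq!(trace_id.to_string(), "71366d6130303030373638746971626a");

    // No header: the "0-0-0-0" fallback yields an invalid (all-zero) trace id.
    assert_eq!(get_trace_id_from_traceparent(None), TraceId::INVALID);
}
```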
diff --git a/query-engine/query-engine/src/logger.rs b/query-engine/query-engine/src/logger.rs index 728b7c27a1a6..32d91b725f8b 100644 --- a/query-engine/query-engine/src/logger.rs +++ b/query-engine/query-engine/src/logger.rs @@ -27,6 +27,9 @@ pub struct Logger { enum TracingConfig { // exposed means tracing will be exposed through an HTTP endpoint in a jaeger-compatible format Http(String), + // captured means that traces will be captured in memory and exposed in the graphql response; + // logs will also be exposed in the response when capturing is enabled + Captured, // stdout means that traces will be printed to standard output Stdout, // disabled means that tracing will be disabled @@ -59,16 +62,17 @@ impl Logger { self.metrics = Some(metrics); } - pub fn setup_telemetry(&mut self, enable_telemetry: bool, endpoint: &str) { + pub fn setup_telemetry(&mut self, enable_telemetry: bool, enable_capturing: bool, endpoint: &str) { let endpoint = if endpoint.is_empty() { None } else { Some(endpoint.to_owned()) }; - self.tracing_config = match (enable_telemetry, endpoint) { - (true, Some(endpoint)) => TracingConfig::Http(endpoint), - (true, None) => TracingConfig::Stdout, + self.tracing_config = match (enable_telemetry, enable_capturing, endpoint) { + (_, true, _) => TracingConfig::Captured, + (true, _, Some(endpoint)) => TracingConfig::Http(endpoint), + (true, _, None) => TracingConfig::Stdout, _ => TracingConfig::Disabled, }; } @@ -100,6 +104,10 @@ .with(self.metrics.clone()); match self.tracing_config { + TracingConfig::Captured => { + let log_queries = self.log_queries; + telemetry::capturing::install_capturing_layer(subscriber, log_queries) + } TracingConfig::Http(ref endpoint) => { // Opentelemetry is enabled, but capturing is disabled, there's an endpoint to export // the traces to. diff --git a/query-engine/query-engine/src/opt.rs b/query-engine/query-engine/src/opt.rs index 2e5ef81e7c61..09d782ff807e 100644 --- a/query-engine/query-engine/src/opt.rs +++ b/query-engine/query-engine/src/opt.rs @@ -103,6 +103,10 @@ pub struct PrismaOpt { #[structopt(long)] pub enable_open_telemetry: bool, + #[structopt(long)] + /// Enable the tracer to capture logs and traces and return them in the response + pub enable_telemetry_in_response: bool, + /// The url to the OpenTelemetry collector. /// Enabling this will send the OpenTelemtry tracing to a collector /// and not via our custom stdout tracer diff --git a/query-engine/query-engine/src/server/mod.rs b/query-engine/query-engine/src/server/mod.rs index 1f688e8c0d35..0269c6238075 100644 --- a/query-engine/query-engine/src/server/mod.rs +++ b/query-engine/query-engine/src/server/mod.rs @@ -2,9 +2,13 @@ use crate::state::State; use crate::{opt::PrismaOpt, PrismaResult}; use hyper::service::{make_service_fn, service_fn}; use hyper::{header::CONTENT_TYPE, Body, HeaderMap, Method, Request, Response, Server, StatusCode}; -use opentelemetry::global; -use opentelemetry::propagation::Extractor; -use query_core::{schema::QuerySchemaRenderer, TransactionOptions, TxId}; +use opentelemetry::trace::TraceContextExt; +use opentelemetry::{global, propagation::Extractor}; +use query_core::helpers::*; +use query_core::telemetry::capturing::TxTraceExt; +use query_core::{ + schema::QuerySchemaRenderer, telemetry, ExtendedTransactionUserFacingError, TransactionOptions, TxId, +}; use query_engine_metrics::MetricFormat; use request_handlers::{dmmf, GraphQLSchemaRenderer, GraphQlHandler}; use serde_json::json; @@ -113,7 +117,8 @@ async fn graphql_handler(state: State, req: Request<Body>) -> Result<Response<Body>, hyper::Error> { let handler = GraphQlHandler::new(&*state.cx.executor, state.cx.query_schema()); - let result = handler.handle(body, tx_id, traceparent).instrument(span).await; + let mut result = handler.handle(body, tx_id, traceparent).instrument(span).await; + + if let telemetry::capturing::Capturer::Enabled(capturer) = &capture_config { + let telemetry = capturer.fetch_captures().await; + if let Some(telemetry) = telemetry { + result.set_extension("traces".to_owned(), json!(telemetry.traces)); + result.set_extension("logs".to_owned(), json!(telemetry.logs)); + } + } let result_bytes = serde_json::to_vec(&result).unwrap(); @@ -231,11 +284,11 @@ async fn transaction_handler(state: State, req: Request<Body>) -> Result<Response<Body>, hyper::Error> { - let cx = get_parent_span_context(req.headers()); + let headers = req.headers().to_owned(); + let body_start = req.into_body(); - // block and buffer request until the request has completed let full_body = hyper::body::to_bytes(body_start).await?; - let tx_opts: TransactionOptions = serde_json::from_slice(full_body.as_ref()).unwrap(); + let mut tx_opts: TransactionOptions = serde_json::from_slice(full_body.as_ref()).unwrap(); + let tx_id = tx_opts.with_new_transaction_id(); + // This is the span we use to instrument the execution of a transaction.
This span will be open + // during the tx execution, and held in the ITXServer for that transaction (see [`ITXServer`]) let span = info_span!("prisma:engine:itx_runner", user_facing = true, itx_id = field::Empty); - span.set_parent(cx); + + // If telemetry needs to be captured, we use the span trace_id to correlate the logs happening + // during the different operations within a transaction. The trace_id is propagated in the + // traceparent header, but if it's not present, we need to synthetically create one for the + // transaction. This is needed, in case the client is interested in capturing logs and not + // traces, because: + // - The client won't send a traceparent header + // - A transaction initial span is created here (prisma:engine:itx_runner) and stored in the + // ITXServer for that transaction + // - When a query comes in, the graphql handler processes it, but we need to tell the capturer + // to start capturing logs, and for that we need a trace_id. There are two places where we + // could get that information from: + // - First, it's the traceparent, but the client didn't send it, because they are only + // interested in logs. + // - Second, it's the root span for the transaction, but it's not in scope; it's instead + // stored in the ITXServer, in a different tokio task. + // + // For the above reasons, we need to create a trace_id that we can predict and use across the + // different operations happening within a transaction. So we do it by converting the tx_id + // into a trace_id, leaning on the fact that the tx_id has more entropy, and there's no + // information loss. + let capture_settings = capture_settings(&headers); + let traceparent = traceparent(&headers); + if traceparent.is_none() && capture_settings.logs_enabled() { + span.set_parent(tx_id.into_trace_context()) + } else { + span.set_parent(get_parent_span_context(&headers)) + } + let trace_id = span.context().span().span_context().trace_id(); + let capture_config = telemetry::capturing::capturer(trace_id, capture_settings); + + if let telemetry::capturing::Capturer::Enabled(capturer) = &capture_config { + capturer.start_capturing().await; + } let result = state .cx @@ -262,9 +351,19 @@ async fn transaction_start_handler(state: State, req: Request<Body>) -> Result<Response<Body>, hyper::Error> { Ok(tx_id) => { - let result = json!({ "id": tx_id.to_string() }); + let result = if let Some(telemetry) = telemetry { + json!({ "id": tx_id.to_string(), "extensions": { "logs": json!(telemetry.logs), "traces": json!(telemetry.traces) } }) + } else { + json!({ "id": tx_id.to_string() }) + }; let result_bytes = serde_json::to_vec(&result).unwrap(); let res = Response::builder() @@ -274,23 +373,57 @@ - Err(err) => Ok(err_to_http_resp(err)), + Err(err) => Ok(err_to_http_resp(err, telemetry)), } } -async fn transaction_commit_handler(state: State, tx_id: TxId) -> Result<Response<Body>, hyper::Error> { +async fn transaction_commit_handler( + state: State, + req: Request<Body>, + tx_id: TxId, +) -> Result<Response<Body>, hyper::Error> { + let capture_config = capture_config(req.headers(), tx_id.clone()); + + if let telemetry::capturing::Capturer::Enabled(capturer) = &capture_config { + capturer.start_capturing().await; + } + let result = state.cx.executor.commit_tx(tx_id).await; + + let telemetry = if let telemetry::capturing::Capturer::Enabled(capturer) = &capture_config { + capturer.fetch_captures().await + } else { + None + }; + match result { - Ok(_) => Ok(empty_json_to_http_resp()), - Err(err) => Ok(err_to_http_resp(err)), + Ok(_) =>
Ok(empty_json_to_http_resp(telemetry)), + Err(err) => Ok(err_to_http_resp(err, telemetry)), } } -async fn transaction_rollback_handler(state: State, tx_id: TxId) -> Result<Response<Body>, hyper::Error> { +async fn transaction_rollback_handler( + state: State, + req: Request<Body>, + tx_id: TxId, +) -> Result<Response<Body>, hyper::Error> { + let capture_config = capture_config(req.headers(), tx_id.clone()); + + if let telemetry::capturing::Capturer::Enabled(capturer) = &capture_config { + capturer.start_capturing().await; + } + let result = state.cx.executor.rollback_tx(tx_id).await; + + let telemetry = if let telemetry::capturing::Capturer::Enabled(capturer) = &capture_config { + capturer.fetch_captures().await + } else { + None + }; + match result { - Ok(_) => Ok(empty_json_to_http_resp()), - Err(err) => Ok(err_to_http_resp(err)), + Ok(_) => Ok(empty_json_to_http_resp(telemetry)), + Err(err) => Ok(err_to_http_resp(err, telemetry)), } } @@ -329,8 +462,12 @@ impl<'a> Extractor for HeaderExtractor<'a> { } } -fn empty_json_to_http_resp() -> Response<Body> { - let result = json!({}); +fn empty_json_to_http_resp(captured_telemetry: Option<telemetry::capturing::storage::Storage>) -> Response<Body> { + let result = if let Some(telemetry) = captured_telemetry { + json!({ "extensions": { "logs": json!(telemetry.logs), "traces": json!(telemetry.traces) } }) + } else { + json!({}) + }; let result_bytes = serde_json::to_vec(&result).unwrap(); Response::builder() @@ -340,7 +477,10 @@ .unwrap() } -fn err_to_http_resp(err: query_core::CoreError) -> Response<Body> { +fn err_to_http_resp( + err: query_core::CoreError, + captured_telemetry: Option<telemetry::capturing::storage::Storage>, +) -> Response<Body> { let status = match err { query_core::CoreError::TransactionError(ref err) => match err { query_core::TransactionError::AcquisitionTimeout => 504, @@ -354,11 +494,40 @@ _ => 500, }; - let user_error: user_facing_errors::Error = err.into(); - let body = Body::from(serde_json::to_vec(&user_error).unwrap()); + let mut err: ExtendedTransactionUserFacingError = err.into(); + if let Some(telemetry) = captured_telemetry { + err.set_extension("traces".to_owned(), json!(telemetry.traces)); + err.set_extension("logs".to_owned(), json!(telemetry.logs)); + } + let body = Body::from(serde_json::to_vec(&err).unwrap()); Response::builder().status(status).body(body).unwrap() } +fn capture_config(headers: &HeaderMap, tx_id: TxId) -> telemetry::capturing::Capturer { + let capture_settings = capture_settings(headers); + let mut traceparent = traceparent(headers); + + if traceparent.is_none() && capture_settings.is_enabled() { + traceparent = Some(tx_id.as_traceparent()) + } + + let trace_id = get_trace_id_from_traceparent(traceparent.as_deref()); + + telemetry::capturing::capturer(trace_id, capture_settings) +} + +#[allow(clippy::bind_instead_of_map)] +fn capture_settings(headers: &HeaderMap) -> telemetry::capturing::Settings { + const CAPTURE_TELEMETRY_HEADER: &str = "X-capture-telemetry"; + let s = if let Some(hv) = headers.get(CAPTURE_TELEMETRY_HEADER) { + hv.to_str().unwrap_or("") + } else { + "" + }; + + telemetry::capturing::Settings::from(s) +} + fn traceparent(headers: &HeaderMap) -> Option<String> { const TRACEPARENT_HEADER: &str = "traceparent";
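A usage sketch of the header contract implemented by `capture_settings` above (using hyper's `HeaderMap`; the header value is illustrative):

```rust
use hyper::HeaderMap;
use query_core::telemetry::capturing::Settings;

fn main() {
    let mut headers = HeaderMap::new();
    headers.insert("x-capture-telemetry", "error, query, tracing".parse().unwrap());

    // Mirrors the capture_settings() helper: a missing or non-UTF-8 header
    // degrades to "", i.e. capturing disabled.
    let value = headers
        .get("X-capture-telemetry") // header names are case-insensitive
        .and_then(|hv| hv.to_str().ok())
        .unwrap_or("");

    let settings = Settings::from(value);
    assert!(settings.is_enabled());
    assert!(settings.traces_enabled());
}
```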
diff --git a/query-engine/query-engine/src/state.rs b/query-engine/query-engine/src/state.rs index e380e3f5d05c..892edbd9f548 100644 --- a/query-engine/query-engine/src/state.rs +++ b/query-engine/query-engine/src/state.rs @@ -41,7 +41,11 @@ pub async fn setup(opts: &PrismaOpt, install_logger: bool, metrics: Option<MetricRegistry>) PrismaResult<()> { subcommand: Some(Subcommand::Cli(CliOpt::Dmmf)), enable_open_telemetry: false, open_telemetry_endpoint: String::new(), + enable_telemetry_in_response: false, dataproxy_metric_override: false, }; diff --git a/query-engine/request-handlers/src/graphql/response.rs b/query-engine/request-handlers/src/graphql/response.rs index 1847e4bc0656..8a6c75622c10 100644 --- a/query-engine/request-handlers/src/graphql/response.rs +++ b/query-engine/request-handlers/src/graphql/response.rs @@ -12,6 +12,9 @@ pub struct GQLResponse { #[serde(skip_serializing_if = "Vec::is_empty")] errors: Vec<GQLError>, + + #[serde(skip_serializing_if = "IndexMap::is_empty")] + extensions: Map, } #[derive(Debug, serde::Serialize, Default, PartialEq)] @@ -22,6 +25,9 @@ pub struct GQLBatchResponse { #[serde(skip_serializing_if = "Vec::is_empty")] errors: Vec<GQLError>, + + #[serde(skip_serializing_if = "IndexMap::is_empty")] + extensions: Map, } #[derive(Debug, serde::Serialize, serde::Deserialize, PartialEq)] @@ -67,6 +73,10 @@ impl GQLResponse { pub fn errors(&self) -> impl Iterator<Item = &GQLError> { self.errors.iter() } + + pub fn set_extension(&mut self, key: String, val: serde_json::Value) { + self.extensions.entry(key).or_insert(Item::Json(val)); + } } impl From for GQLResponse { @@ -153,6 +163,10 @@ impl GQLBatchResponse { .iter() .chain(self.batch_result.iter().flat_map(|res| res.errors())) } + + pub fn set_extension(&mut self, key: String, val: serde_json::Value) { + self.extensions.entry(key).or_insert(Item::Json(val)); + } } impl From for GQLBatchResponse { diff --git a/query-engine/request-handlers/src/lib.rs b/query-engine/request-handlers/src/lib.rs index 7d370e6f613b..e19be13f0eb8 100644 --- a/query-engine/request-handlers/src/lib.rs +++ b/query-engine/request-handlers/src/lib.rs @@ -16,3 +16,12 @@ pub enum PrismaResponse { Single(GQLResponse), Multi(GQLBatchResponse), } + +impl PrismaResponse { + pub fn set_extension(&mut self, key: String, val: serde_json::Value) { + match self { + Self::Single(r) => r.set_extension(key, val), + Self::Multi(r) => r.set_extension(key, val), + } + } +}
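Tying it together, a response extended through `set_extension` would serialize to roughly the following shape (the field names inside `logs` and `traces` entries are illustrative; the exact fields come from `telemetry::models`):

```rust
use serde_json::json;

fn main() {
    // Hypothetical rendering of a GQLResponse extended via set_extension():
    let response = json!({
        "data": { "findManyUser": [] },
        "extensions": {
            "logs": [{
                "span_id": "0000000000000001",
                "level": "query",
                "attributes": { "query": "SELECT ...", "duration_ms": 1 }
            }],
            "traces": [{
                "trace_id": "71366d6130303030373638746971626a",
                "name": "prisma:engine"
            }]
        }
    });
    println!("{}", serde_json::to_string_pretty(&response).unwrap());
}
```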