diff --git a/query-engine/connector-test-kit-rs/query-engine-tests/tests/new/regressions/prisma_15607.rs b/query-engine/connector-test-kit-rs/query-engine-tests/tests/new/regressions/prisma_15607.rs index 6aa02a12548e..8fb806458fd1 100644 --- a/query-engine/connector-test-kit-rs/query-engine-tests/tests/new/regressions/prisma_15607.rs +++ b/query-engine/connector-test-kit-rs/query-engine-tests/tests/new/regressions/prisma_15607.rs @@ -52,8 +52,12 @@ impl Actor { pub async fn spawn() -> TestResult { let (log_capture, log_tx) = TestLogCapture::new(); async fn with_logs(fut: impl Future, log_tx: LogEmit) -> T { - fut.with_subscriber(test_tracing_subscriber(&ENV_LOG_LEVEL, setup_metrics(), log_tx)) - .await + fut.with_subscriber(test_tracing_subscriber( + ENV_LOG_LEVEL.to_string(), + setup_metrics(), + log_tx, + )) + .await } let (query_sender, mut query_receiver) = mpsc::channel(100); diff --git a/query-engine/connector-test-kit-rs/query-tests-setup/src/lib.rs b/query-engine/connector-test-kit-rs/query-tests-setup/src/lib.rs index 1827f2befdf1..bb5cc2655980 100644 --- a/query-engine/connector-test-kit-rs/query-tests-setup/src/lib.rs +++ b/query-engine/connector-test-kit-rs/query-tests-setup/src/lib.rs @@ -172,7 +172,11 @@ fn run_relation_link_test_impl( teardown_project(&datamodel, Default::default()).await.unwrap(); } - .with_subscriber(test_tracing_subscriber(&ENV_LOG_LEVEL, metrics_for_subscriber, log_tx)), + .with_subscriber(test_tracing_subscriber( + ENV_LOG_LEVEL.to_string(), + metrics_for_subscriber, + log_tx, + )), ); } } @@ -282,7 +286,11 @@ pub fn run_connector_test_impl( crate::teardown_project(&datamodel, db_schemas).await.unwrap(); } - .with_subscriber(test_tracing_subscriber(&ENV_LOG_LEVEL, metrics_for_subscriber, log_tx)), + .with_subscriber(test_tracing_subscriber( + ENV_LOG_LEVEL.to_string(), + metrics_for_subscriber, + log_tx, + )), ); } diff --git a/query-engine/connector-test-kit-rs/query-tests-setup/src/logging.rs b/query-engine/connector-test-kit-rs/query-tests-setup/src/logging.rs index 018ffa4f0cbf..1447506a9128 100644 --- a/query-engine/connector-test-kit-rs/query-tests-setup/src/logging.rs +++ b/query-engine/connector-test-kit-rs/query-tests-setup/src/logging.rs @@ -1,6 +1,7 @@ +use query_core::telemetry::helpers as telemetry_helpers; use query_engine_metrics::MetricRegistry; use tracing_error::ErrorLayer; -use tracing_subscriber::{layer::Layered, prelude::*, EnvFilter, Layer, Registry}; +use tracing_subscriber::{layer::Layered, prelude::*, Layer, Registry}; use crate::LogEmit; @@ -27,8 +28,8 @@ type Sub = Layered< >, >; -pub fn test_tracing_subscriber(log_config: &str, metrics: MetricRegistry, log_tx: LogEmit) -> Sub { - let filter = create_env_filter(true, log_config); +pub fn test_tracing_subscriber(log_config: String, metrics: MetricRegistry, log_tx: LogEmit) -> Sub { + let filter = telemetry_helpers::env_filter(true, telemetry_helpers::QueryEngineLogLevel::Override(log_config)); let fmt_layer = tracing_subscriber::fmt::layer() .with_writer(PrintWriter::new(log_tx)) @@ -40,28 +41,6 @@ pub fn test_tracing_subscriber(log_config: &str, metrics: MetricRegistry, log_tx .with(ErrorLayer::default()) } -fn create_env_filter(log_queries: bool, qe_log_level: &str) -> EnvFilter { - let mut filter = EnvFilter::from_default_env() - .add_directive("tide=error".parse().unwrap()) - .add_directive("tonic=error".parse().unwrap()) - .add_directive("h2=error".parse().unwrap()) - .add_directive("hyper=error".parse().unwrap()) - .add_directive("tower=error".parse().unwrap()); - - filter = filter - .add_directive(format!("query_engine={}", &qe_log_level).parse().unwrap()) - .add_directive(format!("query_core={}", &qe_log_level).parse().unwrap()) - .add_directive(format!("query_connector={}", &qe_log_level).parse().unwrap()) - .add_directive(format!("sql_query_connector={}", &qe_log_level).parse().unwrap()) - .add_directive("mongodb_query_connector=debug".parse().unwrap()); - - if log_queries { - filter = filter.add_directive("quaint[{is_query}]=trace".parse().unwrap()); - } - - filter -} - /// This is a temporary implementation detail for `tracing` logs in tests. /// Instead of going through `std::io::stderr`, it goes through the specific /// local stderr handle used by `eprintln` and `dbg`, allowing logs to appear in diff --git a/query-engine/connector-test-kit-rs/query-tests-setup/src/runner/binary.rs b/query-engine/connector-test-kit-rs/query-tests-setup/src/runner/binary.rs index 58391e63bbae..faad759a589a 100644 --- a/query-engine/connector-test-kit-rs/query-tests-setup/src/runner/binary.rs +++ b/query-engine/connector-test-kit-rs/query-tests-setup/src/runner/binary.rs @@ -1,7 +1,8 @@ use crate::{ConnectorTag, RunnerInterface, TestError, TestResult, TxResult}; use query_core::{schema::QuerySchemaRef, TxId}; use query_engine::opt::PrismaOpt; -use query_engine::server::{routes, setup, State}; +use query_engine::server::routes; +use query_engine::state::{setup, State}; use query_engine_metrics::MetricRegistry; use request_handlers::{GQLBatchResponse, GQLError, GQLResponse, GraphQlBody, MultiQuery, PrismaResponse}; @@ -20,7 +21,7 @@ pub struct BinaryRunner { impl RunnerInterface for BinaryRunner { async fn load(datamodel: String, connector_tag: ConnectorTag, metrics: MetricRegistry) -> TestResult { let opts = PrismaOpt::from_list(&["binary", "--enable-raw-queries", "--datamodel", &datamodel]); - let state = setup(&opts, metrics).await.unwrap(); + let state = setup(&opts, false, Some(metrics)).await.unwrap(); let configuration = opts.configuration(true).unwrap(); let data_source = configuration.datasources.first().unwrap(); diff --git a/query-engine/connector-test-kit-rs/query-tests-setup/src/runner/direct.rs b/query-engine/connector-test-kit-rs/query-tests-setup/src/runner/direct.rs index 62022f420488..449189f9cb3b 100644 --- a/query-engine/connector-test-kit-rs/query-tests-setup/src/runner/direct.rs +++ b/query-engine/connector-test-kit-rs/query-tests-setup/src/runner/direct.rs @@ -1,6 +1,6 @@ use crate::{ConnectorTag, RunnerInterface, TestResult, TxResult}; use colored::Colorize; -use query_core::{executor, schema::QuerySchemaRef, schema_builder, QueryExecutor, TxId}; +use query_core::{executor, schema::QuerySchemaRef, schema_builder, QueryExecutor, TransactionOptions, TxId}; use query_engine_metrics::MetricRegistry; use request_handlers::{GraphQlBody, GraphQlHandler, MultiQuery}; use std::{env, sync::Arc}; @@ -82,15 +82,9 @@ impl RunnerInterface for DirectRunner { valid_for_millis: u64, isolation_level: Option, ) -> TestResult { - let id = self - .executor - .start_tx( - self.query_schema.clone(), - max_acquisition_millis, - valid_for_millis, - isolation_level, - ) - .await?; + let tx_opts = TransactionOptions::new(max_acquisition_millis, valid_for_millis, isolation_level); + + let id = self.executor.start_tx(self.query_schema.clone(), &tx_opts).await?; Ok(id) } diff --git a/query-engine/core/src/executor/interpreting_executor.rs b/query-engine/core/src/executor/interpreting_executor.rs index e96b5f3ea413..e00dd8e5dd9e 100644 --- a/query-engine/core/src/executor/interpreting_executor.rs +++ b/query-engine/core/src/executor/interpreting_executor.rs @@ -1,7 +1,7 @@ use super::execute_operation::{execute_many_operations, execute_many_self_contained, execute_single_self_contained}; use crate::{ BatchDocumentTransaction, CoreError, OpenTx, Operation, QueryExecutor, ResponseData, TransactionActorManager, - TransactionError, TransactionManager, TxId, + TransactionError, TransactionManager, TransactionOptions, TxId, }; use async_trait::async_trait; @@ -143,15 +143,14 @@ impl TransactionManager for InterpretingExecutor where C: Connector + Send + Sync, { - async fn start_tx( - &self, - query_schema: QuerySchemaRef, - max_acquisition_millis: u64, - valid_for_millis: u64, - isolation_level: Option, - ) -> crate::Result { + async fn start_tx(&self, query_schema: QuerySchemaRef, tx_opts: &TransactionOptions) -> crate::Result { super::with_request_now(async move { - let id = TxId::default(); + let id = if let Some(predefined_tx_id) = tx_opts.new_tx_id.clone() { + predefined_tx_id.into() + } else { + TxId::default() + }; + trace!("[{}] Starting...", id); let conn_span = info_span!( "prisma:engine:connection", @@ -159,21 +158,21 @@ where "db.type" = self.connector.name() ); let conn = time::timeout( - Duration::from_millis(max_acquisition_millis), + Duration::from_millis(tx_opts.max_acquisition_millis), self.connector.get_connection(), ) .instrument(conn_span) .await; let conn = conn.map_err(|_| TransactionError::AcquisitionTimeout)??; - let c_tx = OpenTx::start(conn, isolation_level).await?; + let c_tx = OpenTx::start(conn, tx_opts.isolation_level.clone()).await?; self.itx_manager .create_tx( query_schema.clone(), id.clone(), c_tx, - Duration::from_millis(valid_for_millis), + Duration::from_millis(tx_opts.valid_for_millis), ) .await; diff --git a/query-engine/core/src/executor/mod.rs b/query-engine/core/src/executor/mod.rs index f36c687478ee..c8074bf41626 100644 --- a/query-engine/core/src/executor/mod.rs +++ b/query-engine/core/src/executor/mod.rs @@ -14,6 +14,7 @@ mod pipeline; pub use execute_operation::*; pub use loader::*; +use serde::Deserialize; use crate::{ query_document::Operation, response_ir::ResponseData, schema::QuerySchemaRef, BatchDocumentTransaction, TxId, @@ -53,20 +54,51 @@ pub trait QueryExecutor: TransactionManager { fn primary_connector(&self) -> &(dyn Connector + Send + Sync); } +#[derive(Debug, Deserialize)] +pub struct TransactionOptions { + /// Maximum wait time for tx acquisition in milliseconds. + #[serde(rename(deserialize = "max_wait"))] + pub max_acquisition_millis: u64, + + /// Time in milliseconds after which the transaction rolls back automatically. + #[serde(rename(deserialize = "timeout"))] + pub valid_for_millis: u64, + + /// Isolation level to use for the transaction. + pub isolation_level: Option, + + /// An optional pre-defined transaction id. Some value might be provided in case we want to generate + /// a new id at the beginning of the transaction + #[serde(default)] + pub new_tx_id: Option, +} + +impl TransactionOptions { + pub fn new(max_acquisition_millis: u64, valid_for_millis: u64, isolation_level: Option) -> Self { + Self { + max_acquisition_millis, + valid_for_millis, + isolation_level, + new_tx_id: None, + } + } + + /// Generates a new transaction id before the transaction is started and returns a modified version + /// of self with the new predefined_id set. + pub fn with_new_transaction_id(&mut self) -> TxId { + let tx_id: TxId = Default::default(); + self.new_tx_id = Some(tx_id.to_string()); + tx_id + } +} #[async_trait] pub trait TransactionManager { /// Starts a new transaction. /// Returns ID of newly opened transaction. - /// Expected to throw an error if no transaction could be opened for `max_acquisition_millis` milliseconds. - /// The new transaction must only live for `valid_for_millis` milliseconds before it automatically rolls back. + /// Expected to throw an error if no transaction could be opened for `opts.max_acquisition_millis` milliseconds. + /// The new transaction must only live for `opts.valid_for_millis` milliseconds before it automatically rolls back. /// This rollback mechanism is an implementation detail of the trait implementer. - async fn start_tx( - &self, - query_schema: QuerySchemaRef, - max_acquisition_millis: u64, - valid_for_millis: u64, - isolation_level: Option, - ) -> crate::Result; + async fn start_tx(&self, query_schema: QuerySchemaRef, opts: &TransactionOptions) -> crate::Result; /// Commits a transaction. async fn commit_tx(&self, tx_id: TxId) -> crate::Result<()>; diff --git a/query-engine/core/src/interactive_transactions/actor_manager.rs b/query-engine/core/src/interactive_transactions/actor_manager.rs index 7c950e556f64..b34bc8d93db4 100644 --- a/query-engine/core/src/interactive_transactions/actor_manager.rs +++ b/query-engine/core/src/interactive_transactions/actor_manager.rs @@ -114,22 +114,22 @@ impl TransactionActorManager { &self, tx_id: &TxId, operation: Operation, - trace_id: Option, + traceparent: Option, ) -> crate::Result { let client = self.get_client(tx_id, "query").await?; - client.execute(operation, trace_id).await + client.execute(operation, traceparent).await } pub async fn batch_execute( &self, tx_id: &TxId, operations: Vec, - trace_id: Option, + traceparent: Option, ) -> crate::Result>> { let client = self.get_client(tx_id, "batch query").await?; - client.batch_execute(operations, trace_id).await + client.batch_execute(operations, traceparent).await } pub async fn commit_tx(&self, tx_id: &TxId) -> crate::Result<()> { diff --git a/query-engine/core/src/interactive_transactions/actors.rs b/query-engine/core/src/interactive_transactions/actors.rs index b44a78850abd..ebc64a36e253 100644 --- a/query-engine/core/src/interactive_transactions/actors.rs +++ b/query-engine/core/src/interactive_transactions/actors.rs @@ -1,7 +1,7 @@ use super::{CachedTx, TransactionError, TxOpRequest, TxOpRequestMsg, TxOpResponse}; use crate::{ - execute_many_operations, execute_single_operation, set_span_link_from_trace_id, ClosedTx, OpenTx, Operation, - ResponseData, TxId, + execute_many_operations, execute_single_operation, telemetry::helpers::set_span_link_from_traceparent, ClosedTx, + OpenTx, Operation, ResponseData, TxId, }; use schema::QuerySchemaRef; use std::{collections::HashMap, sync::Arc}; @@ -51,13 +51,13 @@ impl ITXServer { // RunState is used to tell if the run loop should continue async fn process_msg(&mut self, op: TxOpRequest) -> RunState { match op.msg { - TxOpRequestMsg::Single(ref operation, trace_id) => { - let result = self.execute_single(&operation, trace_id).await; + TxOpRequestMsg::Single(ref operation, traceparent) => { + let result = self.execute_single(&operation, traceparent).await; let _ = op.respond_to.send(TxOpResponse::Single(result)); RunState::Continue } - TxOpRequestMsg::Batch(ref operations, trace_id) => { - let result = self.execute_batch(&operations, trace_id).await; + TxOpRequestMsg::Batch(ref operations, traceparent) => { + let result = self.execute_batch(&operations, traceparent).await; let _ = op.respond_to.send(TxOpResponse::Batch(result)); RunState::Continue } @@ -74,16 +74,20 @@ impl ITXServer { } } - async fn execute_single(&mut self, operation: &Operation, trace_id: Option) -> crate::Result { + async fn execute_single( + &mut self, + operation: &Operation, + traceparent: Option, + ) -> crate::Result { let span = info_span!("prisma:engine:itx_query_builder", user_facing = true); - set_span_link_from_trace_id(&span, trace_id.clone()); + set_span_link_from_traceparent(&span, traceparent.clone()); let conn = self.cached_tx.as_open()?; execute_single_operation( self.query_schema.clone(), conn.as_connection_like(), operation, - trace_id, + traceparent, ) .instrument(span) .await @@ -92,7 +96,7 @@ impl ITXServer { async fn execute_batch( &mut self, operations: &[Operation], - trace_id: Option, + traceparent: Option, ) -> crate::Result>> { let span = info_span!("prisma:engine:itx_execute", user_facing = true); @@ -101,7 +105,7 @@ impl ITXServer { self.query_schema.clone(), conn.as_connection_like(), operations, - trace_id, + traceparent, ) .instrument(span) .await @@ -168,8 +172,8 @@ impl ITXClient { } } - pub async fn execute(&self, operation: Operation, trace_id: Option) -> crate::Result { - let msg_req = TxOpRequestMsg::Single(operation, trace_id); + pub async fn execute(&self, operation: Operation, traceparent: Option) -> crate::Result { + let msg_req = TxOpRequestMsg::Single(operation, traceparent); let msg = self.send_and_receive(msg_req).await?; if let TxOpResponse::Single(resp) = msg { @@ -182,9 +186,9 @@ impl ITXClient { pub async fn batch_execute( &self, operations: Vec, - trace_id: Option, + traceparent: Option, ) -> crate::Result>> { - let msg_req = TxOpRequestMsg::Batch(operations, trace_id); + let msg_req = TxOpRequestMsg::Batch(operations, traceparent); let msg = self.send_and_receive(msg_req).await?; diff --git a/query-engine/core/src/interactive_transactions/mod.rs b/query-engine/core/src/interactive_transactions/mod.rs index 21566b83c7e0..1f99c1f6f0cb 100644 --- a/query-engine/core/src/interactive_transactions/mod.rs +++ b/query-engine/core/src/interactive_transactions/mod.rs @@ -43,6 +43,8 @@ pub(crate) use messages::*; #[derive(Debug, Clone, Hash, Eq, PartialEq)] pub struct TxId(String); +const MINIMUM_TX_ID_LENGTH: usize = 24; + impl Default for TxId { fn default() -> Self { Self(cuid::cuid().unwrap()) @@ -54,7 +56,15 @@ where T: Into, { fn from(s: T) -> Self { - Self(s.into()) + let contents = s.into(); + assert!( + contents.len() >= MINIMUM_TX_ID_LENGTH, + "minimum length for a TxId ({}) is {}, but was {}", + contents, + MINIMUM_TX_ID_LENGTH, + contents.len() + ); + Self(contents) } } diff --git a/query-engine/core/src/lib.rs b/query-engine/core/src/lib.rs index b42b7b140409..3bf22ee2cfde 100644 --- a/query-engine/core/src/lib.rs +++ b/query-engine/core/src/lib.rs @@ -11,13 +11,14 @@ extern crate tracing; pub mod executor; pub mod query_document; pub mod response_ir; +pub mod telemetry; pub use self::{ error::{CoreError, FieldConversionError}, - executor::QueryExecutor, + executor::{QueryExecutor, TransactionOptions}, interactive_transactions::{TransactionError, TxId}, query_document::*, - trace_helpers::*, + telemetry::*, }; mod error; @@ -27,7 +28,6 @@ mod query_ast; mod query_graph; mod query_graph_builder; mod result_ast; -mod trace_helpers; use self::{ executor::*, diff --git a/query-engine/core/src/telemetry/helpers.rs b/query-engine/core/src/telemetry/helpers.rs new file mode 100644 index 000000000000..e844f2b946e0 --- /dev/null +++ b/query-engine/core/src/telemetry/helpers.rs @@ -0,0 +1,112 @@ +use super::models::TraceSpan; +use once_cell::sync::Lazy; +use opentelemetry::sdk::export::trace::SpanData; +use opentelemetry::trace::{TraceContextExt, TraceId}; +use opentelemetry::Context; +use serde_json::{json, Value}; +use std::collections::HashMap; +use tracing::{Metadata, Span}; +use tracing_opentelemetry::OpenTelemetrySpanExt; +use tracing_subscriber::EnvFilter; + +pub static SHOW_ALL_TRACES: Lazy = Lazy::new(|| match std::env::var("PRISMA_SHOW_ALL_TRACES") { + Ok(enabled) => enabled.to_lowercase() == *("true"), + Err(_) => false, +}); + +pub fn spans_to_json(spans: Vec) -> String { + let json_spans: Vec = spans.into_iter().map(|span| json!(TraceSpan::from(span))).collect(); + let span_result = json!({ + "span": true, + "spans": json_spans + }); + + match serde_json::to_string(&span_result) { + Ok(json_string) => json_string, + Err(_) => "".to_string(), + } +} + +// set the parent context and return the traceparent +pub fn set_parent_context_from_json_str(span: &Span, trace: &str) -> Option { + let trace: HashMap = serde_json::from_str(trace).unwrap_or_default(); + let trace_id = trace.get("traceparent").map(String::from); + let cx = opentelemetry::global::get_text_map_propagator(|propagator| propagator.extract(&trace)); + span.set_parent(cx); + trace_id +} + +pub fn set_span_link_from_traceparent(span: &Span, traceparent: Option) { + if let Some(traceparent) = traceparent { + let trace: HashMap = HashMap::from([("traceparent".to_string(), traceparent)]); + let cx = opentelemetry::global::get_text_map_propagator(|propagator| propagator.extract(&trace)); + let context_span = cx.span(); + span.add_link(context_span.span_context().clone()); + } +} + +pub fn get_trace_id_from_context(context: &Context) -> TraceId { + let context_span = context.span(); + context_span.span_context().trace_id() +} + +pub enum QueryEngineLogLevel { + FromEnv, + Override(String), +} + +impl QueryEngineLogLevel { + fn level(self) -> Option { + match self { + Self::FromEnv => std::env::var("QE_LOG_LEVEL").ok(), + Self::Override(l) => Some(l), + } + } +} + +#[rustfmt::skip] +pub fn env_filter(log_queries: bool, qe_log_level: QueryEngineLogLevel) -> EnvFilter { + let mut filter = EnvFilter::from_default_env() + .add_directive("tide=error".parse().unwrap()) + .add_directive("tonic=error".parse().unwrap()) + .add_directive("h2=error".parse().unwrap()) + .add_directive("hyper=error".parse().unwrap()) + .add_directive("tower=error".parse().unwrap()); + + if let Some(level) = qe_log_level.level() { + filter = filter + .add_directive(format!("query_engine={}", &level).parse().unwrap()) + .add_directive(format!("query_core={}", &level).parse().unwrap()) + .add_directive(format!("query_connector={}", &level).parse().unwrap()) + .add_directive(format!("sql_query_connector={}", &level).parse().unwrap()) + .add_directive(format!("mongodb_query_connector={}", &level).parse().unwrap()); + } + + if log_queries { + filter = filter + .add_directive("quaint[{is_query}]=trace".parse().unwrap()) + .add_directive("mongodb_query_connector=debug".parse().unwrap()); + } + + filter +} + +pub fn user_facing_span_only_filter(meta: &Metadata) -> bool { + if !meta.is_span() { + return false; + } + + user_facing_filter(meta) +} + +pub fn user_facing_filter(meta: &Metadata) -> bool { + if *SHOW_ALL_TRACES { + return true; + } + + if meta.fields().iter().any(|f| f.name() == "user_facing") { + return true; + } + + meta.target() == "quaint::connector::metrics" && meta.name() == "quaint:query" +} diff --git a/query-engine/core/src/telemetry/mod.rs b/query-engine/core/src/telemetry/mod.rs new file mode 100644 index 000000000000..29dd6738caf2 --- /dev/null +++ b/query-engine/core/src/telemetry/mod.rs @@ -0,0 +1,2 @@ +pub mod helpers; +pub mod models; diff --git a/query-engine/core/src/telemetry/models.rs b/query-engine/core/src/telemetry/models.rs new file mode 100644 index 000000000000..07cf755ad2e7 --- /dev/null +++ b/query-engine/core/src/telemetry/models.rs @@ -0,0 +1,209 @@ +use opentelemetry::{sdk::export::trace::SpanData, KeyValue, Value}; +use serde::Serialize; +use serde_json::json; +use std::{ + borrow::Cow, + collections::HashMap, + time::{Duration, SystemTime}, +}; + +const ACCEPT_ATTRIBUTES: &[&str] = &["db.statement", "itx_id", "db.type"]; + +#[derive(Serialize, Debug, Clone, PartialEq, Eq)] +pub struct TraceSpan { + pub(super) trace_id: String, + pub(super) span_id: String, + pub(super) parent_span_id: String, + pub(super) name: String, + pub(super) start_time: HrTime, + pub(super) end_time: HrTime, + #[serde(skip_serializing_if = "HashMap::is_empty")] + pub(super) attributes: HashMap, + #[serde(skip_serializing_if = "Vec::is_empty")] + pub(super) events: Vec, + #[serde(skip_serializing_if = "Vec::is_empty")] + pub(super) links: Vec, +} + +#[derive(Serialize, Debug, Clone, PartialEq, Eq)] +pub struct Link { + trace_id: String, + span_id: String, +} + +impl TraceSpan { + pub fn split_events(self) -> (Vec, TraceSpan) { + (self.events, Self { events: vec![], ..self }) + } +} + +impl From for TraceSpan { + fn from(span: SpanData) -> Self { + let attributes: HashMap = + span.attributes + .iter() + .fold(HashMap::default(), |mut map, (key, value)| { + if ACCEPT_ATTRIBUTES.contains(&key.as_str()) { + map.insert(key.to_string(), to_json_value(value)); + } + + map + }); + + // TODO(fernandez@prisma.io) mongo query events and quaint query events + // have different attributes. both of them are queries, however the name + // of quaint queries is quaint::query and the name of mongodb queries is + // prisma::engine::db_query. Both of them are generated as spans but quaint + // contains the full query, while mongodb only contains the collection name + // and the operatiion. For this reason, we treat them differently when geneating + // query events in logging capturing and other places. + // + // What we are currently doing is to add a quaint attribute to quaint queries + // so we can transform span containing the query into a query event. For mongo + // this is not enough and we need to extract a particular event. + // + // If we unified these two ways of logging / tracing query information, we could get rid of + // a lot of spaghetti code. + + let is_quaint_query = matches!(span.name, Cow::Borrowed("quaint:query")); + + let name: Cow = if is_quaint_query { + "prisma:engine:db_query".into() + } else { + span.name.clone() + }; + + let hr_start_time = convert_to_high_res_time(span.start_time.duration_since(SystemTime::UNIX_EPOCH).unwrap()); + let hr_end_time = convert_to_high_res_time(span.end_time.duration_since(SystemTime::UNIX_EPOCH).unwrap()); + + let links = span + .links + .iter() + .map(|link| { + let ctx = link.span_context(); + Link { + trace_id: ctx.trace_id().to_string(), + span_id: ctx.span_id().to_string(), + } + }) + .collect(); + + let span_id = span.span_context.span_id().to_string(); + let events = span + .events + .into_iter() + .map(|e| Event::from(e).with_span_id(span_id.clone())) + .collect(); + + Self { + span_id, + trace_id: span.span_context.trace_id().to_string(), + parent_span_id: span.parent_span_id.to_string(), + name: name.into_owned(), + start_time: hr_start_time, + end_time: hr_end_time, + attributes, + links, + events, + } + } +} + +#[derive(Serialize, Debug, Clone, PartialEq, Eq)] +pub struct Event { + pub(super) span_id: Option, + pub(super) name: String, + pub(super) level: String, + pub(super) timestamp: HrTime, + pub(super) attributes: HashMap, +} + +impl Event { + pub(super) fn with_span_id(mut self, spain_id: String) -> Self { + self.span_id = Some(spain_id); + self + } +} + +impl From for Event { + fn from(event: opentelemetry::trace::Event) -> Self { + let name = event.name.to_string(); + let timestamp = convert_to_high_res_time(event.timestamp.duration_since(SystemTime::UNIX_EPOCH).unwrap()); + let mut attributes: HashMap = + event + .attributes + .into_iter() + .fold(Default::default(), |mut map, KeyValue { key, value }| { + map.insert(key.to_string(), to_json_value(&value)); + map + }); + + let level = attributes + .remove("level") + .unwrap_or_else(|| serde_json::Value::String("unknown".to_owned())) + .to_string() + .to_ascii_lowercase(); + + Self { + span_id: None, // already attached to the span + name, + level, + timestamp, + attributes, + } + } +} +/// logs are modeled as span events +pub type LogEvent = Event; +/// metrics are modeled as span events +pub type MetricEvent = Event; + +pub type HrTime = [u64; 2]; + +/// Take from the otel library on what the format should be for High-Resolution time +/// Defines High-Resolution Time. +/// +/// The first number, HrTime[0], is UNIX Epoch time in seconds since 00:00:00 UTC on 1 January 1970. +/// The second number, HrTime[1], represents the partial second elapsed since Unix Epoch time represented by first number in nanoseconds. +/// For example, 2021-01-01T12:30:10.150Z in UNIX Epoch time in milliseconds is represented as 1609504210150. +/// The first number is calculated by converting and truncating the Epoch time in milliseconds to seconds: +/// HrTime[0] = Math.trunc(1609504210150 / 1000) = 1609504210. +/// The second number is calculated by converting the digits after the decimal point of the subtraction, (1609504210150 / 1000) - HrTime[0], to nanoseconds: +/// HrTime[1] = Number((1609504210.150 - HrTime[0]).toFixed(9)) * 1e9 = 150000000. +/// This is represented in HrTime format as [1609504210, 150000000]. +fn convert_to_high_res_time(time: Duration) -> HrTime { + let secs = time.as_secs(); + let partial = time.subsec_nanos(); + [secs, partial as u64] +} + +/// Transforms an [`opentelemetry::Value`] to a [`serde_json::Value`] +/// This is because we want to flatten the JSON representation for a value, which by default will +/// be a nested structure informing of the type. For instance a float [`opentelemetry::Value`] +/// would be represented as json as `{"f64": 1.0}`. This function will flatten it to just `1.0`. +fn to_json_value(value: &Value) -> serde_json::Value { + match value { + Value::String(s) => json!(s), + Value::F64(f) => json!(f), + Value::Bool(b) => json!(b), + Value::I64(i) => json!(i), + Value::Array(ary) => match ary { + opentelemetry::Array::Bool(b_vec) => json!(b_vec), + opentelemetry::Array::I64(i_vec) => json!(i_vec), + opentelemetry::Array::F64(f_vec) => json!(f_vec), + opentelemetry::Array::String(s_vec) => json!(s_vec), + }, + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_high_resolution_time_works() { + // 2021-01-01T12:30:10.150Z in UNIX Epoch time in milliseconds + let time_val = Duration::from_millis(1609504210150); + assert_eq!([1609504210, 150000000], convert_to_high_res_time(time_val)); + } +} diff --git a/query-engine/core/src/trace_helpers/mod.rs b/query-engine/core/src/trace_helpers/mod.rs deleted file mode 100644 index ed6d88e14ffb..000000000000 --- a/query-engine/core/src/trace_helpers/mod.rs +++ /dev/null @@ -1,146 +0,0 @@ -use once_cell::sync::Lazy; -use opentelemetry::sdk::export::trace::SpanData; -use opentelemetry::trace::TraceContextExt; -use serde_json::{json, Value}; -use std::borrow::Cow; - -use std::time::Duration; -use std::{collections::HashMap, time::SystemTime}; -use tracing::{Metadata, Span}; -use tracing_opentelemetry::OpenTelemetrySpanExt; - -const ACCEPT_ATTRIBUTES: &[&str] = &["db.statement", "itx_id", "db.type"]; - -pub static SHOW_ALL_TRACES: Lazy = Lazy::new(|| match std::env::var("PRISMA_SHOW_ALL_TRACES") { - Ok(enabled) => enabled.to_lowercase() == *("true"), - Err(_) => false, -}); - -pub fn spans_to_json(spans: &[SpanData]) -> String { - let json_spans: Vec = spans.iter().map(span_to_json).collect(); - let span_result = json!({ - "span": true, - "spans": json_spans - }); - - match serde_json::to_string(&span_result) { - Ok(json_string) => json_string, - Err(_) => "".to_string(), - } -} - -fn span_to_json(span: &SpanData) -> Value { - let attributes: HashMap = - span.attributes - .iter() - .fold(HashMap::default(), |mut map, (key, value)| { - if ACCEPT_ATTRIBUTES.contains(&key.as_str()) { - map.insert(key.to_string(), value.to_string()); - } - - map - }); - - // Override the name of quaint. It will be confusing for users to see quaint instead of - // Prisma in the spans. - let name: Cow = match span.name { - Cow::Borrowed("quaint:query") => "prisma:engine:db_query".into(), - _ => span.name.clone(), - }; - - let hr_start_time = convert_to_high_res_time(span.start_time.duration_since(SystemTime::UNIX_EPOCH).unwrap()); - let hr_end_time = convert_to_high_res_time(span.end_time.duration_since(SystemTime::UNIX_EPOCH).unwrap()); - - json!({ - "span": true, - "trace_id": span.span_context.trace_id().to_string(), - "span_id": span.span_context.span_id().to_string(), - "parent_span_id": span.parent_span_id.to_string(), - "name": name, - "start_time": hr_start_time, - "end_time": hr_end_time, - "attributes": attributes, - "links": create_link_json(span) - }) -} - -fn create_link_json(span: &SpanData) -> Vec { - span.links - .iter() - .map(|link| { - let ctx = link.span_context(); - json!({ - "trace_id": ctx.trace_id().to_string(), - "span_id": ctx.span_id().to_string(), - }) - }) - .collect() -} - -// set the parent context and return the traceparent -pub fn set_parent_context_from_json_str(span: &Span, trace: &str) -> Option { - let trace: HashMap = serde_json::from_str(trace).unwrap_or_default(); - let trace_id = trace.get("traceparent").map(String::from); - let cx = opentelemetry::global::get_text_map_propagator(|propagator| propagator.extract(&trace)); - span.set_parent(cx); - trace_id -} - -pub fn set_span_link_from_trace_id(span: &Span, trace_id: Option) { - if let Some(trace_id) = trace_id { - let trace: HashMap = HashMap::from([("traceparent".to_string(), trace_id)]); - let cx = opentelemetry::global::get_text_map_propagator(|propagator| propagator.extract(&trace)); - let context_span = cx.span(); - span.add_link(context_span.span_context().clone()); - } -} - -pub fn is_user_facing_trace_filter(meta: &Metadata) -> bool { - if !meta.is_span() { - return false; - } - - if *SHOW_ALL_TRACES { - return true; - } - - if meta.fields().iter().any(|f| f.name() == "user_facing") { - return true; - } - - meta.target() == "quaint::connector::metrics" && meta.name() == "quaint:query" -} - -/** - * Take from the otel library on what the format should be for High-Resolution time - * Defines High-Resolution Time. - * - * The first number, HrTime[0], is UNIX Epoch time in seconds since 00:00:00 UTC on 1 January 1970. - * The second number, HrTime[1], represents the partial second elapsed since Unix Epoch time represented by first number in nanoseconds. - * For example, 2021-01-01T12:30:10.150Z in UNIX Epoch time in milliseconds is represented as 1609504210150. - * The first number is calculated by converting and truncating the Epoch time in milliseconds to seconds: - * HrTime[0] = Math.trunc(1609504210150 / 1000) = 1609504210. - * The second number is calculated by converting the digits after the decimal point of the subtraction, (1609504210150 / 1000) - HrTime[0], to nanoseconds: - * HrTime[1] = Number((1609504210.150 - HrTime[0]).toFixed(9)) * 1e9 = 150000000. - * This is represented in HrTime format as [1609504210, 150000000]. - */ -type HrTime = [u64; 2]; -pub fn convert_to_high_res_time(time: Duration) -> HrTime { - let secs = time.as_secs(); - let partial = time.subsec_nanos(); - [secs, partial as u64] -} - -#[cfg(test)] -mod tests { - use std::time::Duration; - - use super::*; - - #[test] - fn test_high_resolution_time_works() { - // 2021-01-01T12:30:10.150Z in UNIX Epoch time in milliseconds - let time_val = Duration::from_millis(1609504210150); - assert_eq!([1609504210, 150000000], convert_to_high_res_time(time_val)); - } -} diff --git a/query-engine/query-engine-node-api/src/engine.rs b/query-engine/query-engine-node-api/src/engine.rs index 01a568b75fd2..20ce9ae58be8 100644 --- a/query-engine/query-engine-node-api/src/engine.rs +++ b/query-engine/query-engine-node-api/src/engine.rs @@ -1,13 +1,13 @@ -use crate::{error::ApiError, log_callback::LogCallback, logger::Logger}; +use crate::{engine::executor::TransactionOptions, error::ApiError, log_callback::LogCallback, logger::Logger}; use futures::FutureExt; use psl::PreviewFeature; use query_core::{ executor, schema::{QuerySchema, QuerySchemaRenderer}, - schema_builder, set_parent_context_from_json_str, QueryExecutor, TxId, + schema_builder, telemetry, QueryExecutor, TxId, }; use query_engine_metrics::{MetricFormat, MetricRegistry}; -use request_handlers::{dmmf, GraphQLSchemaRenderer, GraphQlHandler, TxInput}; +use request_handlers::{dmmf, GraphQLSchemaRenderer, GraphQlHandler}; use serde::{Deserialize, Serialize}; use serde_json::json; use std::{ @@ -206,7 +206,7 @@ impl QueryEngine { async_panic_to_js_error(async { let span = tracing::info_span!("prisma:engine:connect"); - let _ = set_parent_context_from_json_str(&span, &trace); + let _ = telemetry::helpers::set_parent_context_from_json_str(&span, &trace); let mut inner = self.inner.write().await; let builder = inner.as_builder()?; @@ -264,7 +264,7 @@ impl QueryEngine { async_panic_to_js_error(async { let span = tracing::info_span!("prisma:engine:disconnect"); - let _ = set_parent_context_from_json_str(&span, &trace); + let _ = telemetry::helpers::set_parent_context_from_json_str(&span, &trace); async { let mut inner = self.inner.write().await; @@ -305,7 +305,7 @@ impl QueryEngine { Span::none() }; - let trace_id = set_parent_context_from_json_str(&span, &trace); + let trace_id = telemetry::helpers::set_parent_context_from_json_str(&span, &trace); let handler = GraphQlHandler::new(engine.executor(), engine.query_schema()); let response = handler @@ -332,17 +332,12 @@ impl QueryEngine { async move { let span = tracing::info_span!("prisma:engine:itx_runner", user_facing = true, itx_id = field::Empty); - set_parent_context_from_json_str(&span, &trace); + telemetry::helpers::set_parent_context_from_json_str(&span, &trace); - let input: TxInput = serde_json::from_str(&input)?; + let tx_opts: TransactionOptions = serde_json::from_str(&input)?; match engine .executor() - .start_tx( - engine.query_schema().clone(), - input.max_wait, - input.timeout, - input.isolation_level, - ) + .start_tx(engine.query_schema().clone(), &tx_opts) .instrument(span) .await { diff --git a/query-engine/query-engine-node-api/src/logger.rs b/query-engine/query-engine-node-api/src/logger.rs index a01ee4e4d3fb..383df1fef22c 100644 --- a/query-engine/query-engine-node-api/src/logger.rs +++ b/query-engine/query-engine-node-api/src/logger.rs @@ -1,5 +1,5 @@ use core::fmt; -use query_core::is_user_facing_trace_filter; +use query_core::telemetry; use query_engine_metrics::MetricRegistry; use serde_json::Value; use std::collections::BTreeMap; @@ -31,24 +31,19 @@ impl Logger { enable_metrics: bool, enable_tracing: bool, ) -> Self { - let is_sql_query = filter_fn(|meta| { - meta.target() == "quaint::connector::metrics" && meta.fields().iter().any(|f| f.name() == "query") - }); - - // is a mongodb query? - let is_mongo_query = filter_fn(|meta| meta.target() == "mongodb_query_connector::query"); + let is_user_facing = filter_fn(telemetry::helpers::user_facing_filter); // We need to filter the messages to send to our callback logging mechanism let filters = if log_queries { // Filter trace query events (for query log) or based in the defined log level - is_sql_query.or(is_mongo_query).or(log_level).boxed() + is_user_facing.or(log_level).boxed() } else { // Filter based in the defined log level FilterExt::boxed(log_level) }; let log_callback_arc = Arc::new(log_callback); - let is_user_trace = filter_fn(is_user_facing_trace_filter); + let is_user_trace = filter_fn(telemetry::helpers::user_facing_span_only_filter); let tracer = crate::tracer::new_pipeline().install_simple(Arc::clone(&log_callback_arc)); let telemetry = if enable_tracing { let telemetry = tracing_opentelemetry::layer() diff --git a/query-engine/query-engine-node-api/src/tracer.rs b/query-engine/query-engine-node-api/src/tracer.rs index fa4cd5d60623..cae99253a44b 100644 --- a/query-engine/query-engine-node-api/src/tracer.rs +++ b/query-engine/query-engine-node-api/src/tracer.rs @@ -7,6 +7,7 @@ use opentelemetry::{ }, trace::{TraceError, TracerProvider}, }; +use query_core::telemetry; use std::{ fmt::{self, Debug}, sync::Arc, @@ -83,7 +84,7 @@ impl Debug for ClientSpanExporter { impl SpanExporter for ClientSpanExporter { /// Export spans to stdout async fn export(&mut self, batch: Vec) -> ExportResult { - let result = query_core::spans_to_json(&batch); + let result = telemetry::helpers::spans_to_json(batch); self.callback .call(result) .map_err(|err| TraceError::from(format!("Could not call JS callback: {}", &err.reason))) diff --git a/query-engine/query-engine/src/context.rs b/query-engine/query-engine/src/context.rs index 6bb6c51b9cdc..42b00765bfb0 100644 --- a/query-engine/query-engine/src/context.rs +++ b/query-engine/query-engine/src/context.rs @@ -8,6 +8,7 @@ use std::{env, fmt, sync::Arc}; pub struct PrismaContext { /// The api query schema. query_schema: QuerySchemaRef, + /// The metrics registry pub metrics: MetricRegistry, /// Central query executor. pub executor: Box, @@ -43,7 +44,7 @@ impl ContextBuilder { impl PrismaContext { /// Initializes a new Prisma context. - async fn new( + pub async fn new( schema: psl::ValidatedSchema, enable_raw_queries: bool, metrics: MetricRegistry, diff --git a/query-engine/query-engine/src/lib.rs b/query-engine/query-engine/src/lib.rs index a587c03868d2..95c8446ebacb 100644 --- a/query-engine/query-engine/src/lib.rs +++ b/query-engine/query-engine/src/lib.rs @@ -6,6 +6,7 @@ pub mod error; pub mod logger; pub mod opt; pub mod server; +pub mod state; pub mod tracer; use error::PrismaError; diff --git a/query-engine/query-engine/src/logger.rs b/query-engine/query-engine/src/logger.rs index 595677f7d2da..d1ec5f36ce6b 100644 --- a/query-engine/query-engine/src/logger.rs +++ b/query-engine/query-engine/src/logger.rs @@ -1,17 +1,12 @@ use opentelemetry::{ - global, - sdk::{ - propagation::TraceContextPropagator, - trace::{Config, Tracer}, - Resource, - }, + sdk::{trace::Config, Resource}, KeyValue, }; use opentelemetry_otlp::WithExportConfig; -use query_core::is_user_facing_trace_filter; +use query_core::telemetry; use query_engine_metrics::MetricRegistry; use tracing::{dispatcher::SetGlobalDefaultError, subscriber}; -use tracing_subscriber::{filter::filter_fn, layer::SubscriberExt, EnvFilter, Layer}; +use tracing_subscriber::{filter::filter_fn, layer::SubscriberExt, Layer}; use crate::LogFormat; @@ -19,25 +14,34 @@ type LoggerResult = Result; /// An installer for a global logger. #[derive(Debug, Clone)] -pub struct Logger<'a> { +pub struct Logger { service_name: &'static str, log_format: LogFormat, - enable_telemetry: bool, log_queries: bool, - telemetry_endpoint: Option<&'a str>, + tracing_config: TracingConfig, metrics: Option, } -impl<'a> Logger<'a> { +// TracingConfig specifies how tracing will be exposed by the logger facility +#[derive(Debug, Clone)] +enum TracingConfig { + // exposed means tracing will be exposed through an HTTP endpoint in a jaeger-compatible format + Exposed(String), + // stdout means that traces will be printed to standard output + Stdout, + // disabled means that tracing will be disabled + Disabled, +} + +impl Logger { /// Initialize a new global logger installer. pub fn new(service_name: &'static str) -> Self { Self { service_name, log_format: LogFormat::Json, - enable_telemetry: false, log_queries: false, - telemetry_endpoint: None, metrics: None, + tracing_config: TracingConfig::Disabled, } } @@ -51,40 +55,34 @@ impl<'a> Logger<'a> { self.log_queries = log_queries; } - /// Enables Jaeger telemetry. - pub fn enable_telemetry(&mut self, enable_telemetry: bool) { - self.enable_telemetry = enable_telemetry; + pub fn enable_metrics(&mut self, metrics: MetricRegistry) { + self.metrics = Some(metrics); } - /// Sets a custom telemetry endpoint - pub fn telemetry_endpoint(&mut self, endpoint: &'a str) { - if endpoint.is_empty() { - self.telemetry_endpoint = None + pub fn setup_telemetry(&mut self, enable_telemetry: bool, endpoint: &str) { + let endpoint = if endpoint.is_empty() { + None } else { - self.telemetry_endpoint = Some(endpoint); - } + Some(endpoint.to_owned()) + }; + + self.tracing_config = match (enable_telemetry, endpoint) { + (true, Some(endpoint)) => TracingConfig::Exposed(endpoint), + (true, None) => TracingConfig::Stdout, + _ => TracingConfig::Disabled, + }; } - pub fn enable_metrics(&mut self, metrics: MetricRegistry) { - self.metrics = Some(metrics); + pub fn is_metrics_enabled(&self) -> bool { + self.metrics.is_some() } /// Install logger as a global. Can be called only once per application /// instance. The returned guard value needs to stay in scope for the whole /// lifetime of the service. - pub fn install(self) -> LoggerResult<()> { - let filter = create_env_filter(self.log_queries); - - let is_user_trace = filter_fn(is_user_facing_trace_filter); - - let telemetry = if self.enable_telemetry { - let tracer = create_otel_tracer(self.service_name, self.telemetry_endpoint); - let telemetry = tracing_opentelemetry::layer().with_tracer(tracer); - let telemetry = telemetry.with_filter(is_user_trace); - Some(telemetry) - } else { - None - }; + pub fn install(&self) -> LoggerResult<()> { + let filter = telemetry::helpers::env_filter(self.log_queries, telemetry::helpers::QueryEngineLogLevel::FromEnv); + let is_user_trace = filter_fn(telemetry::helpers::user_facing_span_only_filter); let fmt_layer = match self.log_format { LogFormat::Text => { @@ -99,54 +97,39 @@ impl<'a> Logger<'a> { let subscriber = tracing_subscriber::registry() .with(fmt_layer) - .with(self.metrics) - .with(telemetry); - - subscriber::set_global_default(subscriber)?; + .with(self.metrics.clone()); + + match self.tracing_config { + TracingConfig::Exposed(ref endpoint) => { + // Opentelemetry is enabled, but capturing is disabled, there's an endpoint to export + // the traces to. + let resource = Resource::new(vec![KeyValue::new("service.name", self.service_name)]); + let config = Config::default().with_resource(resource); + let builder = opentelemetry_otlp::new_pipeline().tracing().with_trace_config(config); + let exporter = opentelemetry_otlp::new_exporter().tonic().with_endpoint(endpoint); + let tracer = builder.with_exporter(exporter).install_simple().unwrap(); + let telemetry_layer = tracing_opentelemetry::layer() + .with_tracer(tracer) + .with_filter(is_user_trace); + let subscriber = subscriber.with(telemetry_layer); + subscriber::set_global_default(subscriber)?; + } + TracingConfig::Stdout => { + // Opentelemetry is enabled, but capturing is disabled, and there's no endpoint to + // export traces too. We export it to stdout + let exporter = crate::tracer::ClientSpanExporter::default(); + let tracer = crate::tracer::install(Some(exporter), None); + let telemetry_layer = tracing_opentelemetry::layer() + .with_tracer(tracer) + .with_filter(is_user_trace); + let subscriber = subscriber.with(telemetry_layer); + subscriber::set_global_default(subscriber)?; + } + TracingConfig::Disabled => { + subscriber::set_global_default(subscriber)?; + } + } Ok(()) } } - -fn create_otel_tracer(service_name: &'static str, collector_endpoint: Option<&str>) -> Tracer { - global::set_text_map_propagator(TraceContextPropagator::new()); - - if let Some(endpoint) = collector_endpoint { - // A special parameter for Jaeger to set the service name in spans. - let resource = Resource::new(vec![KeyValue::new("service.name", service_name)]); - let config = Config::default().with_resource(resource); - - let builder = opentelemetry_otlp::new_pipeline().tracing().with_trace_config(config); - let exporter = opentelemetry_otlp::new_exporter().tonic().with_endpoint(endpoint); - builder.with_exporter(exporter).install_simple().unwrap() - } else { - crate::tracer::new_pipeline().install_simple() - } -} - -fn create_env_filter(log_queries: bool) -> EnvFilter { - let mut filter = EnvFilter::from_default_env() - .add_directive("tide=error".parse().unwrap()) - .add_directive("tonic=error".parse().unwrap()) - .add_directive("h2=error".parse().unwrap()) - .add_directive("hyper=error".parse().unwrap()) - .add_directive("tower=error".parse().unwrap()); - - if let Ok(qe_log_level) = std::env::var("QE_LOG_LEVEL") { - filter = filter - .add_directive(format!("query_engine={}", &qe_log_level).parse().unwrap()) - .add_directive(format!("query_core={}", &qe_log_level).parse().unwrap()) - .add_directive(format!("query_connector={}", &qe_log_level).parse().unwrap()) - .add_directive(format!("sql_query_connector={}", &qe_log_level).parse().unwrap()) - .add_directive(format!("mongodb_query_connector={}", &qe_log_level).parse().unwrap()); - } - - if log_queries { - // even when mongo queries are logged in debug mode, we want to log them if the log level is higher - filter = filter - .add_directive("quaint[{is_query}]=trace".parse().unwrap()) - .add_directive("mongodb_query_connector=debug".parse().unwrap()); - } - - filter -} diff --git a/query-engine/query-engine/src/main.rs b/query-engine/query-engine/src/main.rs index 1d4912161c07..f71035b37a5b 100644 --- a/query-engine/query-engine/src/main.rs +++ b/query-engine/query-engine/src/main.rs @@ -5,13 +5,13 @@ extern crate tracing; use query_engine::cli::CliCommand; use query_engine::error::PrismaError; -use query_engine::logger::Logger; use query_engine::opt::PrismaOpt; use query_engine::server; +use query_engine::state; use query_engine::LogFormat; -use query_engine_metrics::MetricRegistry; use std::{error::Error, process}; use structopt::StructOpt; +use tracing::Instrument; type AnyError = Box; @@ -26,26 +26,13 @@ async fn main() -> Result<(), AnyError> { async fn main() -> Result<(), PrismaError> { let opts = PrismaOpt::from_args(); - let metrics = MetricRegistry::new(); - - let mut logger = Logger::new("prisma-engine-http"); - logger.log_format(opts.log_format()); - logger.log_queries(opts.log_queries()); - logger.enable_telemetry(opts.enable_open_telemetry); - logger.telemetry_endpoint(&opts.open_telemetry_endpoint); - logger.enable_metrics(metrics.clone()); - - logger.install().unwrap(); - - if opts.enable_metrics || opts.dataproxy_metric_override { - query_engine_metrics::setup(); - } - match CliCommand::from_opt(&opts)? { Some(cmd) => cmd.execute().await?, None => { + let span = tracing::info_span!("prisma:engine:connect"); + let state = state::setup(&opts, true, None).instrument(span).await?; set_panic_hook(opts.log_format()); - server::listen(opts, metrics).await?; + server::listen(&opts, state).await?; } } diff --git a/query-engine/query-engine/src/server/mod.rs b/query-engine/query-engine/src/server/mod.rs index 9e266441c5be..a955452c4ae8 100644 --- a/query-engine/query-engine/src/server/mod.rs +++ b/query-engine/query-engine/src/server/mod.rs @@ -1,104 +1,20 @@ -use crate::{context::PrismaContext, opt::PrismaOpt, PrismaResult}; +use crate::state::State; +use crate::{opt::PrismaOpt, PrismaResult}; use hyper::service::{make_service_fn, service_fn}; use hyper::{header::CONTENT_TYPE, Body, HeaderMap, Method, Request, Response, Server, StatusCode}; -use opentelemetry::{global, propagation::Extractor, Context}; -use psl::PreviewFeature; -use query_core::schema::QuerySchemaRef; -use query_core::{schema::QuerySchemaRenderer, TxId}; -use query_engine_metrics::{MetricFormat, MetricRegistry}; -use request_handlers::{dmmf, GraphQLSchemaRenderer, GraphQlHandler, TxInput}; +use opentelemetry::propagation::Extractor; +use query_core::{schema::QuerySchemaRenderer, TransactionOptions, TxId}; +use query_engine_metrics::MetricFormat; +use request_handlers::{dmmf, GraphQLSchemaRenderer, GraphQlHandler}; use serde_json::json; use std::collections::HashMap; use std::net::SocketAddr; use std::sync::Arc; use std::time::Instant; use tracing::{field, Instrument, Span}; -use tracing_opentelemetry::OpenTelemetrySpanExt; - -const TRANSACTION_ID_HEADER: &str = "X-transaction-id"; - -//// Shared application state. -pub struct State { - cx: Arc, - enable_playground: bool, - enable_debug_mode: bool, - enable_metrics: bool, -} - -impl State { - /// Create a new instance of `State`. - fn new(cx: PrismaContext) -> Self { - Self { - cx: Arc::new(cx), - enable_playground: false, - enable_debug_mode: false, - enable_metrics: false, - } - } - - pub fn enable_playground(mut self, enable: bool) -> Self { - self.enable_playground = enable; - self - } - - pub fn enable_debug_mode(mut self, enable: bool) -> Self { - self.enable_debug_mode = enable; - self - } - - pub fn enable_metrics(mut self, enable: bool) -> Self { - self.enable_metrics = enable; - self - } - - pub fn get_metrics(&self) -> MetricRegistry { - self.cx.metrics.clone() - } - - pub fn query_schema(&self) -> &QuerySchemaRef { - self.cx.query_schema() - } -} - -impl Clone for State { - fn clone(&self) -> Self { - Self { - cx: self.cx.clone(), - enable_playground: self.enable_playground, - enable_debug_mode: self.enable_debug_mode, - enable_metrics: self.enable_metrics, - } - } -} - -pub async fn setup(opts: &PrismaOpt, metrics: MetricRegistry) -> PrismaResult { - let datamodel = opts.schema(false)?; - let config = &datamodel.configuration; - config.validate_that_one_datasource_is_provided()?; - - let span = tracing::info_span!("prisma:engine:connect"); - - let enable_metrics = config.preview_features().contains(PreviewFeature::Metrics) || opts.dataproxy_metric_override; - - let cx = PrismaContext::builder(datamodel) - .set_metrics(metrics) - .enable_raw_queries(opts.enable_raw_queries) - .build() - .instrument(span) - .await?; - - let state = State::new(cx) - .enable_playground(opts.enable_playground) - .enable_debug_mode(opts.enable_debug_mode) - .enable_metrics(enable_metrics); - - Ok(state) -} /// Starts up the graphql query engine server -pub async fn listen(opts: PrismaOpt, metrics: MetricRegistry) -> PrismaResult<()> { - let state = setup(&opts, metrics).await?; - +pub async fn listen(opts: &PrismaOpt, state: State) -> PrismaResult<()> { let query_engine = make_service_fn(move |_| { let state = state.clone(); async move { Ok::<_, hyper::Error>(service_fn(move |req| routes(state.clone(), req))) } @@ -121,15 +37,15 @@ pub async fn listen(opts: PrismaOpt, metrics: MetricRegistry) -> PrismaResult<() pub async fn routes(state: State, req: Request) -> Result, hyper::Error> { let start = Instant::now(); - if req.method() == Method::POST && req.uri().path().contains("transaction") { - return handle_transaction(state, req).await; + if req.method() == Method::POST && req.uri().path().starts_with("/transaction") { + return transaction_handler(state, req).await; } if [Method::POST, Method::GET].contains(req.method()) - && req.uri().path().contains("metrics") + && req.uri().path().starts_with("/metrics") && state.enable_metrics { - return handle_metrics(state, req).await; + return metrics_handler(state, req).await; } let mut res = match (req.method(), req.uri().path()) { @@ -194,24 +110,11 @@ async fn graphql_handler(state: State, req: Request) -> Result { - let s = traceparent.to_str().unwrap_or_default().to_string(); - Some(s) - } - _ => None, - }; + let traceparent = traceparent(headers); + let tx_id = transaction_id(headers); let work = async move { let body_start = req.into_body(); @@ -221,7 +124,7 @@ async fn graphql_handler(state: State, req: Request) -> Result { let handler = GraphQlHandler::new(&*state.cx.executor, state.cx.query_schema()); - let result = handler.handle(body, tx_id, trace_id).instrument(span).await; + let result = handler.handle(body, tx_id, traceparent).instrument(span).await; let result_bytes = serde_json::to_vec(&result).unwrap(); @@ -263,7 +166,7 @@ fn playground_handler() -> Response { .unwrap() } -async fn handle_metrics(state: State, req: Request) -> Result, hyper::Error> { +async fn metrics_handler(state: State, req: Request) -> Result, hyper::Error> { let format = if let Some(query) = req.uri().query() { if query.contains("format=json") { MetricFormat::Json @@ -310,72 +213,48 @@ async fn handle_metrics(state: State, req: Request) -> Result start a transaction /// POST /transaction/{tx_id}/commit -> commit a transaction /// POST /transaction/{tx_id}/rollback -> rollback a transaction -async fn handle_transaction(state: State, req: Request) -> Result, hyper::Error> { - let path = req.uri().path(); +async fn transaction_handler(state: State, req: Request) -> Result, hyper::Error> { + let path = req.uri().path().to_owned(); + let sections: Vec<&str> = path.split('/').collect(); + let span = info_span!("prisma:engine:itx_runner", user_facing = true, itx_id = field::Empty); - if path.contains("start") { - return transaction_start_handler(state, req).await; + if sections.len() == 3 && sections[2] == "start" { + return transaction_start_handler(state, req).instrument(span).await; } - let sections: Vec<&str> = path.split('/').collect(); - - if sections.len() < 2 { - return Ok(Response::builder() - .status(StatusCode::INTERNAL_SERVER_ERROR) - .body(Body::from("Request does not contain the transaction id")) - .unwrap()); + if sections.len() == 4 && sections[3] == "commit" { + return transaction_commit_handler(state, sections[2].into()) + .instrument(span) + .await; } - let tx_id: TxId = sections[2].into(); + if sections.len() == 4 && sections[3] == "rollback" { + return transaction_rollback_handler(state, sections[2].into()) + .instrument(span) + .await; + } - let succuss_resp = Response::builder() - .status(StatusCode::OK) - .header(CONTENT_TYPE, "application/json") - .body(Body::from(r#"{}"#)) + let res = Response::builder() + .status(StatusCode::BAD_REQUEST) + .body(Body::from(format!("wrong transaction handler path: {}", &path))) .unwrap(); - - if path.contains("commit") { - match state.cx.executor.commit_tx(tx_id).await { - Ok(_) => Ok(succuss_resp), - Err(err) => Ok(err_to_http_resp(err)), - } - } else if path.contains("rollback") { - match state.cx.executor.rollback_tx(tx_id).await { - Ok(_) => Ok(succuss_resp), - Err(err) => Ok(err_to_http_resp(err)), - } - } else { - let res = Response::builder() - .status(StatusCode::BAD_REQUEST) - .body(Body::empty()) - .unwrap(); - Ok(res) - } + Ok(res) } async fn transaction_start_handler(state: State, req: Request) -> Result, hyper::Error> { - let cx = get_parent_span_context(&req); - let body_start = req.into_body(); // block and buffer request until the request has completed let full_body = hyper::body::to_bytes(body_start).await?; - let input: TxInput = serde_json::from_slice(full_body.as_ref()).unwrap(); - - let span = tracing::info_span!("prisma:engine:itx_runner", user_facing = true, itx_id = field::Empty); - span.set_parent(cx); + let tx_opts: TransactionOptions = serde_json::from_slice(full_body.as_ref()).unwrap(); - match state + let result = state .cx .executor - .start_tx( - state.cx.query_schema().clone(), - input.max_wait, - input.timeout, - input.isolation_level, - ) - .instrument(span) - .await - { + .start_tx(state.cx.query_schema().clone(), &tx_opts) + .instrument(Span::current()) + .await; + + match result { Ok(tx_id) => { let result = json!({ "id": tx_id.to_string() }); let result_bytes = serde_json::to_vec(&result).unwrap(); @@ -385,22 +264,25 @@ async fn transaction_start_handler(state: State, req: Request) -> Result Ok(err_to_http_resp(err)), } } -fn get_transaction_id_from_header(req: &Request) -> Option { - match req.headers().get(TRANSACTION_ID_HEADER) { - Some(id_header) => { - let msg = format!("{} has not been correctly set.", TRANSACTION_ID_HEADER); - let id = id_header.to_str().unwrap_or(msg.as_str()); - Some(TxId::from(id)) - } +async fn transaction_commit_handler(state: State, tx_id: TxId) -> Result, hyper::Error> { + let result = state.cx.executor.commit_tx(tx_id).await; + match result { + Ok(_) => Ok(empty_json_to_http_resp()), + Err(err) => Ok(err_to_http_resp(err)), + } +} - None => None, +async fn transaction_rollback_handler(state: State, tx_id: TxId) -> Result, hyper::Error> { + let result = state.cx.executor.rollback_tx(tx_id).await; + match result { + Ok(_) => Ok(empty_json_to_http_resp()), + Err(err) => Ok(err_to_http_resp(err)), } } @@ -439,11 +321,15 @@ impl<'a> Extractor for HeaderExtractor<'a> { } } -/// If the client sends us a trace and span id, extracting a new context if the -/// headers are set. If not, returns current context. -fn get_parent_span_context(req: &Request) -> Context { - let extractor = HeaderExtractor(req.headers()); - global::get_text_map_propagator(|propagator| propagator.extract(&extractor)) +fn empty_json_to_http_resp() -> Response { + let result = json!({}); + let result_bytes = serde_json::to_vec(&result).unwrap(); + + Response::builder() + .status(StatusCode::OK) + .header(CONTENT_TYPE, "application/json") + .body(Body::from(result_bytes)) + .unwrap() } fn err_to_http_resp(err: query_core::CoreError) -> Response { @@ -462,6 +348,30 @@ fn err_to_http_resp(err: query_core::CoreError) -> Response { let user_error: user_facing_errors::Error = err.into(); let body = Body::from(serde_json::to_vec(&user_error).unwrap()); - Response::builder().status(status).body(body).unwrap() } + +fn traceparent(headers: &HeaderMap) -> Option { + const TRACEPARENT_HEADER: &str = "traceparent"; + + let value = headers + .get(TRACEPARENT_HEADER) + .and_then(|h| h.to_str().ok()) + .map(|s| s.to_owned()); + + let is_valid_traceparent = |s: &String| s.split_terminator('-').count() >= 4; + + match &value { + Some(str) if is_valid_traceparent(str) => value, + _ => None, + } +} + +#[allow(clippy::bind_instead_of_map)] +fn transaction_id(headers: &HeaderMap) -> Option { + const TRANSACTION_ID_HEADER: &str = "X-transaction-id"; + headers + .get(TRANSACTION_ID_HEADER) + .and_then(|h| h.to_str().ok()) + .and_then(|s| Some(TxId::from(s))) +} diff --git a/query-engine/query-engine/src/state.rs b/query-engine/query-engine/src/state.rs new file mode 100644 index 000000000000..9e0d88991812 --- /dev/null +++ b/query-engine/query-engine/src/state.rs @@ -0,0 +1,84 @@ +use crate::{context::PrismaContext, logger::Logger, opt::PrismaOpt, PrismaResult}; +use psl::PreviewFeature; +use query_core::schema::QuerySchemaRef; +use query_engine_metrics::{setup as metric_setup, MetricRegistry}; +use std::sync::Arc; +use tracing::Instrument; + +//// Shared application state. +pub struct State { + pub cx: Arc, + pub enable_playground: bool, + pub enable_debug_mode: bool, + pub enable_metrics: bool, +} + +impl State { + /// Create a new instance of `State`. + fn new(cx: PrismaContext, enable_playground: bool, enable_debug_mode: bool, enable_metrics: bool) -> Self { + Self { + cx: Arc::new(cx), + enable_playground, + enable_debug_mode, + enable_metrics, + } + } + + pub fn get_metrics(&self) -> MetricRegistry { + self.cx.metrics.clone() + } + + pub fn query_schema(&self) -> &QuerySchemaRef { + self.cx.query_schema() + } +} + +impl Clone for State { + fn clone(&self) -> Self { + Self { + cx: self.cx.clone(), + enable_playground: self.enable_playground, + enable_debug_mode: self.enable_debug_mode, + enable_metrics: self.enable_metrics, + } + } +} + +pub async fn setup(opts: &PrismaOpt, install_logger: bool, metrics: Option) -> PrismaResult { + let metrics = if metrics.is_none() { + MetricRegistry::new() + } else { + metrics.unwrap() + }; + + let mut logger = Logger::new("prisma-engine-http"); + logger.log_format(opts.log_format()); + logger.log_queries(opts.log_queries()); + logger.enable_metrics(metrics.clone()); + logger.setup_telemetry(opts.enable_open_telemetry, &opts.open_telemetry_endpoint); + + if install_logger { + logger.install().unwrap(); + } + + if opts.enable_metrics || opts.dataproxy_metric_override { + metric_setup(); + } + + let datamodel = opts.schema(false)?; + let config = &datamodel.configuration; + config.validate_that_one_datasource_is_provided()?; + + let enable_metrics = config.preview_features().contains(PreviewFeature::Metrics) || opts.dataproxy_metric_override; + let span = tracing::info_span!("prisma:engine:connect"); + + let cx = PrismaContext::builder(datamodel) // opts.enable_raw_queries, metrics, logs_capture) + .set_metrics(metrics) + .enable_raw_queries(opts.enable_raw_queries) + .build() + .instrument(span) + .await?; + + let state = State::new(cx, opts.enable_playground, opts.enable_debug_mode, enable_metrics); + Ok(state) +} diff --git a/query-engine/query-engine/src/tests/dmmf.rs b/query-engine/query-engine/src/tests/dmmf.rs index c64280123575..1599fa12dfa3 100644 --- a/query-engine/query-engine/src/tests/dmmf.rs +++ b/query-engine/query-engine/src/tests/dmmf.rs @@ -94,6 +94,7 @@ fn test_dmmf_cli_command(schema: &str) -> PrismaResult<()> { subcommand: Some(Subcommand::Cli(CliOpt::Dmmf)), enable_open_telemetry: false, open_telemetry_endpoint: String::new(), + enable_telemetry_in_response: false, dataproxy_metric_override: false, }; diff --git a/query-engine/query-engine/src/tracer.rs b/query-engine/query-engine/src/tracer.rs index 95da9b79484c..8763ba892f4f 100644 --- a/query-engine/query-engine/src/tracer.rs +++ b/query-engine/query-engine/src/tracer.rs @@ -8,51 +8,29 @@ use opentelemetry::{ }, trace::TracerProvider, }; -use query_core::spans_to_json; +use query_core::telemetry; use std::io::{stdout, Stdout}; use std::{fmt::Debug, io::Write}; -/// Pipeline builder -#[derive(Debug)] -pub struct PipelineBuilder { - trace_config: Option, -} - -/// Create a new stdout exporter pipeline builder. -pub fn new_pipeline() -> PipelineBuilder { - PipelineBuilder::default() -} +pub fn install(exporter: Option, mut tracer_config: Option) -> sdk::trace::Tracer +where + E: SpanExporter + 'static, +{ + global::set_text_map_propagator(TraceContextPropagator::new()); + let mut provider_builder = sdk::trace::TracerProvider::builder(); -impl Default for PipelineBuilder { - /// Return the default pipeline builder. - fn default() -> Self { - Self { trace_config: None } + if let Some(exporter) = exporter { + provider_builder = provider_builder.with_simple_exporter(exporter); } -} -impl PipelineBuilder { - /// Assign the SDK trace configuration. - pub fn with_trace_config(mut self, config: sdk::trace::Config) -> Self { - self.trace_config = Some(config); - self + if let Some(config) = tracer_config.take() { + provider_builder = provider_builder.with_config(config); } -} - -impl PipelineBuilder { - pub fn install_simple(mut self) -> sdk::trace::Tracer { - global::set_text_map_propagator(TraceContextPropagator::new()); - let exporter = ClientSpanExporter::new(); + let provider = provider_builder.build(); + let tracer = provider.tracer("opentelemetry"); + global::set_tracer_provider(provider); - let mut provider_builder = sdk::trace::TracerProvider::builder().with_simple_exporter(exporter); - if let Some(config) = self.trace_config.take() { - provider_builder = provider_builder.with_config(config); - } - let provider = provider_builder.build(); - let tracer = provider.tracer("opentelemetry"); - global::set_tracer_provider(provider); - - tracer - } + tracer } /// A [`ClientSpanExporter`] that sends spans to stdout. @@ -76,7 +54,7 @@ impl Default for ClientSpanExporter { #[async_trait] impl SpanExporter for ClientSpanExporter { async fn export(&mut self, batch: Vec) -> ExportResult { - let result = spans_to_json(&batch); + let result = telemetry::helpers::spans_to_json(batch); if let Err(err) = writeln!(self.writer, "{result}") { Err(ClientSpanExporterError(err).into()) diff --git a/query-engine/request-handlers/src/lib.rs b/query-engine/request-handlers/src/lib.rs index 19e3fe4cc6a4..79f9bcab50d9 100644 --- a/query-engine/request-handlers/src/lib.rs +++ b/query-engine/request-handlers/src/lib.rs @@ -4,14 +4,12 @@ pub mod dmmf; mod error; mod graphql; -mod transactions; #[cfg(test)] mod tests; pub use error::HandlerError; pub use graphql::*; -pub use transactions::*; pub type Result = std::result::Result; @@ -21,12 +19,3 @@ pub enum PrismaResponse { Single(GQLResponse), Multi(GQLBatchResponse), } - -impl PrismaResponse { - pub fn errors(&self) -> Vec<&GQLError> { - match self { - PrismaResponse::Single(ref s) => s.errors().collect(), - PrismaResponse::Multi(ref m) => m.errors().collect(), - } - } -} diff --git a/query-engine/request-handlers/src/transactions/mod.rs b/query-engine/request-handlers/src/transactions/mod.rs deleted file mode 100644 index 9461240b1f30..000000000000 --- a/query-engine/request-handlers/src/transactions/mod.rs +++ /dev/null @@ -1,13 +0,0 @@ -use serde::Deserialize; - -#[derive(Debug, Deserialize)] -pub struct TxInput { - /// Maximum wait time in milliseconds. - pub max_wait: u64, - - /// Time in milliseconds after which the transaction rolls back automatically. - pub timeout: u64, - - /// Isolation level to use for the transaction. - pub isolation_level: Option, -}