diff --git a/Makefile b/Makefile index 489de641bac6..5954a3587a2e 100644 --- a/Makefile +++ b/Makefile @@ -207,7 +207,7 @@ validate: cargo run --bin test-cli -- validate-datamodel dev_datamodel.prisma qe: - cargo run --bin query-engine -- --enable-playground --enable-raw-queries --enable-metrics --enable-open-telemetry + cargo run --bin query-engine -- --enable-playground --enable-raw-queries --enable-metrics --enable-open-telemetry --enable-logs-in-response qe-dmmf: cargo run --bin query-engine -- cli dmmf > dmmf.json diff --git a/query-engine/connector-test-kit-rs/query-tests-setup/src/runner/binary.rs b/query-engine/connector-test-kit-rs/query-tests-setup/src/runner/binary.rs index a5ba77959ae1..8c0b37526e28 100644 --- a/query-engine/connector-test-kit-rs/query-tests-setup/src/runner/binary.rs +++ b/query-engine/connector-test-kit-rs/query-tests-setup/src/runner/binary.rs @@ -2,7 +2,8 @@ use crate::{ConnectorTag, RunnerInterface, TestError, TestResult, TxResult}; use hyper::{Body, Method, Request, Response}; use query_core::{schema::QuerySchemaRef, TxId}; use query_engine::opt::PrismaOpt; -use query_engine::server::{routes, setup, State}; +use query_engine::server::routes; +use query_engine::state::{init_state, State}; use query_engine_metrics::MetricRegistry; use request_handlers::{GQLBatchResponse, GQLError, GQLResponse, GraphQlBody, MultiQuery, PrismaResponse}; @@ -16,7 +17,7 @@ pub struct BinaryRunner { impl RunnerInterface for BinaryRunner { async fn load(datamodel: String, connector_tag: ConnectorTag, metrics: MetricRegistry) -> TestResult { let opts = PrismaOpt::from_list(&["binary", "--enable-raw-queries", "--datamodel", &datamodel]); - let state = setup(&opts, metrics).await.unwrap(); + let state = init_state(&opts, false, Some(metrics)).await.unwrap(); Ok(BinaryRunner { state, diff --git a/query-engine/core/src/trace_helpers/mod.rs b/query-engine/core/src/trace_helpers/mod.rs index ed6d88e14ffb..b35b7df77659 100644 --- a/query-engine/core/src/trace_helpers/mod.rs +++ b/query-engine/core/src/trace_helpers/mod.rs @@ -1,6 +1,7 @@ use once_cell::sync::Lazy; use opentelemetry::sdk::export::trace::SpanData; -use opentelemetry::trace::TraceContextExt; +use opentelemetry::trace::{TraceContextExt, TraceId}; +use opentelemetry::Context; use serde_json::{json, Value}; use std::borrow::Cow; @@ -95,6 +96,11 @@ pub fn set_span_link_from_trace_id(span: &Span, trace_id: Option) { } } +pub fn get_trace_id_from_context(context: &Context) -> TraceId { + let context_span = context.span(); + context_span.span_context().trace_id() +} + pub fn is_user_facing_trace_filter(meta: &Metadata) -> bool { if !meta.is_span() { return false; diff --git a/query-engine/query-engine/src/capture_tracer.rs b/query-engine/query-engine/src/capture_tracer.rs new file mode 100644 index 000000000000..964c4599e9d2 --- /dev/null +++ b/query-engine/query-engine/src/capture_tracer.rs @@ -0,0 +1,108 @@ +use async_trait::async_trait; +use opentelemetry::{ + global, + sdk::{ + self, + export::trace::{ExportResult, SpanData, SpanExporter}, + propagation::TraceContextPropagator, + }, + trace::{TraceId, TracerProvider}, +}; +use query_core::spans_to_json; +use std::fmt::Debug; +use std::{collections::HashMap, sync::Arc}; +use tokio::sync::Mutex; + +/// Pipeline builder +#[derive(Debug)] +pub struct PipelineBuilder { + trace_config: Option, +} + +/// Create a new stdout exporter pipeline builder. +pub fn new_pipeline() -> PipelineBuilder { + PipelineBuilder::default() +} + +impl Default for PipelineBuilder { + /// Return the default pipeline builder. + fn default() -> Self { + Self { trace_config: None } + } +} + +impl PipelineBuilder { + /// Assign the SDK trace configuration. + pub fn with_trace_config(mut self, config: sdk::trace::Config) -> Self { + self.trace_config = Some(config); + self + } +} + +impl PipelineBuilder { + pub fn install_simple(mut self, exporter: CaptureExporter) -> sdk::trace::Tracer { + global::set_text_map_propagator(TraceContextPropagator::new()); + + let mut provider_builder = sdk::trace::TracerProvider::builder().with_simple_exporter(exporter); + if let Some(config) = self.trace_config.take() { + provider_builder = provider_builder.with_config(config); + } + let provider = provider_builder.build(); + let tracer = provider.tracer("opentelemetry"); + global::set_tracer_provider(provider); + + tracer + } +} + +/// A [`CaptureExporter`] that sends spans to stdout. +#[derive(Debug, Clone)] +pub struct CaptureExporter { + logs: Arc>>>, +} + +impl CaptureExporter { + pub fn new() -> Self { + Self { + logs: Default::default(), + } + } + + pub async fn capture(&self, trace_id: TraceId) { + let mut logs = self.logs.lock().await; + logs.insert(trace_id, Vec::new()); + } + + pub async fn get(&self, trace_id: TraceId) -> String { + let mut logs = self.logs.lock().await; + if let Some(spans) = logs.remove(&trace_id) { + spans_to_json(&spans) + } else { + String::new() + } + } +} + +impl Default for CaptureExporter { + fn default() -> Self { + Self::new() + } +} + +#[async_trait] +impl SpanExporter for CaptureExporter { + async fn export(&mut self, batch: Vec) -> ExportResult { + let batch = batch.into_iter().filter(|span| span.name == "quaint:query"); + + let mut logs = self.logs.lock().await; + for span in batch { + let trace_id = span.span_context.trace_id(); + + if let Some(spans) = logs.get_mut(&trace_id) { + spans.push(span) + } + } + + Ok(()) + } +} diff --git a/query-engine/query-engine/src/context.rs b/query-engine/query-engine/src/context.rs index 6b79ff947418..5368be0db020 100644 --- a/query-engine/query-engine/src/context.rs +++ b/query-engine/query-engine/src/context.rs @@ -1,7 +1,10 @@ -use crate::{PrismaError, PrismaResult}; +use crate::{capture_tracer::CaptureExporter, PrismaError, PrismaResult}; use query_core::{executor, schema::QuerySchemaRef, schema_builder, QueryExecutor}; use query_engine_metrics::MetricRegistry; -use std::{env, fmt, sync::Arc}; +use std::{ + env, fmt, + sync::{atomic::AtomicUsize, Arc}, +}; /// Prisma request context containing all immutable state of the process. /// There is usually only one context initialized per process. @@ -11,6 +14,8 @@ pub struct PrismaContext { pub metrics: MetricRegistry, /// Central query executor. pub executor: Box, + pub inflight_tracer: Option, + pub counter: AtomicUsize, } impl fmt::Debug for PrismaContext { @@ -23,6 +28,7 @@ pub struct ContextBuilder { enable_raw_queries: bool, schema: psl::ValidatedSchema, metrics: Option, + logs_capture_tracer: Option, } impl ContextBuilder { @@ -36,17 +42,29 @@ impl ContextBuilder { self } + pub fn set_logs_capture_tracer(mut self, tracer: Option) -> Self { + self.logs_capture_tracer = tracer; + self + } + pub async fn build(self) -> PrismaResult { - PrismaContext::new(self.schema, self.enable_raw_queries, self.metrics.unwrap_or_default()).await + PrismaContext::new( + self.schema, + self.enable_raw_queries, + self.metrics.unwrap_or_default(), + self.logs_capture_tracer, + ) + .await } } impl PrismaContext { /// Initializes a new Prisma context. - async fn new( + pub async fn new( schema: psl::ValidatedSchema, enable_raw_queries: bool, metrics: MetricRegistry, + inflight_tracer: Option, ) -> PrismaResult { let config = &schema.configuration; // We only support one data source at the moment, so take the first one (default not exposed yet). @@ -70,6 +88,8 @@ impl PrismaContext { query_schema, executor, metrics, + inflight_tracer, + counter: AtomicUsize::new(0), }; context.verify_connection().await?; @@ -87,6 +107,7 @@ impl PrismaContext { enable_raw_queries: false, schema, metrics: None, + logs_capture_tracer: None, } } diff --git a/query-engine/query-engine/src/lib.rs b/query-engine/query-engine/src/lib.rs index a587c03868d2..8939025d0a1e 100644 --- a/query-engine/query-engine/src/lib.rs +++ b/query-engine/query-engine/src/lib.rs @@ -1,11 +1,13 @@ #![allow(clippy::derive_partial_eq_without_eq)] +pub mod capture_tracer; pub mod cli; pub mod context; pub mod error; pub mod logger; pub mod opt; pub mod server; +pub mod state; pub mod tracer; use error::PrismaError; diff --git a/query-engine/query-engine/src/logger.rs b/query-engine/query-engine/src/logger.rs index 595677f7d2da..3654c86d1fe3 100644 --- a/query-engine/query-engine/src/logger.rs +++ b/query-engine/query-engine/src/logger.rs @@ -1,7 +1,5 @@ use opentelemetry::{ - global, sdk::{ - propagation::TraceContextPropagator, trace::{Config, Tracer}, Resource, }, @@ -13,7 +11,7 @@ use query_engine_metrics::MetricRegistry; use tracing::{dispatcher::SetGlobalDefaultError, subscriber}; use tracing_subscriber::{filter::filter_fn, layer::SubscriberExt, EnvFilter, Layer}; -use crate::LogFormat; +use crate::{capture_tracer::CaptureExporter, LogFormat}; type LoggerResult = Result; @@ -26,6 +24,7 @@ pub struct Logger<'a> { log_queries: bool, telemetry_endpoint: Option<&'a str>, metrics: Option, + log_capture_exporter: Option, } impl<'a> Logger<'a> { @@ -38,6 +37,7 @@ impl<'a> Logger<'a> { log_queries: false, telemetry_endpoint: None, metrics: None, + log_capture_exporter: None, } } @@ -69,6 +69,12 @@ impl<'a> Logger<'a> { self.metrics = Some(metrics); } + pub fn enable_logs_capture(&mut self) -> CaptureExporter { + let capture = CaptureExporter::new(); + self.log_capture_exporter = Some(capture.clone()); + capture + } + /// Install logger as a global. Can be called only once per application /// instance. The returned guard value needs to stay in scope for the whole /// lifetime of the service. @@ -79,7 +85,13 @@ impl<'a> Logger<'a> { let telemetry = if self.enable_telemetry { let tracer = create_otel_tracer(self.service_name, self.telemetry_endpoint); - let telemetry = tracing_opentelemetry::layer().with_tracer(tracer); + let mut telemetry = tracing_opentelemetry::layer().with_tracer(tracer); + + if let Some(log_capture_exporter) = self.log_capture_exporter { + let tracer = crate::capture_tracer::new_pipeline().install_simple(log_capture_exporter); + telemetry = telemetry.with_tracer(tracer); + } + let telemetry = telemetry.with_filter(is_user_trace); Some(telemetry) } else { @@ -109,8 +121,6 @@ impl<'a> Logger<'a> { } fn create_otel_tracer(service_name: &'static str, collector_endpoint: Option<&str>) -> Tracer { - global::set_text_map_propagator(TraceContextPropagator::new()); - if let Some(endpoint) = collector_endpoint { // A special parameter for Jaeger to set the service name in spans. let resource = Resource::new(vec![KeyValue::new("service.name", service_name)]); diff --git a/query-engine/query-engine/src/main.rs b/query-engine/query-engine/src/main.rs index 1d4912161c07..f7f8df7ca7d6 100644 --- a/query-engine/query-engine/src/main.rs +++ b/query-engine/query-engine/src/main.rs @@ -5,13 +5,13 @@ extern crate tracing; use query_engine::cli::CliCommand; use query_engine::error::PrismaError; -use query_engine::logger::Logger; use query_engine::opt::PrismaOpt; use query_engine::server; +use query_engine::state::init_state; use query_engine::LogFormat; -use query_engine_metrics::MetricRegistry; use std::{error::Error, process}; use structopt::StructOpt; +use tracing::Instrument; type AnyError = Box; @@ -26,26 +26,13 @@ async fn main() -> Result<(), AnyError> { async fn main() -> Result<(), PrismaError> { let opts = PrismaOpt::from_args(); - let metrics = MetricRegistry::new(); - - let mut logger = Logger::new("prisma-engine-http"); - logger.log_format(opts.log_format()); - logger.log_queries(opts.log_queries()); - logger.enable_telemetry(opts.enable_open_telemetry); - logger.telemetry_endpoint(&opts.open_telemetry_endpoint); - logger.enable_metrics(metrics.clone()); - - logger.install().unwrap(); - - if opts.enable_metrics || opts.dataproxy_metric_override { - query_engine_metrics::setup(); - } - match CliCommand::from_opt(&opts)? { Some(cmd) => cmd.execute().await?, None => { + let span = tracing::info_span!("prisma:engine:connect"); + let state = init_state(&opts, true, None).instrument(span).await?; set_panic_hook(opts.log_format()); - server::listen(opts, metrics).await?; + server::listen(&opts, state).await?; } } diff --git a/query-engine/query-engine/src/opt.rs b/query-engine/query-engine/src/opt.rs index a42a8e31b31f..182cbbc515e2 100644 --- a/query-engine/query-engine/src/opt.rs +++ b/query-engine/query-engine/src/opt.rs @@ -103,6 +103,10 @@ pub struct PrismaOpt { #[structopt(long)] pub enable_open_telemetry: bool, + #[structopt(long)] + /// Enable tracer to capture logs and return in the response + pub enable_logs_in_response: bool, + /// The url to the OpenTelemetry collector. /// Enabling this will send the OpenTelemtry tracing to a collector /// and not via our custom stdout tracer diff --git a/query-engine/query-engine/src/server/mod.rs b/query-engine/query-engine/src/server/mod.rs index 9e266441c5be..b84cc27c94e8 100644 --- a/query-engine/query-engine/src/server/mod.rs +++ b/query-engine/query-engine/src/server/mod.rs @@ -1,11 +1,13 @@ -use crate::{context::PrismaContext, opt::PrismaOpt, PrismaResult}; +use crate::state::State; +use crate::{opt::PrismaOpt, PrismaResult}; use hyper::service::{make_service_fn, service_fn}; use hyper::{header::CONTENT_TYPE, Body, HeaderMap, Method, Request, Response, Server, StatusCode}; +use opentelemetry::trace::TraceId; use opentelemetry::{global, propagation::Extractor, Context}; -use psl::PreviewFeature; -use query_core::schema::QuerySchemaRef; + +use query_core::get_trace_id_from_context; use query_core::{schema::QuerySchemaRenderer, TxId}; -use query_engine_metrics::{MetricFormat, MetricRegistry}; +use query_engine_metrics::MetricFormat; use request_handlers::{dmmf, GraphQLSchemaRenderer, GraphQlHandler, TxInput}; use serde_json::json; use std::collections::HashMap; @@ -17,88 +19,8 @@ use tracing_opentelemetry::OpenTelemetrySpanExt; const TRANSACTION_ID_HEADER: &str = "X-transaction-id"; -//// Shared application state. -pub struct State { - cx: Arc, - enable_playground: bool, - enable_debug_mode: bool, - enable_metrics: bool, -} - -impl State { - /// Create a new instance of `State`. - fn new(cx: PrismaContext) -> Self { - Self { - cx: Arc::new(cx), - enable_playground: false, - enable_debug_mode: false, - enable_metrics: false, - } - } - - pub fn enable_playground(mut self, enable: bool) -> Self { - self.enable_playground = enable; - self - } - - pub fn enable_debug_mode(mut self, enable: bool) -> Self { - self.enable_debug_mode = enable; - self - } - - pub fn enable_metrics(mut self, enable: bool) -> Self { - self.enable_metrics = enable; - self - } - - pub fn get_metrics(&self) -> MetricRegistry { - self.cx.metrics.clone() - } - - pub fn query_schema(&self) -> &QuerySchemaRef { - self.cx.query_schema() - } -} - -impl Clone for State { - fn clone(&self) -> Self { - Self { - cx: self.cx.clone(), - enable_playground: self.enable_playground, - enable_debug_mode: self.enable_debug_mode, - enable_metrics: self.enable_metrics, - } - } -} - -pub async fn setup(opts: &PrismaOpt, metrics: MetricRegistry) -> PrismaResult { - let datamodel = opts.schema(false)?; - let config = &datamodel.configuration; - config.validate_that_one_datasource_is_provided()?; - - let span = tracing::info_span!("prisma:engine:connect"); - - let enable_metrics = config.preview_features().contains(PreviewFeature::Metrics) || opts.dataproxy_metric_override; - - let cx = PrismaContext::builder(datamodel) - .set_metrics(metrics) - .enable_raw_queries(opts.enable_raw_queries) - .build() - .instrument(span) - .await?; - - let state = State::new(cx) - .enable_playground(opts.enable_playground) - .enable_debug_mode(opts.enable_debug_mode) - .enable_metrics(enable_metrics); - - Ok(state) -} - /// Starts up the graphql query engine server -pub async fn listen(opts: PrismaOpt, metrics: MetricRegistry) -> PrismaResult<()> { - let state = setup(&opts, metrics).await?; - +pub async fn listen(opts: &PrismaOpt, state: State) -> PrismaResult<()> { let query_engine = make_service_fn(move |_| { let state = state.clone(); async move { Ok::<_, hyper::Error>(service_fn(move |req| routes(state.clone(), req))) } @@ -194,24 +116,17 @@ async fn graphql_handler(state: State, req: Request) -> Result { - let s = traceparent.to_str().unwrap_or_default().to_string(); - Some(s) - } - _ => None, - }; + if log_capture.should_capture() { + state + .cx + .inflight_tracer + .as_ref() + .unwrap() + .capture(log_capture.id()) + .await; + } let work = async move { let body_start = req.into_body(); @@ -221,9 +136,18 @@ async fn graphql_handler(state: State, req: Request) -> Result { let handler = GraphQlHandler::new(&*state.cx.executor, state.cx.query_schema()); - let result = handler.handle(body, tx_id, trace_id).instrument(span).await; - - let result_bytes = serde_json::to_vec(&result).unwrap(); + let result = handler.handle(body, tx_id, trace_id.clone()).instrument(span).await; + + let result_bytes = if log_capture.should_capture() { + let logs = state.cx.inflight_tracer.as_ref().unwrap().get(log_capture.id()).await; + let json = json!({ + "result": result, + "logs": logs + }); + serde_json::to_vec(&json).unwrap() + } else { + serde_json::to_vec(&result).unwrap() + }; let res = Response::builder() .status(StatusCode::OK) @@ -465,3 +389,76 @@ fn err_to_http_resp(err: query_core::CoreError) -> Response { Response::builder().status(status).body(body).unwrap() } + +struct LogCapture { + id: TraceId, + capture: bool, +} + +impl LogCapture { + fn new(id: TraceId, capture: bool) -> Self { + Self { id, capture } + } + + fn new_from_req(id: TraceId, req: &Request) -> Self { + let should_capture = Self::get_capture_from_header(req); + Self::new(id, should_capture) + } + + fn id(&self) -> TraceId { + self.id + } + + fn should_capture(&self) -> bool { + self.capture + } + + fn get_capture_from_header(req: &Request) -> bool { + match req.headers().get("PRISMA-CAPTURE-LOGS") { + Some(header) => { + if let Ok(capture_logs) = header.to_str() { + capture_logs == "true" + } else { + false + } + } + None => false, + } + } +} + +impl Default for LogCapture { + fn default() -> Self { + Self { + id: TraceId::from_hex("0").unwrap(), + capture: false, + } + } +} + +fn process_gql_req_headers(req: &Request) -> (Option, Span, LogCapture, Option) { + let tx_id = get_transaction_id_from_header(req); + let (span, log_capture) = if tx_id.is_none() { + let cx = get_parent_span_context(req); + let trace_id = get_trace_id_from_context(&cx); + + let span = info_span!("prisma:engine", user_facing = true); + span.set_parent(cx); + + let log_capture = LogCapture::new_from_req(trace_id, req); + + (span, log_capture) + } else { + (Span::none(), LogCapture::default()) + }; + + let traceparent = match req.headers().get("traceparent") { + Some(traceparent) => { + let s = traceparent.to_str().unwrap_or_default().to_string(); + Some(s) + } + _ => None, + }; + + (tx_id, span, log_capture, traceparent) +} diff --git a/query-engine/query-engine/src/state.rs b/query-engine/query-engine/src/state.rs new file mode 100644 index 000000000000..a6b0eae5100d --- /dev/null +++ b/query-engine/query-engine/src/state.rs @@ -0,0 +1,97 @@ +use crate::{context::PrismaContext, logger::Logger, opt::PrismaOpt, PrismaResult}; +use psl::PreviewFeature; +use query_core::schema::QuerySchemaRef; +use query_engine_metrics::{setup as metric_setup, MetricRegistry}; +use std::sync::Arc; +use tracing::Instrument; + +//// Shared application state. +pub struct State { + pub cx: Arc, + pub enable_playground: bool, + pub enable_debug_mode: bool, + pub enable_metrics: bool, +} + +impl State { + /// Create a new instance of `State`. + fn new(cx: PrismaContext, enable_playground: bool, enable_debug_mode: bool, enable_metrics: bool) -> Self { + Self { + cx: Arc::new(cx), + enable_playground, + enable_debug_mode, + enable_metrics, + } + } + + pub fn get_metrics(&self) -> MetricRegistry { + self.cx.metrics.clone() + } + + pub fn query_schema(&self) -> &QuerySchemaRef { + self.cx.query_schema() + } +} + +impl Clone for State { + fn clone(&self) -> Self { + Self { + cx: self.cx.clone(), + enable_playground: self.enable_playground, + enable_debug_mode: self.enable_debug_mode, + enable_metrics: self.enable_metrics, + } + } +} + +pub async fn init_state( + opts: &PrismaOpt, + install_logger: bool, + metrics: Option, +) -> PrismaResult { + let metrics = if metrics.is_none() { + MetricRegistry::new() + } else { + metrics.unwrap() + }; + + let mut logger = Logger::new("prisma-engine-http"); + logger.log_format(opts.log_format()); + logger.log_queries(opts.log_queries()); + logger.enable_telemetry(opts.enable_open_telemetry); + logger.telemetry_endpoint(&opts.open_telemetry_endpoint); + logger.enable_metrics(metrics.clone()); + + let logs_capture = if opts.enable_logs_in_response { + let exporter = logger.enable_logs_capture(); + Some(exporter) + } else { + None + }; + + if install_logger { + logger.install().unwrap(); + } + + if opts.enable_metrics || opts.dataproxy_metric_override { + metric_setup(); + } + + let datamodel = opts.schema(false)?; + let config = &datamodel.configuration; + config.validate_that_one_datasource_is_provided()?; + + let enable_metrics = config.preview_features().contains(PreviewFeature::Metrics) || opts.dataproxy_metric_override; + let span = tracing::info_span!("prisma:engine:connect"); + + let cx = PrismaContext::builder(datamodel) // opts.enable_raw_queries, metrics, logs_capture) + .set_metrics(metrics) + .set_logs_capture_tracer(logs_capture) + .enable_raw_queries(opts.enable_raw_queries) + .build() + .instrument(span) + .await?; + + let state = State::new(cx, opts.enable_playground, opts.enable_debug_mode, enable_metrics); + Ok(state) +} diff --git a/query-engine/query-engine/src/tests/dmmf.rs b/query-engine/query-engine/src/tests/dmmf.rs index 2070bf2e24da..c48a16178594 100644 --- a/query-engine/query-engine/src/tests/dmmf.rs +++ b/query-engine/query-engine/src/tests/dmmf.rs @@ -94,6 +94,7 @@ fn test_dmmf_cli_command(schema: &str) -> PrismaResult<()> { subcommand: Some(Subcommand::Cli(CliOpt::Dmmf)), enable_open_telemetry: false, open_telemetry_endpoint: String::new(), + enable_logs_in_response: false, dataproxy_metric_override: false, };