diff --git a/Makefile b/Makefile index 2a073e05ec2f..9dbf1c0c86d2 100644 --- a/Makefile +++ b/Makefile @@ -195,7 +195,7 @@ validate: cargo run --bin test-cli -- validate-datamodel dev_datamodel.prisma qe: - cargo run --bin query-engine -- --enable-playground --enable-raw-queries --enable-metrics --enable-open-telemetry + cargo run --bin query-engine -- --enable-playground --enable-raw-queries --enable-metrics --enable-open-telemetry --enable-logs-in-response qe-dmmf: cargo run --bin query-engine -- cli dmmf > dmmf.json diff --git a/query-engine/connector-test-kit-rs/query-tests-setup/src/runner/binary.rs b/query-engine/connector-test-kit-rs/query-tests-setup/src/runner/binary.rs index 278351d73611..26a60a45ea18 100644 --- a/query-engine/connector-test-kit-rs/query-tests-setup/src/runner/binary.rs +++ b/query-engine/connector-test-kit-rs/query-tests-setup/src/runner/binary.rs @@ -2,7 +2,8 @@ use crate::{ConnectorTag, RunnerInterface, TestError, TestResult, TxResult}; use hyper::{Body, Method, Request, Response}; use query_core::{MetricRegistry, TxId}; use query_engine::opt::PrismaOpt; -use query_engine::server::{routes, setup, State}; +use query_engine::server::routes; +use query_engine::state::{init_state, State}; use request_handlers::{GQLBatchResponse, GQLError, GQLResponse, GraphQlBody, MultiQuery, PrismaResponse}; pub struct BinaryRunner { @@ -15,7 +16,7 @@ pub struct BinaryRunner { impl RunnerInterface for BinaryRunner { async fn load(datamodel: String, connector_tag: ConnectorTag, metrics: MetricRegistry) -> TestResult { let opts = PrismaOpt::from_list(&["binary", "--enable-raw-queries", "--datamodel", &datamodel]); - let state = setup(&opts, metrics).await.unwrap(); + let state = init_state(&opts, false, Some(metrics)).await.unwrap(); Ok(BinaryRunner { state, diff --git a/query-engine/core/src/trace_helpers/mod.rs b/query-engine/core/src/trace_helpers/mod.rs index 90672cb1c915..bdf258f6477c 100644 --- a/query-engine/core/src/trace_helpers/mod.rs +++ b/query-engine/core/src/trace_helpers/mod.rs @@ -1,6 +1,7 @@ use once_cell::sync::Lazy; use opentelemetry::sdk::export::trace::SpanData; -use opentelemetry::trace::TraceContextExt; +use opentelemetry::trace::{TraceContextExt, TraceId}; +use opentelemetry::Context; use serde_json::{json, Value}; use std::borrow::Cow; @@ -91,6 +92,11 @@ pub fn set_span_link_from_trace_id(span: &Span, trace_id: Option) { } } +pub fn get_trace_id_from_context(context: &Context) -> TraceId { + let context_span = context.span(); + context_span.span_context().trace_id() +} + pub fn is_user_facing_trace_filter(meta: &Metadata) -> bool { if !meta.is_span() { return false; diff --git a/query-engine/query-engine/src/capture_tracer.rs b/query-engine/query-engine/src/capture_tracer.rs new file mode 100644 index 000000000000..d91b6c61a1fb --- /dev/null +++ b/query-engine/query-engine/src/capture_tracer.rs @@ -0,0 +1,108 @@ +use async_trait::async_trait; +use opentelemetry::{ + global, + sdk::{self}, + sdk::{ + export::trace::{ExportResult, SpanData, SpanExporter}, + propagation::TraceContextPropagator, + }, + trace::TracerProvider, +}; +use query_core::spans_to_json; +use std::fmt::Debug; +use std::{collections::HashMap, sync::Arc}; +use tokio::sync::Mutex; + +/// Pipeline builder +#[derive(Debug)] +pub struct PipelineBuilder { + trace_config: Option, +} + +/// Create a new stdout exporter pipeline builder. +pub fn new_pipeline() -> PipelineBuilder { + PipelineBuilder::default() +} + +impl Default for PipelineBuilder { + /// Return the default pipeline builder. + fn default() -> Self { + Self { trace_config: None } + } +} + +impl PipelineBuilder { + /// Assign the SDK trace configuration. + pub fn with_trace_config(mut self, config: sdk::trace::Config) -> Self { + self.trace_config = Some(config); + self + } +} + +impl PipelineBuilder { + pub fn install_simple(mut self, exporter: CaptureExporter) -> sdk::trace::Tracer { + global::set_text_map_propagator(TraceContextPropagator::new()); + + let mut provider_builder = sdk::trace::TracerProvider::builder().with_simple_exporter(exporter); + if let Some(config) = self.trace_config.take() { + provider_builder = provider_builder.with_config(config); + } + let provider = provider_builder.build(); + let tracer = provider.tracer("opentelemetry"); + global::set_tracer_provider(provider); + + tracer + } +} + +/// A [`CaptureExporter`] that sends spans to stdout. +#[derive(Debug, Clone)] +pub struct CaptureExporter { + logs: Arc>>>, +} + +impl CaptureExporter { + pub fn new() -> Self { + Self { + logs: Default::default(), + } + } + + pub async fn capture(&self, trace_id: String) { + let mut logs = self.logs.lock().await; + logs.insert(trace_id, Vec::new()); + } + + pub async fn get(&self, trace_id: String) -> String { + let mut logs = self.logs.lock().await; + if let Some(spans) = logs.remove(&trace_id) { + spans_to_json(&spans) + } else { + String::new() + } + } +} + +impl Default for CaptureExporter { + fn default() -> Self { + Self::new() + } +} + +#[async_trait] +impl SpanExporter for CaptureExporter { + async fn export(&mut self, batch: Vec) -> ExportResult { + let batch = batch.into_iter().filter(|span| span.name == "quaint:query"); + + let mut logs = self.logs.lock().await; + for span in batch { + let trace_id = span.span_context.trace_id().to_string(); + + if let Some(spans) = logs.get_mut(&trace_id) { + spans.push(span) + } + } + + Ok(()) + } +} diff --git a/query-engine/query-engine/src/context.rs b/query-engine/query-engine/src/context.rs index b1ec59d9f8eb..39ecbcc7cb1b 100644 --- a/query-engine/query-engine/src/context.rs +++ b/query-engine/query-engine/src/context.rs @@ -1,8 +1,11 @@ -use crate::{PrismaError, PrismaResult}; +use crate::{capture_tracer::CaptureExporter, PrismaError, PrismaResult}; use datamodel::{dml::Datamodel, Configuration}; use prisma_models::InternalDataModelBuilder; use query_core::{executor, schema::QuerySchemaRef, schema_builder, MetricRegistry, QueryExecutor}; -use std::{env, fmt, sync::Arc}; +use std::{ + env, fmt, + sync::{atomic::AtomicUsize, Arc}, +}; /// Prisma request context containing all immutable state of the process. /// There is usually only one context initialized per process. @@ -14,6 +17,8 @@ pub struct PrismaContext { pub metrics: MetricRegistry, /// Central query executor. pub executor: Box, + pub inflight_tracer: Option, + pub counter: AtomicUsize, } impl fmt::Debug for PrismaContext { @@ -27,6 +32,7 @@ pub struct ContextBuilder { datamodel: Datamodel, config: Configuration, metrics: Option, + logs_capture_tracer: Option, } impl ContextBuilder { @@ -40,12 +46,18 @@ impl ContextBuilder { self } + pub fn set_logs_capture_tracer(mut self, tracer: Option) -> Self { + self.logs_capture_tracer = tracer; + self + } + pub async fn build(self) -> PrismaResult { PrismaContext::new( self.config, self.datamodel, self.enable_raw_queries, self.metrics.unwrap(), + self.logs_capture_tracer, ) .await } @@ -53,11 +65,12 @@ impl ContextBuilder { impl PrismaContext { /// Initializes a new Prisma context. - async fn new( + pub async fn new( config: Configuration, dm: Datamodel, enable_raw_queries: bool, metrics: MetricRegistry, + inflight_tracer: Option, ) -> PrismaResult { // We only support one data source at the moment, so take the first one (default not exposed yet). let data_source = config @@ -88,6 +101,8 @@ impl PrismaContext { dm, executor, metrics, + inflight_tracer, + counter: AtomicUsize::new(0), }; context.verify_connection().await?; @@ -106,6 +121,7 @@ impl PrismaContext { datamodel, config, metrics: None, + logs_capture_tracer: None, } } diff --git a/query-engine/query-engine/src/lib.rs b/query-engine/query-engine/src/lib.rs index 8ca2fa8b9f74..8a73bad59f93 100644 --- a/query-engine/query-engine/src/lib.rs +++ b/query-engine/query-engine/src/lib.rs @@ -1,9 +1,11 @@ +pub mod capture_tracer; pub mod cli; pub mod context; pub mod error; pub mod logger; pub mod opt; pub mod server; +pub mod state; pub mod tracer; use error::PrismaError; diff --git a/query-engine/query-engine/src/logger.rs b/query-engine/query-engine/src/logger.rs index 25bd48250c36..b70cc33fc0c3 100644 --- a/query-engine/query-engine/src/logger.rs +++ b/query-engine/query-engine/src/logger.rs @@ -1,7 +1,5 @@ use opentelemetry::{ - global, sdk::{ - propagation::TraceContextPropagator, trace::{Config, Tracer}, Resource, }, @@ -12,7 +10,7 @@ use query_core::{is_user_facing_trace_filter, MetricRegistry}; use tracing::{dispatcher::SetGlobalDefaultError, subscriber}; use tracing_subscriber::{filter::filter_fn, layer::SubscriberExt, EnvFilter, Layer}; -use crate::LogFormat; +use crate::{capture_tracer::CaptureExporter, LogFormat}; type LoggerResult = Result; @@ -25,6 +23,7 @@ pub struct Logger<'a> { log_queries: bool, telemetry_endpoint: Option<&'a str>, metrics: Option, + log_capture_exporter: Option, } impl<'a> Logger<'a> { @@ -37,6 +36,7 @@ impl<'a> Logger<'a> { log_queries: false, telemetry_endpoint: None, metrics: None, + log_capture_exporter: None, } } @@ -68,6 +68,12 @@ impl<'a> Logger<'a> { self.metrics = Some(metrics); } + pub fn enable_logs_capture(&mut self) -> CaptureExporter { + let capture = CaptureExporter::new(); + self.log_capture_exporter = Some(capture.clone()); + capture + } + /// Install logger as a global. Can be called only once per application /// instance. The returned guard value needs to stay in scope for the whole /// lifetime of the service. @@ -78,7 +84,13 @@ impl<'a> Logger<'a> { let telemetry = if self.enable_telemetry { let tracer = create_otel_tracer(self.service_name, self.telemetry_endpoint); - let telemetry = tracing_opentelemetry::layer().with_tracer(tracer); + let mut telemetry = tracing_opentelemetry::layer().with_tracer(tracer); + + if self.log_capture_exporter.is_some() { + let tracer = crate::capture_tracer::new_pipeline().install_simple(self.log_capture_exporter.unwrap()); + telemetry = telemetry.with_tracer(tracer); + } + let telemetry = telemetry.with_filter(is_user_trace); Some(telemetry) } else { @@ -108,8 +120,6 @@ impl<'a> Logger<'a> { } fn create_otel_tracer(service_name: &'static str, collector_endpoint: Option<&str>) -> Tracer { - global::set_text_map_propagator(TraceContextPropagator::new()); - if let Some(endpoint) = collector_endpoint { // A special parameter for Jaeger to set the service name in spans. let resource = Resource::new(vec![KeyValue::new("service.name", service_name)]); diff --git a/query-engine/query-engine/src/main.rs b/query-engine/query-engine/src/main.rs index 496045c02cc0..f7f8df7ca7d6 100644 --- a/query-engine/query-engine/src/main.rs +++ b/query-engine/query-engine/src/main.rs @@ -3,15 +3,15 @@ #[macro_use] extern crate tracing; -use query_core::{metrics, MetricRegistry}; use query_engine::cli::CliCommand; use query_engine::error::PrismaError; -use query_engine::logger::Logger; use query_engine::opt::PrismaOpt; use query_engine::server; +use query_engine::state::init_state; use query_engine::LogFormat; use std::{error::Error, process}; use structopt::StructOpt; +use tracing::Instrument; type AnyError = Box; @@ -26,26 +26,13 @@ async fn main() -> Result<(), AnyError> { async fn main() -> Result<(), PrismaError> { let opts = PrismaOpt::from_args(); - let metrics = MetricRegistry::new(); - - let mut logger = Logger::new("prisma-engine-http"); - logger.log_format(opts.log_format()); - logger.log_queries(opts.log_queries()); - logger.enable_telemetry(opts.enable_open_telemetry); - logger.telemetry_endpoint(&opts.open_telemetry_endpoint); - logger.enable_metrics(metrics.clone()); - - logger.install().unwrap(); - - if opts.enable_metrics { - metrics::setup(); - } - match CliCommand::from_opt(&opts)? { Some(cmd) => cmd.execute().await?, None => { + let span = tracing::info_span!("prisma:engine:connect"); + let state = init_state(&opts, true, None).instrument(span).await?; set_panic_hook(opts.log_format()); - server::listen(opts, metrics).await?; + server::listen(&opts, state).await?; } } diff --git a/query-engine/query-engine/src/opt.rs b/query-engine/query-engine/src/opt.rs index 3048dc060a19..edd74aeda9af 100644 --- a/query-engine/query-engine/src/opt.rs +++ b/query-engine/query-engine/src/opt.rs @@ -101,6 +101,10 @@ pub struct PrismaOpt { #[structopt(long)] pub enable_open_telemetry: bool, + #[structopt(long)] + /// Enable tracer to capture logs and return in the response + pub enable_logs_in_response: bool, + /// The url to the OpenTelemetry collector. /// Enabling this will send the OpenTelemtry tracing to a collector /// and not via our custom stdout tracer diff --git a/query-engine/query-engine/src/server/mod.rs b/query-engine/query-engine/src/server/mod.rs index c1c479224d07..4e2a5e355343 100644 --- a/query-engine/query-engine/src/server/mod.rs +++ b/query-engine/query-engine/src/server/mod.rs @@ -1,10 +1,9 @@ -use crate::{context::PrismaContext, opt::PrismaOpt, PrismaResult}; -use datamodel::common::preview_features::PreviewFeature; +use crate::{opt::PrismaOpt, state::State, PrismaResult}; use hyper::service::{make_service_fn, service_fn}; use hyper::{header::CONTENT_TYPE, Body, HeaderMap, Method, Request, Response, Server, StatusCode}; use opentelemetry::{global, propagation::Extractor, Context}; +use query_core::{get_trace_id_from_context, MetricFormat}; use query_core::{schema::QuerySchemaRenderer, TxId}; -use query_core::{MetricFormat, MetricRegistry}; use request_handlers::{dmmf, GraphQLSchemaRenderer, GraphQlHandler, TxInput}; use serde_json::json; use std::collections::HashMap; @@ -16,84 +15,8 @@ use tracing_opentelemetry::OpenTelemetrySpanExt; const TRANSACTION_ID_HEADER: &str = "X-transaction-id"; -//// Shared application state. -pub struct State { - cx: Arc, - enable_playground: bool, - enable_debug_mode: bool, - enable_itx: bool, - enable_metrics: bool, -} - -impl State { - /// Create a new instance of `State`. - fn new( - cx: PrismaContext, - enable_playground: bool, - enable_debug_mode: bool, - enable_itx: bool, - enable_metrics: bool, - ) -> Self { - Self { - cx: Arc::new(cx), - enable_playground, - enable_debug_mode, - enable_itx, - enable_metrics, - } - } - - pub fn get_metrics(&self) -> MetricRegistry { - self.cx.metrics.clone() - } -} - -impl Clone for State { - fn clone(&self) -> Self { - Self { - cx: self.cx.clone(), - enable_playground: self.enable_playground, - enable_debug_mode: self.enable_debug_mode, - enable_itx: self.enable_itx, - enable_metrics: self.enable_metrics, - } - } -} - -pub async fn setup(opts: &PrismaOpt, metrics: MetricRegistry) -> PrismaResult { - let config = opts.configuration(false)?.subject; - config.validate_that_one_datasource_is_provided()?; - - let span = tracing::info_span!("prisma:engine:connect"); - - let enable_itx = config - .preview_features() - .contains(PreviewFeature::InteractiveTransactions); - - let enable_metrics = config.preview_features().contains(PreviewFeature::Metrics); - - let datamodel = opts.datamodel()?; - let cx = PrismaContext::builder(config, datamodel) - .set_metrics(metrics) - .enable_raw_queries(opts.enable_raw_queries) - .build() - .instrument(span) - .await?; - - let state = State::new( - cx, - opts.enable_playground, - opts.enable_debug_mode, - enable_itx, - enable_metrics, - ); - Ok(state) -} - /// Starts up the graphql query engine server -pub async fn listen(opts: PrismaOpt, metrics: MetricRegistry) -> PrismaResult<()> { - let state = setup(&opts, metrics).await?; - +pub async fn listen(opts: &PrismaOpt, state: State) -> PrismaResult<()> { let query_engine = make_service_fn(move |_| { let state = state.clone(); async move { Ok::<_, hyper::Error>(service_fn(move |req| routes(state.clone(), req))) } @@ -189,24 +112,17 @@ async fn graphql_handler(state: State, req: Request) -> Result { - let s = traceparent.to_str().unwrap_or_default().to_string(); - Some(s) - } - _ => None, - }; + if log_capture.should_capture() { + state + .cx + .inflight_tracer + .as_ref() + .unwrap() + .capture(log_capture.id()) + .await; + } let work = async move { let body_start = req.into_body(); @@ -216,9 +132,18 @@ async fn graphql_handler(state: State, req: Request) -> Result { let handler = GraphQlHandler::new(&*state.cx.executor, state.cx.query_schema()); - let result = handler.handle(body, tx_id, trace_id).instrument(span).await; - - let result_bytes = serde_json::to_vec(&result).unwrap(); + let result = handler.handle(body, tx_id, trace_id.clone()).instrument(span).await; + + let result_bytes = if log_capture.should_capture() { + let logs = state.cx.inflight_tracer.as_ref().unwrap().get(log_capture.id()).await; + let json = json!({ + "result": result, + "logs": logs + }); + serde_json::to_vec(&json).unwrap() + } else { + serde_json::to_vec(&result).unwrap() + }; let res = Response::builder() .status(StatusCode::OK) @@ -460,3 +385,67 @@ fn err_to_http_resp(err: query_core::CoreError) -> Response { Response::builder().status(status).body(body).unwrap() } + +struct LogCapture { + id: String, + capture: bool, +} + +impl LogCapture { + fn new(id: String, capture: bool) -> Self { + Self { id, capture } + } + + fn new_from_req(id: String, req: &Request) -> Self { + let should_capture = Self::get_capture_from_header(req); + Self::new(id, should_capture) + } + + fn id(&self) -> String { + self.id.to_string() + } + + fn should_capture(&self) -> bool { + self.capture + } + + fn get_capture_from_header(req: &Request) -> bool { + match req.headers().get("CAPTURE_LOGS") { + Some(header) => { + if let Ok(capture_logs) = header.to_str() { + capture_logs == "true" + } else { + false + } + } + None => false, + } + } +} + +fn process_gql_req_headers(req: &Request) -> (Option, Span, LogCapture, Option) { + let tx_id = get_transaction_id_from_header(req); + let (span, log_capture) = if tx_id.is_none() { + let cx = get_parent_span_context(req); + let trace_id = get_trace_id_from_context(&cx); + + let span = info_span!("prisma:engine", user_facing = true); + span.set_parent(cx); + + let log_capture = LogCapture::new_from_req(trace_id.to_string(), req); + + (span, log_capture) + } else { + (Span::none(), LogCapture::new(Default::default(), false)) + }; + + let traceparent = match req.headers().get("traceparent") { + Some(traceparent) => { + let s = traceparent.to_str().unwrap_or_default().to_string(); + Some(s) + } + _ => None, + }; + + (tx_id, span, log_capture, traceparent) +} diff --git a/query-engine/query-engine/src/state.rs b/query-engine/query-engine/src/state.rs new file mode 100644 index 000000000000..e79e46857732 --- /dev/null +++ b/query-engine/query-engine/src/state.rs @@ -0,0 +1,104 @@ +use crate::{context::PrismaContext, logger::Logger, opt::PrismaOpt, PrismaResult}; +use datamodel::common::preview_features::PreviewFeature; +use query_core::{metrics, MetricRegistry}; + +use std::sync::Arc; + +//// Shared application state. +pub struct State { + pub cx: Arc, + pub enable_playground: bool, + pub enable_debug_mode: bool, + pub enable_itx: bool, + pub enable_metrics: bool, +} + +impl State { + /// Create a new instance of `State`. + fn new( + cx: PrismaContext, + enable_playground: bool, + enable_debug_mode: bool, + enable_itx: bool, + enable_metrics: bool, + ) -> Self { + Self { + cx: Arc::new(cx), + enable_playground, + enable_debug_mode, + enable_itx, + enable_metrics, + } + } + + pub fn get_metrics(&self) -> MetricRegistry { + self.cx.metrics.clone() + } +} + +impl Clone for State { + fn clone(&self) -> Self { + Self { + cx: self.cx.clone(), + enable_playground: self.enable_playground, + enable_debug_mode: self.enable_debug_mode, + enable_itx: self.enable_itx, + enable_metrics: self.enable_metrics, + } + } +} + +pub async fn init_state( + opts: &PrismaOpt, + install_logger: bool, + metrics: Option, +) -> PrismaResult { + let metrics = if metrics.is_none() { + MetricRegistry::new() + } else { + metrics.unwrap() + }; + + let mut logger = Logger::new("prisma-engine-http"); + logger.log_format(opts.log_format()); + logger.log_queries(opts.log_queries()); + logger.enable_telemetry(opts.enable_open_telemetry); + logger.telemetry_endpoint(&opts.open_telemetry_endpoint); + logger.enable_metrics(metrics.clone()); + let logs_capture = if opts.enable_logs_in_response { + let exporter = logger.enable_logs_capture(); + Some(exporter) + } else { + None + }; + + if install_logger { + logger.install().unwrap(); + } + + if opts.enable_metrics { + metrics::setup(); + } + + let config = opts.configuration(false)?.subject; + config.validate_that_one_datasource_is_provided()?; + + let enable_itx = config + .preview_features() + .contains(PreviewFeature::InteractiveTransactions); + + let enable_metrics = config.preview_features().contains(PreviewFeature::Metrics); + + let datamodel = opts.datamodel()?; + + let cx = PrismaContext::new(config, datamodel, opts.enable_raw_queries, metrics, logs_capture).await?; + + let state = State::new( + cx, + opts.enable_playground, + opts.enable_debug_mode, + enable_itx, + enable_metrics, + ); + Ok(state) +} diff --git a/query-engine/query-engine/src/tests/dmmf.rs b/query-engine/query-engine/src/tests/dmmf.rs index f537757d3e17..f15f1cda9d31 100644 --- a/query-engine/query-engine/src/tests/dmmf.rs +++ b/query-engine/query-engine/src/tests/dmmf.rs @@ -112,6 +112,7 @@ fn test_dmmf_cli_command(schema: &str) -> PrismaResult<()> { subcommand: Some(Subcommand::Cli(CliOpt::Dmmf)), enable_open_telemetry: false, open_telemetry_endpoint: String::new(), + enable_logs_in_response: false, }; let cli_cmd = CliCommand::from_opt(&prisma_opt)?.unwrap();