Skip to content

Commit

Permalink
feat(qe): Capture query logs and return with response
Browse files Browse the repository at this point in the history
The Query engine server can use a new capture tracer that can
capture the query logs for a Prisma Operation and return them
as part of the response
  • Loading branch information
garrensmith committed Aug 10, 2022
1 parent 2920a97 commit 887db40
Show file tree
Hide file tree
Showing 11 changed files with 348 additions and 131 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ validate:
cargo run --bin test-cli -- validate-datamodel dev_datamodel.prisma

qe:
cargo run --bin query-engine -- --enable-playground --enable-raw-queries --enable-metrics --enable-open-telemetry
cargo run --bin query-engine -- --enable-playground --enable-raw-queries --enable-metrics --enable-open-telemetry --enable-logs-in-response

qe-dmmf:
cargo run --bin query-engine -- cli dmmf > dmmf.json
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@ use crate::{ConnectorTag, RunnerInterface, TestError, TestResult, TxResult};
use hyper::{Body, Method, Request, Response};
use query_core::{MetricRegistry, TxId};
use query_engine::opt::PrismaOpt;
use query_engine::server::{routes, setup, State};
use query_engine::server::routes;
use query_engine::state::{init_state, State};
use request_handlers::{GQLBatchResponse, GQLError, GQLResponse, GraphQlBody, MultiQuery, PrismaResponse};

pub struct BinaryRunner {
Expand All @@ -15,7 +16,7 @@ pub struct BinaryRunner {
impl RunnerInterface for BinaryRunner {
async fn load(datamodel: String, connector_tag: ConnectorTag, metrics: MetricRegistry) -> TestResult<Self> {
let opts = PrismaOpt::from_list(&["binary", "--enable-raw-queries", "--datamodel", &datamodel]);
let state = setup(&opts, metrics).await.unwrap();
let state = init_state(&opts).await.unwrap();

Ok(BinaryRunner {
state,
Expand Down
8 changes: 7 additions & 1 deletion query-engine/core/src/trace_helpers/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use once_cell::sync::Lazy;
use opentelemetry::sdk::export::trace::SpanData;
use opentelemetry::trace::TraceContextExt;
use opentelemetry::trace::{TraceContextExt, TraceId};
use opentelemetry::Context;
use serde_json::{json, Value};
use std::borrow::Cow;

Expand Down Expand Up @@ -91,6 +92,11 @@ pub fn set_span_link_from_trace_id(span: &Span, trace_id: Option<String>) {
}
}

pub fn get_trace_id_from_context(context: &Context) -> TraceId {
let context_span = context.span();
context_span.span_context().trace_id()
}

pub fn is_user_facing_trace_filter(meta: &Metadata) -> bool {
if !meta.is_span() {
return false;
Expand Down
108 changes: 108 additions & 0 deletions query-engine/query-engine/src/capture_tracer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
use async_trait::async_trait;
use opentelemetry::{
global,
sdk::{self},
sdk::{
export::trace::{ExportResult, SpanData, SpanExporter},
propagation::TraceContextPropagator,
},
trace::TracerProvider,
};
use query_core::spans_to_json;
use std::fmt::Debug;
use std::{collections::HashMap, sync::Arc};
use tokio::sync::Mutex;

/// Pipeline builder
#[derive(Debug)]
pub struct PipelineBuilder {
trace_config: Option<sdk::trace::Config>,
}

/// Create a new stdout exporter pipeline builder.
pub fn new_pipeline() -> PipelineBuilder {
PipelineBuilder::default()
}

impl Default for PipelineBuilder {
/// Return the default pipeline builder.
fn default() -> Self {
Self { trace_config: None }
}
}

impl PipelineBuilder {
/// Assign the SDK trace configuration.
pub fn with_trace_config(mut self, config: sdk::trace::Config) -> Self {
self.trace_config = Some(config);
self
}
}

impl PipelineBuilder {
pub fn install_simple(mut self, exporter: CaptureExporter) -> sdk::trace::Tracer {
global::set_text_map_propagator(TraceContextPropagator::new());

let mut provider_builder = sdk::trace::TracerProvider::builder().with_simple_exporter(exporter);
if let Some(config) = self.trace_config.take() {
provider_builder = provider_builder.with_config(config);
}
let provider = provider_builder.build();
let tracer = provider.tracer("opentelemetry");
global::set_tracer_provider(provider);

tracer
}
}

/// A [`CaptureExporter`] that sends spans to stdout.
#[derive(Debug, Clone)]
pub struct CaptureExporter {
logs: Arc<Mutex<HashMap<String, Vec<SpanData>>>>,
}

impl CaptureExporter {
pub fn new() -> Self {
Self {
logs: Default::default(),
}
}

pub async fn capture(&self, trace_id: String) {
let mut logs = self.logs.lock().await;
logs.insert(trace_id, Vec::new());
}

pub async fn get(&self, trace_id: String) -> String {
let mut logs = self.logs.lock().await;
if let Some(spans) = logs.remove(&trace_id) {
spans_to_json(&spans)
} else {
String::new()
}
}
}

impl Default for CaptureExporter {
fn default() -> Self {
Self::new()
}
}

#[async_trait]
impl SpanExporter for CaptureExporter {
async fn export(&mut self, batch: Vec<SpanData>) -> ExportResult {
let batch = batch.into_iter().filter(|span| span.name == "quaint:query");

let mut logs = self.logs.lock().await;
for span in batch {
let trace_id = span.span_context.trace_id().to_string();

if let Some(spans) = logs.get_mut(&trace_id) {
spans.push(span)
}
}

Ok(())
}
}
22 changes: 19 additions & 3 deletions query-engine/query-engine/src/context.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
use crate::{PrismaError, PrismaResult};
use crate::{capture_tracer::CaptureExporter, PrismaError, PrismaResult};
use datamodel::{dml::Datamodel, Configuration};
use prisma_models::InternalDataModelBuilder;
use query_core::{executor, schema::QuerySchemaRef, schema_builder, MetricRegistry, QueryExecutor};
use std::{env, fmt, sync::Arc};
use std::{
env, fmt,
sync::{atomic::AtomicUsize, Arc},
};

/// Prisma request context containing all immutable state of the process.
/// There is usually only one context initialized per process.
Expand All @@ -14,6 +17,8 @@ pub struct PrismaContext {
pub metrics: MetricRegistry,
/// Central query executor.
pub executor: Box<dyn QueryExecutor + Send + Sync + 'static>,
pub inflight_tracer: Option<CaptureExporter>,
pub counter: AtomicUsize,
}

impl fmt::Debug for PrismaContext {
Expand All @@ -27,6 +32,7 @@ pub struct ContextBuilder {
datamodel: Datamodel,
config: Configuration,
metrics: Option<MetricRegistry>,
logs_capture_tracer: Option<CaptureExporter>,
}

impl ContextBuilder {
Expand All @@ -40,24 +46,31 @@ impl ContextBuilder {
self
}

pub fn set_logs_capture_tracer(mut self, tracer: Option<CaptureExporter>) -> Self {
self.logs_capture_tracer = tracer;
self
}

pub async fn build(self) -> PrismaResult<PrismaContext> {
PrismaContext::new(
self.config,
self.datamodel,
self.enable_raw_queries,
self.metrics.unwrap(),
self.logs_capture_tracer,
)
.await
}
}

impl PrismaContext {
/// Initializes a new Prisma context.
async fn new(
pub async fn new(
config: Configuration,
dm: Datamodel,
enable_raw_queries: bool,
metrics: MetricRegistry,
inflight_tracer: Option<CaptureExporter>,
) -> PrismaResult<Self> {
// We only support one data source at the moment, so take the first one (default not exposed yet).
let data_source = config
Expand Down Expand Up @@ -88,6 +101,8 @@ impl PrismaContext {
dm,
executor,
metrics,
inflight_tracer,
counter: AtomicUsize::new(0),
};

context.verify_connection().await?;
Expand All @@ -106,6 +121,7 @@ impl PrismaContext {
datamodel,
config,
metrics: None,
logs_capture_tracer: None,
}
}

Expand Down
2 changes: 2 additions & 0 deletions query-engine/query-engine/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
pub mod capture_tracer;
pub mod cli;
pub mod context;
pub mod error;
pub mod logger;
pub mod opt;
pub mod server;
pub mod state;
pub mod tracer;

use error::PrismaError;
Expand Down
22 changes: 16 additions & 6 deletions query-engine/query-engine/src/logger.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
use opentelemetry::{
global,
sdk::{
propagation::TraceContextPropagator,
trace::{Config, Tracer},
Resource,
},
Expand All @@ -12,7 +10,7 @@ use query_core::{is_user_facing_trace_filter, MetricRegistry};
use tracing::{dispatcher::SetGlobalDefaultError, subscriber};
use tracing_subscriber::{filter::filter_fn, layer::SubscriberExt, EnvFilter, Layer};

use crate::LogFormat;
use crate::{capture_tracer::CaptureExporter, LogFormat};

type LoggerResult<T> = Result<T, SetGlobalDefaultError>;

Expand All @@ -25,6 +23,7 @@ pub struct Logger<'a> {
log_queries: bool,
telemetry_endpoint: Option<&'a str>,
metrics: Option<MetricRegistry>,
log_capture_exporter: Option<CaptureExporter>,
}

impl<'a> Logger<'a> {
Expand All @@ -37,6 +36,7 @@ impl<'a> Logger<'a> {
log_queries: false,
telemetry_endpoint: None,
metrics: None,
log_capture_exporter: None,
}
}

Expand Down Expand Up @@ -68,6 +68,12 @@ impl<'a> Logger<'a> {
self.metrics = Some(metrics);
}

pub fn enable_logs_capture(&mut self) -> CaptureExporter {
let capture = CaptureExporter::new();
self.log_capture_exporter = Some(capture.clone());
capture
}

/// Install logger as a global. Can be called only once per application
/// instance. The returned guard value needs to stay in scope for the whole
/// lifetime of the service.
Expand All @@ -78,7 +84,13 @@ impl<'a> Logger<'a> {

let telemetry = if self.enable_telemetry {
let tracer = create_otel_tracer(self.service_name, self.telemetry_endpoint);
let telemetry = tracing_opentelemetry::layer().with_tracer(tracer);
let mut telemetry = tracing_opentelemetry::layer().with_tracer(tracer);

if self.log_capture_exporter.is_some() {
let tracer = crate::capture_tracer::new_pipeline().install_simple(self.log_capture_exporter.unwrap());
telemetry = telemetry.with_tracer(tracer);
}

let telemetry = telemetry.with_filter(is_user_trace);
Some(telemetry)
} else {
Expand Down Expand Up @@ -108,8 +120,6 @@ impl<'a> Logger<'a> {
}

fn create_otel_tracer(service_name: &'static str, collector_endpoint: Option<&str>) -> Tracer {
global::set_text_map_propagator(TraceContextPropagator::new());

if let Some(endpoint) = collector_endpoint {
// A special parameter for Jaeger to set the service name in spans.
let resource = Resource::new(vec![KeyValue::new("service.name", service_name)]);
Expand Down
23 changes: 5 additions & 18 deletions query-engine/query-engine/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,15 @@
#[macro_use]
extern crate tracing;

use query_core::{metrics, MetricRegistry};
use query_engine::cli::CliCommand;
use query_engine::error::PrismaError;
use query_engine::logger::Logger;
use query_engine::opt::PrismaOpt;
use query_engine::server;
use query_engine::state::init_state;
use query_engine::LogFormat;
use std::{error::Error, process};
use structopt::StructOpt;
use tracing::Instrument;

type AnyError = Box<dyn Error + Send + Sync + 'static>;

Expand All @@ -26,26 +26,13 @@ async fn main() -> Result<(), AnyError> {
async fn main() -> Result<(), PrismaError> {
let opts = PrismaOpt::from_args();

let metrics = MetricRegistry::new();

let mut logger = Logger::new("prisma-engine-http");
logger.log_format(opts.log_format());
logger.log_queries(opts.log_queries());
logger.enable_telemetry(opts.enable_open_telemetry);
logger.telemetry_endpoint(&opts.open_telemetry_endpoint);
logger.enable_metrics(metrics.clone());

logger.install().unwrap();

if opts.enable_metrics {
metrics::setup();
}

match CliCommand::from_opt(&opts)? {
Some(cmd) => cmd.execute().await?,
None => {
let span = tracing::info_span!("prisma:engine:connect");
let state = init_state(&opts).instrument(span).await?;
set_panic_hook(opts.log_format());
server::listen(opts, metrics).await?;
server::listen(&opts, state).await?;
}
}

Expand Down
4 changes: 4 additions & 0 deletions query-engine/query-engine/src/opt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,10 @@ pub struct PrismaOpt {
#[structopt(long)]
pub enable_open_telemetry: bool,

#[structopt(long)]
/// Enable tracer to capture logs and return in the response
pub enable_logs_in_response: bool,

/// The url to the OpenTelemetry collector.
/// Enabling this will send the OpenTelemtry tracing to a collector
/// and not via our custom stdout tracer
Expand Down
Loading

0 comments on commit 887db40

Please sign in to comment.