From 012a890dccf7d65ed9750892bb7dce3652ae9631 Mon Sep 17 00:00:00 2001 From: Miguel Fernandez Date: Mon, 19 Dec 2022 21:56:19 +0100 Subject: [PATCH] WIP: implement capturer as a layer rather than a SpanExporter Expecting a lot of tests to fail --- query-engine/core/src/trace_helpers/mod.rs | 6 +- .../query-engine/src/capture_exporter.rs | 196 ------------------ query-engine/query-engine/src/context.rs | 20 +- query-engine/query-engine/src/lib.rs | 3 +- query-engine/query-engine/src/logger.rs | 17 +- query-engine/query-engine/src/server/mod.rs | 42 ++-- query-engine/query-engine/src/state.rs | 4 +- .../src/telemetry_capturing/config.rs | 96 +++++++++ .../src/telemetry_capturing/layer.rs | 24 +++ .../src/telemetry_capturing/mod.rs | 5 + 10 files changed, 170 insertions(+), 243 deletions(-) delete mode 100644 query-engine/query-engine/src/capture_exporter.rs create mode 100644 query-engine/query-engine/src/telemetry_capturing/config.rs create mode 100644 query-engine/query-engine/src/telemetry_capturing/layer.rs create mode 100644 query-engine/query-engine/src/telemetry_capturing/mod.rs diff --git a/query-engine/core/src/trace_helpers/mod.rs b/query-engine/core/src/trace_helpers/mod.rs index 1b527ab32936..76e8db801f1b 100644 --- a/query-engine/core/src/trace_helpers/mod.rs +++ b/query-engine/core/src/trace_helpers/mod.rs @@ -36,13 +36,13 @@ fn span_to_json(span: &SpanData) -> Value { } #[derive(Serialize, Debug, Clone)] -pub struct UserFacingSpan { +struct UserFacingSpan { trace_id: String, span_id: String, parent_span_id: String, name: String, - start_time: [u64; 2], - end_time: [u64; 2], + start_time: HrTime, + end_time: HrTime, #[serde(skip_serializing_if = "HashMap::is_empty")] attributes: HashMap, #[serde(skip_serializing_if = "Vec::is_empty")] diff --git a/query-engine/query-engine/src/capture_exporter.rs b/query-engine/query-engine/src/capture_exporter.rs deleted file mode 100644 index f2dc352e7aed..000000000000 --- a/query-engine/query-engine/src/capture_exporter.rs +++ /dev/null @@ -1,196 +0,0 @@ -use async_trait::async_trait; -use opentelemetry::{ - global, - sdk::{ - self, - export::trace::{ExportResult, SpanData, SpanExporter}, - propagation::TraceContextPropagator, - }, - trace::{TraceId, TracerProvider}, -}; -use query_core::UserFacingSpan; -use std::{collections::HashMap, sync::Arc}; -use std::{fmt::Debug, time::Duration}; -use tokio::sync::Mutex; - -#[derive(Debug, Clone)] -pub enum Config { - Enabled(ConfiguredCapturer), - Disabled, -} - -pub(crate) fn enabled(c: CaptureExporter, trace_id: TraceId) -> Config { - Config::Enabled(ConfiguredCapturer { capturer: c, trace_id }) -} - -pub fn disabled() -> Config { - Config::Disabled -} -/// A ConfiguredCapturer is ready to capture spans for a particular trace and is built from -#[derive(Debug, Clone)] -pub struct ConfiguredCapturer { - capturer: CaptureExporter, - trace_id: TraceId, -} - -impl ConfiguredCapturer { - pub async fn start_capturing(&self) { - self.capturer - .start_capturing(self.trace_id, CaptureTimeout::Default) - .await - } - - pub async fn fetch_captures(&self) -> Vec { - self.capturer.fetch_captures(self.trace_id).await - } -} - -/// Pipeline builder -#[derive(Debug)] -pub struct PipelineBuilder { - trace_config: Option, -} - -/// Create a new in memory expoter -pub fn new_pipeline() -> PipelineBuilder { - PipelineBuilder::default() -} - -impl Default for PipelineBuilder { - /// Return the default pipeline builder. - fn default() -> Self { - Self { trace_config: None } - } -} - -impl PipelineBuilder { - /// Assign the SDK trace configuration. - pub fn with_trace_config(mut self, config: sdk::trace::Config) -> Self { - self.trace_config = Some(config); - self - } -} - -impl PipelineBuilder { - pub fn install(mut self, exporter: CaptureExporter) -> sdk::trace::Tracer { - global::set_text_map_propagator(TraceContextPropagator::new()); - - let processor = sdk::trace::BatchSpanProcessor::builder(exporter, opentelemetry::runtime::Tokio) - .with_scheduled_delay(Duration::new(0, 1)) - .build(); - let mut provider_builder = sdk::trace::TracerProvider::builder().with_span_processor(processor); - - if let Some(config) = self.trace_config.take() { - provider_builder = provider_builder.with_config(config); - } - let provider = provider_builder.build(); - let tracer = provider.tracer("opentelemetry"); - global::set_tracer_provider(provider); - - tracer - } -} - -/// A [`SpanExporter`] that captures and stores spans in memory in a synchronized dictionary for -/// later retrieval -#[derive(Debug, Clone)] -pub struct CaptureExporter { - pub(crate) traces: Arc>>>, -} - -pub(crate) enum CaptureTimeout { - #[allow(dead_code)] - Duration(Duration), - Default, -} - -impl CaptureExporter { - const DEFAULT_TIMEOUT: Duration = Duration::from_secs(1800); - - pub fn new() -> Self { - Self { - traces: Default::default(), - } - } - - pub(crate) async fn start_capturing(&self, trace_id: TraceId, timeout: CaptureTimeout) { - let mut locked_traces = self.traces.lock().await; - locked_traces.insert(trace_id, Vec::new()); - drop(locked_traces); - - let when = match timeout { - CaptureTimeout::Duration(d) => d, - CaptureTimeout::Default => Self::DEFAULT_TIMEOUT, - }; - - let traces = self.traces.clone(); - tokio::spawn(async move { - tokio::time::sleep(when).await; - let mut locked_traces = traces.lock().await; - if locked_traces.remove(&trace_id).is_some() { - warn!("Timeout waiting for spans to be captured. trace_id{}", trace_id) - } - }); - } - - pub(crate) async fn fetch_captures(&self, trace_id: TraceId) -> Vec { - let mut traces = self.traces.lock().await; - - let spans = if let Some(spans) = traces.remove(&trace_id) { - spans - } else { - vec![] - }; - drop(traces); - - spans.iter().map(UserFacingSpan::from).collect() - } -} - -impl Default for CaptureExporter { - fn default() -> Self { - Self::new() - } -} - -#[async_trait] -impl SpanExporter for CaptureExporter { - async fn export(&mut self, batch: Vec) -> ExportResult { - let mut traces = self.traces.lock().await; - for span in batch { - let trace_id = span.span_context.trace_id(); - - if let Some(spans) = traces.get_mut(&trace_id) { - spans.push(span) - } - } - - Ok(()) - } -} - -// tests for capture exporter -#[cfg(test)] -mod tests { - use super::*; - use std::time::Duration; - - #[tokio::test] - async fn test_garbage_collection() { - let exporter = CaptureExporter::new(); - - let trace_id = TraceId::from_hex("1").unwrap(); - let one_ms = Duration::from_millis(1); - exporter - .start_capturing(trace_id, CaptureTimeout::Duration(one_ms)) - .await; - let traces = exporter.traces.lock().await; - assert!(traces.get(&trace_id).is_some()); - drop(traces); - - tokio::time::sleep(10 * one_ms).await; - - let traces = exporter.traces.lock().await; - assert!(traces.get(&trace_id).is_none()); - } -} diff --git a/query-engine/query-engine/src/context.rs b/query-engine/query-engine/src/context.rs index 76c80e7acf45..bdce562367f6 100644 --- a/query-engine/query-engine/src/context.rs +++ b/query-engine/query-engine/src/context.rs @@ -1,4 +1,4 @@ -use crate::{capture_exporter::CaptureExporter, PrismaError, PrismaResult}; +use crate::{telemetry_capturing, PrismaError, PrismaResult}; use query_core::{executor, schema::QuerySchemaRef, schema_builder, QueryExecutor}; use query_engine_metrics::MetricRegistry; use std::{env, fmt, sync::Arc}; @@ -12,8 +12,8 @@ pub struct PrismaContext { pub metrics: MetricRegistry, /// Central query executor. pub executor: Box, - // The trace capturer being in flight. - pub trace_capturer: Option, + // The telemetry capturer being in flight. + pub telemetry_capturer: Option, } impl fmt::Debug for PrismaContext { @@ -26,7 +26,7 @@ pub struct ContextBuilder { enable_raw_queries: bool, schema: psl::ValidatedSchema, metrics: Option, - trace_capturer: Option, + telemetry_capturer: Option, } impl ContextBuilder { @@ -40,8 +40,8 @@ impl ContextBuilder { self } - pub fn set_trace_capturer(mut self, trace_capturer: Option) -> Self { - self.trace_capturer = trace_capturer; + pub fn set_telemetry_capturer(mut self, telemetry_capturer: Option) -> Self { + self.telemetry_capturer = telemetry_capturer; self } @@ -50,7 +50,7 @@ impl ContextBuilder { self.schema, self.enable_raw_queries, self.metrics.unwrap_or_default(), - self.trace_capturer, + self.telemetry_capturer, ) .await } @@ -62,7 +62,7 @@ impl PrismaContext { schema: psl::ValidatedSchema, enable_raw_queries: bool, metrics: MetricRegistry, - trace_capturer: Option, + telemetry_capturer: Option, ) -> PrismaResult { let config = &schema.configuration; // We only support one data source at the moment, so take the first one (default not exposed yet). @@ -86,7 +86,7 @@ impl PrismaContext { query_schema, executor, metrics, - trace_capturer, + telemetry_capturer, }; context.verify_connection().await?; @@ -104,7 +104,7 @@ impl PrismaContext { enable_raw_queries: false, schema, metrics: None, - trace_capturer: None, + telemetry_capturer: None, } } diff --git a/query-engine/query-engine/src/lib.rs b/query-engine/query-engine/src/lib.rs index 83b44cd05d8e..2bc8c84f0c60 100644 --- a/query-engine/query-engine/src/lib.rs +++ b/query-engine/query-engine/src/lib.rs @@ -1,6 +1,4 @@ #![allow(clippy::derive_partial_eq_without_eq)] - -pub mod capture_exporter; pub mod cli; pub mod context; pub mod error; @@ -8,6 +6,7 @@ pub mod logger; pub mod opt; pub mod server; pub mod state; +pub mod telemetry_capturing; pub mod tracer; use error::PrismaError; diff --git a/query-engine/query-engine/src/logger.rs b/query-engine/query-engine/src/logger.rs index 42ed8b3d3a3c..4506adac56d5 100644 --- a/query-engine/query-engine/src/logger.rs +++ b/query-engine/query-engine/src/logger.rs @@ -11,7 +11,7 @@ use query_engine_metrics::MetricRegistry; use tracing::{dispatcher::SetGlobalDefaultError, subscriber}; use tracing_subscriber::{filter::filter_fn, layer::SubscriberExt, EnvFilter, Layer}; -use crate::{capture_exporter::CaptureExporter, LogFormat}; +use crate::{telemetry_capturing, LogFormat}; type LoggerResult = Result; @@ -24,7 +24,7 @@ pub struct Logger<'a> { log_queries: bool, telemetry_endpoint: Option<&'a str>, metrics: Option, - trace_capturer: CaptureExporter, + capturing_layer: telemetry_capturing::Layer, } impl<'a> Logger<'a> { @@ -37,7 +37,7 @@ impl<'a> Logger<'a> { log_queries: false, telemetry_endpoint: None, metrics: None, - trace_capturer: CaptureExporter::new(), + capturing_layer: Default::default(), } } @@ -69,8 +69,8 @@ impl<'a> Logger<'a> { self.metrics = Some(metrics); } - pub fn trace_capturer(&self) -> CaptureExporter { - self.trace_capturer.clone() + pub fn capturing_layer(&self) -> telemetry_capturing::Layer { + self.capturing_layer.clone() } /// Install logger as a global. Can be called only once per application @@ -99,14 +99,13 @@ impl<'a> Logger<'a> { None }; - let capture = crate::capture_exporter::new_pipeline().install(self.trace_capturer()); - let capture_layer = tracing_opentelemetry::layer().with_tracer(capture); + let capture_layer = self.capturing_layer().with_filter(create_env_filter(true)); let subscriber = tracing_subscriber::registry() .with(fmt_layer) + .with(capture_layer) .with(self.metrics) - .with(otel_layer) - .with(capture_layer); + .with(otel_layer); subscriber::set_global_default(subscriber)?; Ok(()) diff --git a/query-engine/query-engine/src/server/mod.rs b/query-engine/query-engine/src/server/mod.rs index ff5773e0a391..2174c693a92b 100644 --- a/query-engine/query-engine/src/server/mod.rs +++ b/query-engine/query-engine/src/server/mod.rs @@ -1,5 +1,5 @@ -use crate::capture_exporter::{self}; use crate::state::State; +use crate::telemetry_capturing; use crate::{opt::PrismaOpt, PrismaResult}; use hyper::http::HeaderValue; use hyper::service::{make_service_fn, service_fn}; @@ -119,9 +119,9 @@ async fn graphql_handler(state: State, req: Request) -> Result) -> Result Response { pub(crate) fn process_gql_req_headers( req: &Request, - capturer: Option, -) -> (Option, Span, capture_exporter::Config, Option) { + capturer: Option, +) -> (Option, Span, telemetry_capturing::Config, Option) { let tx_id = get_transaction_id_from_header(req); let span = info_span!("prisma:engine", user_facing = true); @@ -420,19 +426,13 @@ pub(crate) fn process_gql_req_headers( pub fn create_capture_config( header: Option<&HeaderValue>, - capturer: Option, + capturer: Option, trace_id: TraceId, -) -> capture_exporter::Config { - let enabled = if let Some(h) = header { - h.to_str().unwrap_or("false") == "true" +) -> telemetry_capturing::Config { + if let Some(h) = header { + let options = h.to_str().unwrap_or("").into(); + telemetry_capturing::Config::new(options, trace_id) } else { - false - }; - - if !enabled || capturer.is_none() { - return capture_exporter::disabled(); + telemetry_capturing::Config::Disabled } - - let c = capturer.unwrap(); - capture_exporter::enabled(c, trace_id) } diff --git a/query-engine/query-engine/src/state.rs b/query-engine/query-engine/src/state.rs index 3db72bbfc248..2bf2a894a2e7 100644 --- a/query-engine/query-engine/src/state.rs +++ b/query-engine/query-engine/src/state.rs @@ -58,7 +58,7 @@ pub async fn setup(opts: &PrismaOpt, install_logger: bool, metrics: Option Self { + if !opts.is_enabled() { + return Config::Disabled; + } else { + return Config::Enabled(ConfiguredLayer::new(opts, trace_id)); + } + } +} + +#[derive(Debug, Clone)] +pub struct ConfiguredLayer { + opts: Options, + trace_id: TraceId, + layer: Layer, +} + +impl ConfiguredLayer { + pub(crate) fn new(opts: Options, trace_id: TraceId) -> Self { + Self { + opts, + trace_id, + layer: Default::default(), + } + } + pub async fn start_capturing(self) { + todo!() + } + + pub async fn fetch_captures(self) -> (Option, Option) { + todo!() + } +} + +static VALID_LEVELS: &[&str] = &["error", "warn", "info", "query", "tracing"]; + +#[derive(Debug, Clone)] +pub struct Options { + pub(crate) included_log_levels: HashSet, + pub(crate) include_traces: bool, +} + +impl Options { + fn is_enabled(&self) -> bool { + self.include_traces || !self.included_log_levels.is_empty() + } +} + +impl From<&str> for Options { + fn from(s: &str) -> Self { + let chunks = s.split(","); + let mut set = HashSet::from_iter( + chunks + .into_iter() + .map(str::trim) + .map(str::to_lowercase) + .filter(|s| VALID_LEVELS.contains(&s.as_str())), + ); + + Self { + include_traces: set.remove("tracing"), + included_log_levels: set, + } + } +} + +//test for options from +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_options_from() { + let options = Options::from("error, warn, query, tracing"); + assert_eq!(options.included_log_levels.len(), 3); + assert!(options.included_log_levels.contains("error")); + assert!(options.included_log_levels.contains("warn")); + assert!(options.included_log_levels.contains("query")); + assert!(options.include_traces); + assert!(options.is_enabled()); + + let options = Options::from("foo, bar baz"); + assert!(!options.is_enabled()); + } +} diff --git a/query-engine/query-engine/src/telemetry_capturing/layer.rs b/query-engine/query-engine/src/telemetry_capturing/layer.rs new file mode 100644 index 000000000000..203d61843a91 --- /dev/null +++ b/query-engine/query-engine/src/telemetry_capturing/layer.rs @@ -0,0 +1,24 @@ +use tracing::{span, Subscriber}; +use tracing_subscriber::layer::Context; + +#[derive(serde::Serialize)] +pub struct CapturedLog {} + +#[derive(serde::Serialize)] +pub struct CapturedTrace {} + +pub type CapturedLogs = Vec; +pub type CapturedTraces = Vec; + +// An an implementation of the `tracing_subscriber::Layer` trait to events, and compose a filtered +// layer with it +#[derive(Debug, Default, Clone)] +pub struct Layer {} + +impl Layer {} + +impl tracing_subscriber::Layer for Layer { + fn on_event(&self, _event: &tracing::Event<'_>, _ctx: tracing_subscriber::layer::Context<'_, S>) {} + + fn on_close(&self, _id: span::Id, _ctx: Context<'_, S>) {} +} diff --git a/query-engine/query-engine/src/telemetry_capturing/mod.rs b/query-engine/query-engine/src/telemetry_capturing/mod.rs new file mode 100644 index 000000000000..98220080b8a4 --- /dev/null +++ b/query-engine/query-engine/src/telemetry_capturing/mod.rs @@ -0,0 +1,5 @@ +mod config; +mod layer; + +pub use config::*; +pub use layer::*;