Skip to content

Commit

Permalink
WIP: implement capturer as a layer rather than a SpanExporter
Browse files Browse the repository at this point in the history
Expecting a lot of tests to fail
  • Loading branch information
miguelff committed Dec 19, 2022
1 parent af5999f commit 9ff2b46
Show file tree
Hide file tree
Showing 10 changed files with 170 additions and 243 deletions.
6 changes: 3 additions & 3 deletions query-engine/core/src/trace_helpers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,13 @@ fn span_to_json(span: &SpanData) -> Value {
}

#[derive(Serialize, Debug, Clone)]
pub struct UserFacingSpan {
struct UserFacingSpan {
trace_id: String,
span_id: String,
parent_span_id: String,
name: String,
start_time: [u64; 2],
end_time: [u64; 2],
start_time: HrTime,
end_time: HrTime,
#[serde(skip_serializing_if = "HashMap::is_empty")]
attributes: HashMap<String, String>,
#[serde(skip_serializing_if = "Vec::is_empty")]
Expand Down
196 changes: 0 additions & 196 deletions query-engine/query-engine/src/capture_exporter.rs

This file was deleted.

20 changes: 10 additions & 10 deletions query-engine/query-engine/src/context.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::{capture_exporter::CaptureExporter, PrismaError, PrismaResult};
use crate::{telemetry_capturing, PrismaError, PrismaResult};
use query_core::{executor, schema::QuerySchemaRef, schema_builder, QueryExecutor};
use query_engine_metrics::MetricRegistry;
use std::{env, fmt, sync::Arc};
Expand All @@ -12,8 +12,8 @@ pub struct PrismaContext {
pub metrics: MetricRegistry,
/// Central query executor.
pub executor: Box<dyn QueryExecutor + Send + Sync + 'static>,
// The trace capturer being in flight.
pub trace_capturer: Option<CaptureExporter>,
// The telemetry capturer being in flight.
pub telemetry_capturer: Option<telemetry_capturing::Layer>,
}

impl fmt::Debug for PrismaContext {
Expand All @@ -26,7 +26,7 @@ pub struct ContextBuilder {
enable_raw_queries: bool,
schema: psl::ValidatedSchema,
metrics: Option<MetricRegistry>,
trace_capturer: Option<CaptureExporter>,
telemetry_capturer: Option<telemetry_capturing::Layer>,
}

impl ContextBuilder {
Expand All @@ -40,8 +40,8 @@ impl ContextBuilder {
self
}

pub fn set_trace_capturer(mut self, trace_capturer: Option<CaptureExporter>) -> Self {
self.trace_capturer = trace_capturer;
pub fn set_telemetry_capturer(mut self, telemetry_capturer: Option<telemetry_capturing::Layer>) -> Self {
self.telemetry_capturer = telemetry_capturer;
self
}

Expand All @@ -50,7 +50,7 @@ impl ContextBuilder {
self.schema,
self.enable_raw_queries,
self.metrics.unwrap_or_default(),
self.trace_capturer,
self.telemetry_capturer,
)
.await
}
Expand All @@ -62,7 +62,7 @@ impl PrismaContext {
schema: psl::ValidatedSchema,
enable_raw_queries: bool,
metrics: MetricRegistry,
trace_capturer: Option<CaptureExporter>,
telemetry_capturer: Option<telemetry_capturing::Layer>,
) -> PrismaResult<Self> {
let config = &schema.configuration;
// We only support one data source at the moment, so take the first one (default not exposed yet).
Expand All @@ -86,7 +86,7 @@ impl PrismaContext {
query_schema,
executor,
metrics,
trace_capturer,
telemetry_capturer,
};

context.verify_connection().await?;
Expand All @@ -104,7 +104,7 @@ impl PrismaContext {
enable_raw_queries: false,
schema,
metrics: None,
trace_capturer: None,
telemetry_capturer: None,
}
}

Expand Down
3 changes: 1 addition & 2 deletions query-engine/query-engine/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
#![allow(clippy::derive_partial_eq_without_eq)]

pub mod capture_exporter;
pub mod cli;
pub mod context;
pub mod error;
pub mod logger;
pub mod opt;
pub mod server;
pub mod state;
pub mod telemetry_capturing;
pub mod tracer;

use error::PrismaError;
Expand Down
17 changes: 8 additions & 9 deletions query-engine/query-engine/src/logger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use query_engine_metrics::MetricRegistry;
use tracing::{dispatcher::SetGlobalDefaultError, subscriber};
use tracing_subscriber::{filter::filter_fn, layer::SubscriberExt, EnvFilter, Layer};

use crate::{capture_exporter::CaptureExporter, LogFormat};
use crate::{telemetry_capturing, LogFormat};

type LoggerResult<T> = Result<T, SetGlobalDefaultError>;

Expand All @@ -24,7 +24,7 @@ pub struct Logger<'a> {
log_queries: bool,
telemetry_endpoint: Option<&'a str>,
metrics: Option<MetricRegistry>,
trace_capturer: CaptureExporter,
capturing_layer: telemetry_capturing::Layer,
}

impl<'a> Logger<'a> {
Expand All @@ -37,7 +37,7 @@ impl<'a> Logger<'a> {
log_queries: false,
telemetry_endpoint: None,
metrics: None,
trace_capturer: CaptureExporter::new(),
capturing_layer: Default::default(),
}
}

Expand Down Expand Up @@ -69,8 +69,8 @@ impl<'a> Logger<'a> {
self.metrics = Some(metrics);
}

pub fn trace_capturer(&self) -> CaptureExporter {
self.trace_capturer.clone()
pub fn capturing_layer(&self) -> telemetry_capturing::Layer {
self.capturing_layer.clone()
}

/// Install logger as a global. Can be called only once per application
Expand Down Expand Up @@ -99,14 +99,13 @@ impl<'a> Logger<'a> {
None
};

let capture = crate::capture_exporter::new_pipeline().install(self.trace_capturer());
let capture_layer = tracing_opentelemetry::layer().with_tracer(capture);
let capture_layer = self.capturing_layer().with_filter(create_env_filter(true));

let subscriber = tracing_subscriber::registry()
.with(fmt_layer)
.with(capture_layer)
.with(self.metrics)
.with(otel_layer)
.with(capture_layer);
.with(otel_layer);

subscriber::set_global_default(subscriber)?;
Ok(())
Expand Down
Loading

0 comments on commit 9ff2b46

Please sign in to comment.