diff --git a/query-engine/core/src/interactive_transactions/mod.rs b/query-engine/core/src/interactive_transactions/mod.rs index 7df5a2c33547..d3c73f705123 100644 --- a/query-engine/core/src/interactive_transactions/mod.rs +++ b/query-engine/core/src/interactive_transactions/mod.rs @@ -1,6 +1,6 @@ use crate::CoreError; use connector::{Connection, ConnectionLike, Transaction}; -use std::{collections::HashMap, fmt::Display}; +use std::fmt::Display; use tokio::{ task::JoinHandle, time::{Duration, Instant}, @@ -76,63 +76,6 @@ impl Display for TxId { } } -impl From for opentelemetry::Context { - // This is a bit of a hack, but it's the only way to have a default trace span for a whole - // transaction when no traceparent is propagated from the client. - // - // This is done so we can capture traces happening accross the different queries in a - // transaction. Otherwise, if a traceparent is not propagated from the client, each query in - // the transaction will run within a span that has already been generated at the begining of the - // transaction, and held active in the actor in charge of running the queries. Thus, making - // impossible to capture traces happening in the individual queries, as they won't be aware of - // the transaction they are part of. - // - // By generating this "fake" traceparent based on the transaction id, we can have a common - // trace_id for all transaction operations. - fn from(id: TxId) -> Self { - let extractor: HashMap = - HashMap::from_iter(vec![("traceparent".to_string(), id.as_traceparent())]); - opentelemetry::global::get_text_map_propagator(|propagator| propagator.extract(&extractor)) - } -} - -impl TxId { - pub fn as_traceparent(&self) -> String { - let trace_id = opentelemetry::trace::TraceId::from(self.clone()); - format!("00-{}-0000000000000001-01", trace_id) - } -} - -impl From for opentelemetry::trace::TraceId { - // in order to convert a TxId (a 48 bytes cuid) into a TraceId (16 bytes), we remove the first byte, - // (always 'c') and get the next 16 bytes, which are random enough to be used as a trace id. - // this is a typical cuid: "c-lct0q6ma-0004-rb04-h6en1roa" - // - // - first letter is always the same - // - next 7-8 byte are random a timestamp. There's more entropy in the least significative bytes - // - next 4 bytes are a counter since the server started - // - next 4 bytes are a system fingerprint, invariant for the same server instance - // - least significative 8 bytes. Totally random. - // - // We want the most entropic slice of 16 bytes that's deterministicly determined - fn from(id: TxId) -> Self { - let mut buffer = [0; 16]; - let tx_id_bytes = id.0.as_bytes(); - let len = tx_id_bytes.len(); - - // bytes [len-20 to len-12): least significative 4 bytes of the timestamp + 4 bytes counter - for (i, source_idx) in (len - 20..len - 12).enumerate() { - buffer[i] = tx_id_bytes[source_idx]; - } - // bytes [len-8 to len): the random blocks - for (i, source_idx) in (len - 8..len).enumerate() { - buffer[i + 8] = tx_id_bytes[source_idx]; - } - - opentelemetry::trace::TraceId::from_bytes(buffer) - } -} - pub enum CachedTx { Open(OpenTx), Committed, @@ -209,30 +152,3 @@ pub enum ClosedTx { RolledBack, Expired { start_time: Instant, timeout: Duration }, } - -// tests for txid into traits -#[cfg(test)] -mod test { - use super::*; - - #[test] - fn test_txid_into_traceid() { - let fixture = vec![ - ("clct0q6ma0000rb04768tiqbj", "71366d6130303030373638746971626a"), - // counter changed, trace id changed: - ("clct0q6ma0002rb04cpa6zkmx", "71366d6130303032637061367a6b6d78"), - // fingerprint changed, trace id did not change, as that chunk is ignored: - ("clct0q6ma00020000cpa6zkmx", "71366d6130303032637061367a6b6d78"), - // first 5 bytes changed, trace id did not change, as that chunk is ignored: - ("00000q6ma00020000cpa6zkmx", "71366d6130303032637061367a6b6d78"), - // 6 th byte changed, trace id changed, as that chunk is part of the lsb of the timestamp - ("0000006ma00020000cpa6zkmx", "30366d6130303032637061367a6b6d78"), - ]; - - for (txid, expected_trace_id) in fixture { - let txid = TxId(txid.to_string()); - let trace_id: opentelemetry::trace::TraceId = txid.into(); - assert_eq!(trace_id.to_string(), expected_trace_id); - } - } -} diff --git a/query-engine/core/src/telemetry/capturing/mod.rs b/query-engine/core/src/telemetry/capturing/mod.rs index a2f174db7c1b..82171f3156ee 100644 --- a/query-engine/core/src/telemetry/capturing/mod.rs +++ b/query-engine/core/src/telemetry/capturing/mod.rs @@ -123,6 +123,7 @@ //! pub use self::capturer::Capturer; pub use self::settings::Settings; +pub use tx_ext::TxTraceExt; use self::capturer::Processor; use once_cell::sync::Lazy; @@ -179,3 +180,4 @@ mod capturer; mod helpers; mod settings; pub mod storage; +mod tx_ext; diff --git a/query-engine/core/src/telemetry/capturing/tx_ext.rs b/query-engine/core/src/telemetry/capturing/tx_ext.rs new file mode 100644 index 000000000000..5c0203613fe6 --- /dev/null +++ b/query-engine/core/src/telemetry/capturing/tx_ext.rs @@ -0,0 +1,88 @@ +use std::collections::HashMap; + +pub trait TxTraceExt { + fn into_trace_id(self) -> opentelemetry::trace::TraceId; + fn into_trace_context(self) -> opentelemetry::Context; + fn as_traceparent(&self) -> String; +} + +impl TxTraceExt for crate::TxId { + // in order to convert a TxId (a 48 bytes cuid) into a TraceId (16 bytes), we remove the first byte, + // (always 'c') and get the next 16 bytes, which are random enough to be used as a trace id. + // this is a typical cuid: "c-lct0q6ma-0004-rb04-h6en1roa" + // + // - first letter is always the same + // - next 7-8 byte are random a timestamp. There's more entropy in the least significative bytes + // - next 4 bytes are a counter since the server started + // - next 4 bytes are a system fingerprint, invariant for the same server instance + // - least significative 8 bytes. Totally random. + // + // We want the most entropic slice of 16 bytes that's deterministicly determined + fn into_trace_id(self) -> opentelemetry::trace::TraceId { + let mut buffer = [0; 16]; + let str = self.to_string(); + let tx_id_bytes = str.as_bytes(); + let len = tx_id_bytes.len(); + + // bytes [len-20 to len-12): least significative 4 bytes of the timestamp + 4 bytes counter + for (i, source_idx) in (len - 20..len - 12).enumerate() { + buffer[i] = tx_id_bytes[source_idx]; + } + // bytes [len-8 to len): the random blocks + for (i, source_idx) in (len - 8..len).enumerate() { + buffer[i + 8] = tx_id_bytes[source_idx]; + } + + opentelemetry::trace::TraceId::from_bytes(buffer) + } + // This is a bit of a hack, but it's the only way to have a default trace span for a whole + // transaction when no traceparent is propagated from the client. + // + // This is done so we can capture traces happening accross the different queries in a + // transaction. Otherwise, if a traceparent is not propagated from the client, each query in + // the transaction will run within a span that has already been generated at the begining of the + // transaction, and held active in the actor in charge of running the queries. Thus, making + // impossible to capture traces happening in the individual queries, as they won't be aware of + // the transaction they are part of. + // + // By generating this "fake" traceparent based on the transaction id, we can have a common + // trace_id for all transaction operations. + fn into_trace_context(self) -> opentelemetry::Context { + let extractor: HashMap = + HashMap::from_iter(vec![("traceparent".to_string(), self.as_traceparent())]); + opentelemetry::global::get_text_map_propagator(|propagator| propagator.extract(&extractor)) + } + + fn as_traceparent(&self) -> String { + let trace_id = self.clone().into_trace_id(); + format!("00-{}-0000000000000001-01", trace_id) + } +} + +// tests for txid into traits +#[cfg(test)] +mod test { + use super::*; + use crate::TxId; + + #[test] + fn test_txid_into_traceid() { + let fixture = vec![ + ("clct0q6ma0000rb04768tiqbj", "71366d6130303030373638746971626a"), + // counter changed, trace id changed: + ("clct0q6ma0002rb04cpa6zkmx", "71366d6130303032637061367a6b6d78"), + // fingerprint changed, trace id did not change, as that chunk is ignored: + ("clct0q6ma00020000cpa6zkmx", "71366d6130303032637061367a6b6d78"), + // first 5 bytes changed, trace id did not change, as that chunk is ignored: + ("00000q6ma00020000cpa6zkmx", "71366d6130303032637061367a6b6d78"), + // 6 th byte changed, trace id changed, as that chunk is part of the lsb of the timestamp + ("0000006ma00020000cpa6zkmx", "30366d6130303032637061367a6b6d78"), + ]; + + for (txid, expected_trace_id) in fixture { + let txid: TxId = txid.into(); + let trace_id: opentelemetry::trace::TraceId = txid.into_trace_id(); + assert_eq!(trace_id.to_string(), expected_trace_id); + } + } +} diff --git a/query-engine/query-engine/src/server/mod.rs b/query-engine/query-engine/src/server/mod.rs index 6afc72a5f0ad..ceaf949a725b 100644 --- a/query-engine/query-engine/src/server/mod.rs +++ b/query-engine/query-engine/src/server/mod.rs @@ -5,6 +5,7 @@ use hyper::{header::CONTENT_TYPE, Body, HeaderMap, Method, Request, Response, Se use opentelemetry::trace::TraceContextExt; use opentelemetry::{global, propagation::Extractor}; use query_core::helpers::*; +use query_core::telemetry::capturing::TxTraceExt; use query_core::{ schema::QuerySchemaRenderer, telemetry, ExtendedTransactionUserFacingError, TransactionOptions, TxId, }; @@ -156,7 +157,7 @@ async fn graphql_handler(state: State, req: Request) -> Result) -> Result