Skip to content

Commit

Permalink
Refactor make trace conversion more explicit
Browse files Browse the repository at this point in the history
And move them to the telemetry package as an extension trait
  • Loading branch information
miguelff authored and Miguel Fernández committed Jan 30, 2023
1 parent 290fc36 commit 7905d7a
Show file tree
Hide file tree
Showing 4 changed files with 94 additions and 87 deletions.
86 changes: 1 addition & 85 deletions query-engine/core/src/interactive_transactions/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::CoreError;
use connector::{Connection, ConnectionLike, Transaction};
use std::{collections::HashMap, fmt::Display};
use std::fmt::Display;
use tokio::{
task::JoinHandle,
time::{Duration, Instant},
Expand Down Expand Up @@ -76,63 +76,6 @@ impl Display for TxId {
}
}

impl From<TxId> for opentelemetry::Context {
    /// Derives a trace context for a whole transaction from its id.
    ///
    /// This is a bit of a hack, but it is the only way to have a default trace span for a whole
    /// transaction when the client does not propagate a traceparent.
    ///
    /// Without it, every query of the transaction would run within the span generated at the
    /// beginning of the transaction and held active by the actor in charge of running the
    /// queries, so traces happening in the individual queries could not be captured: they would
    /// not be aware of the transaction they belong to.
    ///
    /// Feeding the propagator a "fake" traceparent built from the transaction id gives every
    /// operation of the transaction a common trace id.
    fn from(id: TxId) -> Self {
        let mut carrier: HashMap<String, String> = HashMap::new();
        carrier.insert("traceparent".to_string(), id.as_traceparent());

        opentelemetry::global::get_text_map_propagator(|propagator| propagator.extract(&carrier))
    }
}

impl TxId {
    /// Renders the transaction id as a `traceparent` value.
    ///
    /// The result has the shape `00-<trace id>-0000000000000001-01`, where the trace id is the
    /// one obtained by converting this id into an `opentelemetry::trace::TraceId`.
    pub fn as_traceparent(&self) -> String {
        // The conversion consumes the id, hence it is run on a copy.
        let trace_id: opentelemetry::trace::TraceId = self.clone().into();
        format!("00-{}-0000000000000001-01", trace_id)
    }
}

impl From<TxId> for opentelemetry::trace::TraceId {
    /// Converts a `TxId` into a 16-byte `TraceId` by picking a deterministic slice of its bytes.
    ///
    /// A transaction id is a cuid such as "clct0q6ma0004rb04h6en1roa", which reads as
    /// "c-lct0q6ma-0004-rb04-h6en1roa":
    ///
    /// - the first letter is always the same
    /// - the next 8 bytes are a timestamp; there is more entropy in its least significant bytes
    /// - the next 4 bytes are a counter since the server started
    /// - the next 4 bytes are a system fingerprint, invariant for the same server instance
    /// - the last 8 bytes are totally random
    ///
    /// The trace id is made of the most entropic 16 bytes: the 4 least significant bytes of the
    /// timestamp plus the counter, followed by the 8 random bytes. The fingerprint is skipped.
    ///
    /// Positions are counted from the end of the id. An id shorter than 20 bytes is handled as
    /// if it were left-padded with zero bytes instead of panicking on an index underflow; an
    /// empty id therefore yields an all-zero trace id.
    fn from(id: TxId) -> Self {
        let tx_id_bytes = id.0.as_bytes();

        // Last 20 bytes of the id, right-aligned in a zeroed window so that short ids are
        // zero-padded on the left rather than indexed out of bounds.
        let mut window = [0u8; 20];
        let tail_len = tx_id_bytes.len().min(window.len());
        window[20 - tail_len..].copy_from_slice(&tx_id_bytes[tx_id_bytes.len() - tail_len..]);

        let mut buffer = [0u8; 16];
        // bytes [len-20 to len-12): least significant 4 bytes of the timestamp + 4 bytes counter
        buffer[..8].copy_from_slice(&window[..8]);
        // bytes [len-8 to len): the random block
        buffer[8..].copy_from_slice(&window[12..]);

        opentelemetry::trace::TraceId::from_bytes(buffer)
    }
}

pub enum CachedTx {
Open(OpenTx),
Committed,
Expand Down Expand Up @@ -209,30 +152,3 @@ pub enum ClosedTx {
RolledBack,
Expired { start_time: Instant, timeout: Duration },
}

// Tests for the conversions from `TxId` into tracing types.
#[cfg(test)]
mod test {
    use super::*;

    /// Checks which parts of a transaction id end up in the derived trace id.
    #[test]
    fn test_txid_into_traceid() {
        // (transaction id, hex rendering of the expected trace id)
        let cases = [
            ("clct0q6ma0000rb04768tiqbj", "71366d6130303030373638746971626a"),
            // counter changed, trace id changed:
            ("clct0q6ma0002rb04cpa6zkmx", "71366d6130303032637061367a6b6d78"),
            // fingerprint changed, trace id did not change, as that chunk is ignored:
            ("clct0q6ma00020000cpa6zkmx", "71366d6130303032637061367a6b6d78"),
            // first 5 bytes changed, trace id did not change, as that chunk is ignored:
            ("00000q6ma00020000cpa6zkmx", "71366d6130303032637061367a6b6d78"),
            // 6th byte changed, trace id changed, as that byte belongs to the least
            // significant part of the timestamp:
            ("0000006ma00020000cpa6zkmx", "30366d6130303032637061367a6b6d78"),
        ];

        for &(raw_id, expected_hex) in cases.iter() {
            let trace_id = opentelemetry::trace::TraceId::from(TxId(raw_id.to_string()));
            assert_eq!(trace_id.to_string(), expected_hex);
        }
    }
}
2 changes: 2 additions & 0 deletions query-engine/core/src/telemetry/capturing/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@
//!
pub use self::capturer::Capturer;
pub use self::settings::Settings;
pub use tx_ext::TxTraceExt;

use self::capturer::Processor;
use once_cell::sync::Lazy;
Expand Down Expand Up @@ -179,3 +180,4 @@ mod capturer;
mod helpers;
mod settings;
pub mod storage;
mod tx_ext;
88 changes: 88 additions & 0 deletions query-engine/core/src/telemetry/capturing/tx_ext.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
use std::collections::HashMap;

/// Extension trait with the explicit conversions from a transaction identifier into tracing
/// primitives: a trace id, a trace context, and a `traceparent` string.
pub trait TxTraceExt {
/// Consumes the value and returns the trace id derived from it.
fn into_trace_id(self) -> opentelemetry::trace::TraceId;
/// Consumes the value and returns an OpenTelemetry context derived from it.
fn into_trace_context(self) -> opentelemetry::Context;
/// Returns the `traceparent` string for this value without consuming it.
fn as_traceparent(&self) -> String;
}

impl TxTraceExt for crate::TxId {
    /// Converts the transaction id into a 16-byte trace id.
    ///
    /// A transaction id is a cuid such as "clct0q6ma0004rb04h6en1roa", which reads as
    /// "c-lct0q6ma-0004-rb04-h6en1roa":
    ///
    /// - the first letter is always the same
    /// - the next 8 bytes are a timestamp; there is more entropy in its least significant bytes
    /// - the next 4 bytes are a counter since the server started
    /// - the next 4 bytes are a system fingerprint, invariant for the same server instance
    /// - the last 8 bytes are totally random
    ///
    /// The trace id is the most entropic slice of 16 bytes that can be picked deterministically:
    /// the 4 least significant bytes of the timestamp plus the counter, followed by the 8 random
    /// bytes. The fingerprint is skipped.
    fn into_trace_id(self) -> opentelemetry::trace::TraceId {
        opentelemetry::trace::TraceId::from_bytes(trace_id_bytes(&self.to_string()))
    }

    /// Derives a trace context for a whole transaction from its id.
    ///
    /// This is a bit of a hack, but it is the only way to have a default trace span for a whole
    /// transaction when the client does not propagate a traceparent.
    ///
    /// Without it, every query of the transaction would run within the span generated at the
    /// beginning of the transaction and held active by the actor in charge of running the
    /// queries, so traces happening in the individual queries could not be captured: they would
    /// not be aware of the transaction they belong to.
    ///
    /// Feeding the propagator a "fake" traceparent built from the transaction id gives every
    /// operation of the transaction a common trace id.
    fn into_trace_context(self) -> opentelemetry::Context {
        let extractor: HashMap<String, String> =
            HashMap::from([("traceparent".to_string(), self.as_traceparent())]);
        opentelemetry::global::get_text_map_propagator(|propagator| propagator.extract(&extractor))
    }

    /// Renders the transaction id as a `traceparent` value of the shape
    /// `00-<trace id>-0000000000000001-01`, using the same trace id as `into_trace_id`.
    fn as_traceparent(&self) -> String {
        // Goes through the string form directly, so the id does not have to be cloned just to
        // be consumed by `into_trace_id`.
        let trace_id = opentelemetry::trace::TraceId::from_bytes(trace_id_bytes(&self.to_string()));
        format!("00-{}-0000000000000001-01", trace_id)
    }
}

/// Picks the 16 bytes of a transaction id that make up its trace id.
///
/// Only the last 20 bytes of the id are looked at: the first 8 and the last 8 of them are kept,
/// and the 4 in between are skipped. An id shorter than 20 bytes is handled as if it were
/// left-padded with zero bytes instead of panicking on an index underflow; an empty id therefore
/// yields 16 zero bytes.
fn trace_id_bytes(tx_id: &str) -> [u8; 16] {
    let tx_id_bytes = tx_id.as_bytes();

    // Last 20 bytes of the id, right-aligned in a zeroed window.
    let mut window = [0u8; 20];
    let tail_len = tx_id_bytes.len().min(window.len());
    window[20 - tail_len..].copy_from_slice(&tx_id_bytes[tx_id_bytes.len() - tail_len..]);

    let mut buffer = [0u8; 16];
    // bytes [len-20 to len-12): least significant 4 bytes of the timestamp + 4 bytes counter
    buffer[..8].copy_from_slice(&window[..8]);
    // bytes [len-8 to len): the random block
    buffer[8..].copy_from_slice(&window[12..]);
    buffer
}

// Tests for the `TxTraceExt` conversions of `TxId`.
#[cfg(test)]
mod test {
    use super::*;
    use crate::TxId;

    /// Checks which parts of a transaction id end up in the derived trace id.
    #[test]
    fn test_txid_into_traceid() {
        // (transaction id, hex rendering of the expected trace id)
        let cases = [
            ("clct0q6ma0000rb04768tiqbj", "71366d6130303030373638746971626a"),
            // counter changed, trace id changed:
            ("clct0q6ma0002rb04cpa6zkmx", "71366d6130303032637061367a6b6d78"),
            // fingerprint changed, trace id did not change, as that chunk is ignored:
            ("clct0q6ma00020000cpa6zkmx", "71366d6130303032637061367a6b6d78"),
            // first 5 bytes changed, trace id did not change, as that chunk is ignored:
            ("00000q6ma00020000cpa6zkmx", "71366d6130303032637061367a6b6d78"),
            // 6th byte changed, trace id changed, as that byte belongs to the least
            // significant part of the timestamp:
            ("0000006ma00020000cpa6zkmx", "30366d6130303032637061367a6b6d78"),
        ];

        for &(raw_id, expected_hex) in cases.iter() {
            let tx_id: TxId = raw_id.into();
            let actual_hex = tx_id.into_trace_id().to_string();
            assert_eq!(actual_hex, expected_hex);
        }
    }
}
5 changes: 3 additions & 2 deletions query-engine/query-engine/src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use hyper::{header::CONTENT_TYPE, Body, HeaderMap, Method, Request, Response, Se
use opentelemetry::trace::TraceContextExt;
use opentelemetry::{global, propagation::Extractor};
use query_core::helpers::*;
use query_core::telemetry::capturing::TxTraceExt;
use query_core::{
schema::QuerySchemaRenderer, telemetry, ExtendedTransactionUserFacingError, TransactionOptions, TxId,
};
Expand Down Expand Up @@ -156,7 +157,7 @@ async fn graphql_handler(state: State, req: Request<Body>) -> Result<Response<Bo
if capture_settings.logs_enabled() && tx_id.is_some() {
let tx_id = tx_id.clone().unwrap();
traceparent = Some(tx_id.as_traceparent());
trace_id = tx_id.into();
trace_id = tx_id.into_trace_id();
} else {
// this is the root span, and we are in a single operation.
traceparent = Some(get_trace_parent_from_span(&span));
Expand Down Expand Up @@ -332,7 +333,7 @@ async fn transaction_start_handler(state: State, req: Request<Body>) -> Result<R
let capture_settings = capture_settings(&headers);
let traceparent = traceparent(&headers);
if traceparent.is_none() && capture_settings.logs_enabled() {
span.set_parent(tx_id.into())
span.set_parent(tx_id.into_trace_context())
} else {
span.set_parent(get_parent_span_context(&headers))
}
Expand Down

0 comments on commit 7905d7a

Please sign in to comment.