Skip to content

Commit

Permalink
Implement log/tracing capturing
Browse files Browse the repository at this point in the history
  • Loading branch information
miguelff committed Jan 24, 2023
1 parent d4977fb commit 9a3c4f5
Show file tree
Hide file tree
Showing 20 changed files with 1,135 additions and 36 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ validate:
cargo run --bin test-cli -- validate-datamodel dev_datamodel.prisma

qe:
cargo run --bin query-engine -- --enable-playground --enable-raw-queries --enable-metrics --enable-open-telemetry
cargo run --bin query-engine -- --enable-playground --enable-raw-queries --enable-metrics --enable-open-telemetry --enable-telemetry-in-response

qe-dmmf:
cargo run --bin query-engine -- cli dmmf > dmmf.json
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,13 @@ pub struct BinaryRunner {
#[async_trait::async_trait]
impl RunnerInterface for BinaryRunner {
async fn load(datamodel: String, connector_tag: ConnectorTag, metrics: MetricRegistry) -> TestResult<Self> {
let opts = PrismaOpt::from_list(&["binary", "--enable-raw-queries", "--datamodel", &datamodel]);
let opts = PrismaOpt::from_list(&[
"binary",
"--enable-raw-queries",
"--enable-telemetry-in-response",
"--datamodel",
&datamodel,
]);
let state = setup(&opts, false, Some(metrics)).await.unwrap();

let configuration = opts.configuration(true).unwrap();
Expand Down
6 changes: 3 additions & 3 deletions query-engine/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,11 @@ futures = "0.3"
indexmap = { version = "1.7", features = ["serde-1"] }
itertools = "0.10"
mongodb-connector = { path = "../connectors/mongodb-query-connector", package = "mongodb-query-connector", optional = true }
once_cell = "1.3"
once_cell = "1"
petgraph = "0.4"
prisma-models = { path = "../prisma-models" }
prisma-value = { path = "../../libs/prisma-value" }
opentelemetry = { version = "0.17"}
opentelemetry = { version = "0.17.0", features = ["rt-tokio", "serialize"] }
query-engine-metrics = {path = "../metrics"}
serde.workspace = true
serde_json.workspace = true
Expand All @@ -36,7 +36,7 @@ thiserror = "1.0"
tokio.workspace = true
tracing = { version = "0.1", features = ["attributes"] }
tracing-futures = "0.2"
tracing-subscriber = {version = "0.3", features = ["env-filter"]}
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
tracing-opentelemetry = "0.17.4"
url = "2"
user-facing-errors = { path = "../../libs/user-facing-errors" }
Expand Down
29 changes: 29 additions & 0 deletions query-engine/core/src/interactive_transactions/error.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
use thiserror::Error;

use crate::{
response_ir::{Item, Map},
CoreError,
};

#[derive(Debug, Error, PartialEq)]
pub enum TransactionError {
#[error("Unable to start a transaction in the given time.")]
Expand All @@ -17,3 +22,27 @@ pub enum TransactionError {
#[error("Unexpected response: {reason}.")]
Unknown { reason: String },
}

#[derive(Debug, serde::Serialize, PartialEq)]
pub struct ExtendedTransactionUserFacingError {
#[serde(flatten)]
user_facing_error: user_facing_errors::Error,

#[serde(skip_serializing_if = "indexmap::IndexMap::is_empty")]
extensions: Map,
}

impl ExtendedTransactionUserFacingError {
pub fn set_extension(&mut self, key: String, val: serde_json::Value) {
self.extensions.entry(key).or_insert(Item::Json(val));
}
}

impl From<CoreError> for ExtendedTransactionUserFacingError {
fn from(error: CoreError) -> Self {
ExtendedTransactionUserFacingError {
user_facing_error: error.into(),
extensions: Default::default(),
}
}
}
86 changes: 85 additions & 1 deletion query-engine/core/src/interactive_transactions/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::CoreError;
use connector::{Connection, ConnectionLike, Transaction};
use std::fmt::Display;
use std::{collections::HashMap, fmt::Display};
use tokio::{
task::JoinHandle,
time::{Duration, Instant},
Expand Down Expand Up @@ -76,6 +76,63 @@ impl Display for TxId {
}
}

impl From<TxId> for opentelemetry::Context {
// This is a bit of a hack, but it's the only way to have a default trace span for a whole
// transaction when no traceparent is propagated from the client.
//
// This is done so we can capture traces happening accross the different queries in a
// transaction. Otherwise, if a traceparent is not propagated from the client, each query in
// the transaction will run within a span that has already been generated at the begining of the
// transaction, and held active in the actor in charge of running the queries. Thus, making
// impossible to capture traces happening in the individual queries, as they won't be aware of
// the transaction they are part of.
//
// By generating this "fake" traceparent based on the transaction id, we can have a common
// trace_id for all transaction operations.
fn from(id: TxId) -> Self {
let extractor: HashMap<String, String> =
HashMap::from_iter(vec![("traceparent".to_string(), id.as_traceparent())]);
opentelemetry::global::get_text_map_propagator(|propagator| propagator.extract(&extractor))
}
}

impl TxId {
pub fn as_traceparent(&self) -> String {
let trace_id = opentelemetry::trace::TraceId::from(self.clone());
format!("00-{}-0000000000000001-01", trace_id)
}
}

impl From<TxId> for opentelemetry::trace::TraceId {
// in order to convert a TxId (a 48 bytes cuid) into a TraceId (16 bytes), we remove the first byte,
// (always 'c') and get the next 16 bytes, which are random enough to be used as a trace id.
// this is a typical cuid: "c-lct0q6ma-0004-rb04-h6en1roa"
//
// - first letter is always the same
// - next 7-8 byte are random a timestamp. There's more entropy in the least significative bytes
// - next 4 bytes are a counter since the server started
// - next 4 bytes are a system fingerprint, invariant for the same server instance
// - least significative 8 bytes. Totally random.
//
// We want the most entropic slice of 16 bytes that's deterministicly determined
fn from(id: TxId) -> Self {
let mut buffer = [0; 16];
let tx_id_bytes = id.0.as_bytes();
let len = tx_id_bytes.len();

// bytes [len-20 to len-12): least significative 4 bytes of the timestamp + 4 bytes counter
for (i, source_idx) in (len - 20..len - 12).enumerate() {
buffer[i] = tx_id_bytes[source_idx];
}
// bytes [len-8 to len): the random blocks
for (i, source_idx) in (len - 8..len).enumerate() {
buffer[i + 8] = tx_id_bytes[source_idx];
}

opentelemetry::trace::TraceId::from_bytes(buffer)
}
}

pub enum CachedTx {
Open(OpenTx),
Committed,
Expand Down Expand Up @@ -152,3 +209,30 @@ pub enum ClosedTx {
RolledBack,
Expired { start_time: Instant, timeout: Duration },
}

// tests for txid into traits
#[cfg(test)]
mod test {
use super::*;

#[test]
fn test_txid_into_traceid() {
let fixture = vec![
("clct0q6ma0000rb04768tiqbj", "71366d6130303030373638746971626a"),
// counter changed, trace id changed:
("clct0q6ma0002rb04cpa6zkmx", "71366d6130303032637061367a6b6d78"),
// fingerprint changed, trace id did not change, as that chunk is ignored:
("clct0q6ma00020000cpa6zkmx", "71366d6130303032637061367a6b6d78"),
// first 5 bytes changed, trace id did not change, as that chunk is ignored:
("00000q6ma00020000cpa6zkmx", "71366d6130303032637061367a6b6d78"),
// 6 th byte changed, trace id changed, as that chunk is part of the lsb of the timestamp
("0000006ma00020000cpa6zkmx", "30366d6130303032637061367a6b6d78"),
];

for (txid, expected_trace_id) in fixture {
let txid = TxId(txid.to_string());
let trace_id: opentelemetry::trace::TraceId = txid.into();
assert_eq!(trace_id.to_string(), expected_trace_id);
}
}
}
2 changes: 1 addition & 1 deletion query-engine/core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ pub mod telemetry;
pub use self::{
error::{CoreError, FieldConversionError},
executor::{QueryExecutor, TransactionOptions},
interactive_transactions::{TransactionError, TxId},
interactive_transactions::{ExtendedTransactionUserFacingError, TransactionError, TxId},
query_document::*,
telemetry::*,
};
Expand Down
Loading

0 comments on commit 9a3c4f5

Please sign in to comment.