Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Implement PDP/DataProxy tracing protocol in the binary engine #3505

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ validate:
cargo run --bin test-cli -- validate-datamodel dev_datamodel.prisma

qe:
cargo run --bin query-engine -- --enable-playground --enable-raw-queries --enable-metrics --enable-open-telemetry
cargo run --bin query-engine -- --enable-playground --enable-raw-queries --enable-metrics --enable-open-telemetry --enable-telemetry-in-response

qe-dmmf:
cargo run --bin query-engine -- cli dmmf > dmmf.json
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,13 @@ pub struct BinaryRunner {
#[async_trait::async_trait]
impl RunnerInterface for BinaryRunner {
async fn load(datamodel: String, connector_tag: ConnectorTag, metrics: MetricRegistry) -> TestResult<Self> {
let opts = PrismaOpt::from_list(&["binary", "--enable-raw-queries", "--datamodel", &datamodel]);
let opts = PrismaOpt::from_list(&[
"binary",
"--enable-raw-queries",
"--enable-telemetry-in-response",
"--datamodel",
&datamodel,
]);
let state = setup(&opts, false, Some(metrics)).await.unwrap();

let configuration = opts.configuration(true).unwrap();
Expand Down
6 changes: 3 additions & 3 deletions query-engine/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,11 @@ futures = "0.3"
indexmap = { version = "1.7", features = ["serde-1"] }
itertools = "0.10"
mongodb-connector = { path = "../connectors/mongodb-query-connector", package = "mongodb-query-connector", optional = true }
once_cell = "1.3"
once_cell = "1"
petgraph = "0.4"
prisma-models = { path = "../prisma-models" }
prisma-value = { path = "../../libs/prisma-value" }
opentelemetry = { version = "0.17"}
opentelemetry = { version = "0.17.0", features = ["rt-tokio", "serialize"] }
query-engine-metrics = {path = "../metrics"}
serde.workspace = true
serde_json.workspace = true
Expand All @@ -36,7 +36,7 @@ thiserror = "1.0"
tokio.workspace = true
tracing = { version = "0.1", features = ["attributes"] }
tracing-futures = "0.2"
tracing-subscriber = "0.3.11"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
tracing-opentelemetry = "0.17.4"
url = "2"
user-facing-errors = { path = "../../libs/user-facing-errors" }
Expand Down
29 changes: 29 additions & 0 deletions query-engine/core/src/interactive_transactions/error.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
use thiserror::Error;

use crate::{
response_ir::{Item, Map},
CoreError,
};

#[derive(Debug, Error, PartialEq)]
pub enum TransactionError {
#[error("Unable to start a transaction in the given time.")]
Expand All @@ -17,3 +22,27 @@ pub enum TransactionError {
#[error("Unexpected response: {reason}.")]
Unknown { reason: String },
}

#[derive(Debug, serde::Serialize, PartialEq)]
pub struct ExtendedTransactionUserFacingError {
#[serde(flatten)]
user_facing_error: user_facing_errors::Error,

#[serde(skip_serializing_if = "indexmap::IndexMap::is_empty")]
extensions: Map,
}

impl ExtendedTransactionUserFacingError {
pub fn set_extension(&mut self, key: String, val: serde_json::Value) {
self.extensions.entry(key).or_insert(Item::Json(val));
}
}

impl From<CoreError> for ExtendedTransactionUserFacingError {
fn from(error: CoreError) -> Self {
ExtendedTransactionUserFacingError {
user_facing_error: error.into(),
extensions: Default::default(),
}
}
}
86 changes: 85 additions & 1 deletion query-engine/core/src/interactive_transactions/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::CoreError;
use connector::{Connection, ConnectionLike, Transaction};
use std::fmt::Display;
use std::{collections::HashMap, fmt::Display};
use tokio::{
task::JoinHandle,
time::{Duration, Instant},
Expand Down Expand Up @@ -74,6 +74,63 @@ impl Display for TxId {
}
}

impl From<TxId> for opentelemetry::Context {
// This is a bit of a hack, but it's the only way to have a default trace span for a whole
// transaction when no traceparent is propagated from the client.
//
// This is done so we can capture traces happening accross the different queries in a
// transaction. Otherwise, if a traceparent is not propagated from the client, each query in
// the transaction will run within a span that has already been generated at the begining of the
// transaction, and held active in the actor in charge of running the queries. Thus, making
// impossible to capture traces happening in the individual queries, as they won't be aware of
// the transaction they are part of.
//
// By generating this "fake" traceparent based on the transaction id, we can have a common
// trace_id for all transaction operations.
fn from(id: TxId) -> Self {
let extractor: HashMap<String, String> =
HashMap::from_iter(vec![("traceparent".to_string(), id.as_traceparent())]);
opentelemetry::global::get_text_map_propagator(|propagator| propagator.extract(&extractor))
}
}

impl TxId {
pub fn as_traceparent(&self) -> String {
let trace_id = opentelemetry::trace::TraceId::from(self.clone());
format!("00-{}-0000000000000001-01", trace_id)
}
}

impl From<TxId> for opentelemetry::trace::TraceId {
// in order to convert a TxId (a 48 bytes cuid) into a TraceId (16 bytes), we remove the first byte,
// (always 'c') and get the next 16 bytes, which are random enough to be used as a trace id.
// this is a typical cuid: "c-lct0q6ma-0004-rb04-h6en1roa"
//
// - first letter is always the same
// - next 7-8 byte are random a timestamp. There's more entropy in the least significative bytes
// - next 4 bytes are a counter since the server started
// - next 4 bytes are a system fingerprint, invariant for the same server instance
// - least significative 8 bytes. Totally random.
//
// We want the most entropic slice of 16 bytes that's deterministicly determined
fn from(id: TxId) -> Self {
let mut buffer = [0; 16];
let tx_id_bytes = id.0.as_bytes();
let len = tx_id_bytes.len();

// bytes [len-20 to len-12): least significative 4 bytes of the timestamp + 4 bytes counter
for (i, source_idx) in (len - 20..len - 12).enumerate() {
buffer[i] = tx_id_bytes[source_idx];
}
// bytes [len-8 to len): the random blocks
for (i, source_idx) in (len - 8..len).enumerate() {
buffer[i + 8] = tx_id_bytes[source_idx];
}

opentelemetry::trace::TraceId::from_bytes(buffer)
}
}

pub enum CachedTx {
Open(OpenTx),
Committed,
Expand Down Expand Up @@ -150,3 +207,30 @@ pub enum ClosedTx {
RolledBack,
Expired { start_time: Instant, timeout: Duration },
}

// tests for txid into traits
#[cfg(test)]
mod test {
use super::*;

#[test]
fn test_txid_into_traceid() {
let fixture = vec![
("clct0q6ma0000rb04768tiqbj", "71366d6130303030373638746971626a"),
// counter changed, trace id changed:
("clct0q6ma0002rb04cpa6zkmx", "71366d6130303032637061367a6b6d78"),
// fingerprint changed, trace id did not change, as that chunk is ignored:
("clct0q6ma00020000cpa6zkmx", "71366d6130303032637061367a6b6d78"),
// first 5 bytes changed, trace id did not change, as that chunk is ignored:
("00000q6ma00020000cpa6zkmx", "71366d6130303032637061367a6b6d78"),
// 6 th byte changed, trace id changed, as that chunk is part of the lsb of the timestamp
("0000006ma00020000cpa6zkmx", "30366d6130303032637061367a6b6d78"),
];

for (txid, expected_trace_id) in fixture {
let txid = TxId(txid.to_string());
let trace_id: opentelemetry::trace::TraceId = txid.into();
assert_eq!(trace_id.to_string(), expected_trace_id);
}
}
}
2 changes: 1 addition & 1 deletion query-engine/core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ pub mod telemetry;
pub use self::{
error::{CoreError, FieldConversionError},
executor::{QueryExecutor, TransactionOptions},
interactive_transactions::{TransactionError, TxId},
interactive_transactions::{ExtendedTransactionUserFacingError, TransactionError, TxId},
query_document::*,
telemetry::*,
};
Expand Down
Loading