Skip to content

Commit

Permalink
Capture logs at the beginning of a transaction when an error happens
Browse files Browse the repository at this point in the history
Errors contain an extensions prop, with `logs` and `traces` keys when capturing is enabled.
  • Loading branch information
miguelff committed Jan 17, 2023
1 parent bd123ba commit 7048670
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 15 deletions.
24 changes: 24 additions & 0 deletions query-engine/core/src/interactive_transactions/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,27 @@ pub enum TransactionError {
#[error("Unexpected response: {reason}.")]
Unknown { reason: String },
}

#[derive(Debug, serde::Serialize, PartialEq)]
pub struct ExtendedTransactionUserFacingError {
#[serde(flatten)]
user_facing_error: user_facing_errors::Error,

#[serde(skip_serializing_if = "indexmap::IndexMap::is_empty")]
extensions: crate::Map,
}

impl ExtendedTransactionUserFacingError {
pub fn set_extension(&mut self, key: String, val: serde_json::Value) {
self.extensions.entry(key).or_insert(crate::Item::Json(val));
}
}

impl From<crate::CoreError> for ExtendedTransactionUserFacingError {
fn from(error: crate::CoreError) -> Self {
ExtendedTransactionUserFacingError {
user_facing_error: error.into(),
extensions: Default::default(),
}
}
}
48 changes: 33 additions & 15 deletions query-engine/query-engine/src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,10 @@ use crate::{opt::PrismaOpt, PrismaResult};
use hyper::service::{make_service_fn, service_fn};
use hyper::{header::CONTENT_TYPE, Body, HeaderMap, Method, Request, Response, Server, StatusCode};
use opentelemetry::trace::{SpanId, TraceContextExt, TraceId};
use opentelemetry::{global, propagation::Extractor, Context};
use query_core::{schema::QuerySchemaRenderer, TxId};
use query_core::{telemetry, TransactionOptions};
use opentelemetry::{global, propagation::Extractor};
use query_core::{
schema::QuerySchemaRenderer, telemetry, ExtendedTransactionUserFacingError, TransactionOptions, TxId,
};
use query_engine_metrics::MetricFormat;
use request_handlers::{dmmf, GraphQLSchemaRenderer, GraphQlHandler};
use serde_json::json;
Expand Down Expand Up @@ -261,12 +262,12 @@ async fn handle_transaction(state: State, req: Request<Body>) -> Result<Response
if path.contains("commit") {
match state.cx.executor.commit_tx(tx_id).await {
Ok(_) => Ok(succuss_resp),
Err(err) => Ok(err_to_http_resp(err)),
Err(err) => Ok(err_to_http_resp(err, None)),
}
} else if path.contains("rollback") {
match state.cx.executor.rollback_tx(tx_id).await {
Ok(_) => Ok(succuss_resp),
Err(err) => Ok(err_to_http_resp(err)),
Err(err) => Ok(err_to_http_resp(err, None)),
}
} else {
let res = Response::builder()
Expand All @@ -286,9 +287,9 @@ async fn transaction_start_handler(state: State, req: Request<Body>) -> Result<R
let mut tx_opts: TransactionOptions = serde_json::from_slice(full_body.as_ref()).unwrap();

let ServerExecutionContext {
tx_id,
tx_id: _,
span,
trace_id,
trace_id: _,
capture_config,
} = ServerExecutionContext::builder(&headers)
.with_span(info_span!(
Expand All @@ -299,13 +300,24 @@ async fn transaction_start_handler(state: State, req: Request<Body>) -> Result<R
.with_tx_id(tx_opts.with_predefined_transaction_id())
.build();

match state
if let telemetry::capturing::Capturer::Enabled(capturer) = capture_config.clone() {
capturer.start_capturing().await;
}

let result = state
.cx
.executor
.start_tx(state.cx.query_schema().clone(), &tx_opts)
.instrument(span)
.await
{
.await;

let telemetry = if let telemetry::capturing::Capturer::Enabled(capturer) = capture_config {
capturer.fetch_captures().await
} else {
None
};

match result {
Ok(tx_id) => {
let result = json!({ "id": tx_id.to_string() });
let result_bytes = serde_json::to_vec(&result).unwrap();
Expand All @@ -318,7 +330,7 @@ async fn transaction_start_handler(state: State, req: Request<Body>) -> Result<R

Ok(res)
}
Err(err) => Ok(err_to_http_resp(err)),
Err(err) => Ok(err_to_http_resp(err, telemetry)),
}
}

Expand Down Expand Up @@ -357,7 +369,10 @@ impl<'a> Extractor for HeaderExtractor<'a> {
}
}

fn err_to_http_resp(err: query_core::CoreError) -> Response<Body> {
fn err_to_http_resp(
err: query_core::CoreError,
captured_telemetry: Option<telemetry::capturing::storage::Storage>,
) -> Response<Body> {
let status = match err {
query_core::CoreError::TransactionError(ref err) => match err {
query_core::TransactionError::AcquisitionTimeout => 504,
Expand All @@ -371,9 +386,12 @@ fn err_to_http_resp(err: query_core::CoreError) -> Response<Body> {
_ => 500,
};

let user_error: user_facing_errors::Error = err.into();
let body = Body::from(serde_json::to_vec(&user_error).unwrap());

let mut err: ExtendedTransactionUserFacingError = err.into();
if let Some(telemetry) = captured_telemetry {
err.set_extension("traces".to_owned(), json!(telemetry.traces));
err.set_extension("logs".to_owned(), json!(telemetry.logs));
}
let body = Body::from(serde_json::to_vec(&err).unwrap());
Response::builder().status(status).body(body).unwrap()
}

Expand Down

0 comments on commit 7048670

Please sign in to comment.