Skip to content

Commit

Permalink
Preparatory refactoring to introduce log capturing. These changes sho…
Browse files Browse the repository at this point in the history
…uld not change the existing behavior of the engine
  • Loading branch information
miguelff committed Jan 20, 2023
1 parent ef34c0d commit 4b6c971
Show file tree
Hide file tree
Showing 28 changed files with 720 additions and 601 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,12 @@ impl Actor {
pub async fn spawn() -> TestResult<Self> {
let (log_capture, log_tx) = TestLogCapture::new();
async fn with_logs<T>(fut: impl Future<Output = T>, log_tx: LogEmit) -> T {
fut.with_subscriber(test_tracing_subscriber(&ENV_LOG_LEVEL, setup_metrics(), log_tx))
.await
fut.with_subscriber(test_tracing_subscriber(
ENV_LOG_LEVEL.to_string(),
setup_metrics(),
log_tx,
))
.await
}

let (query_sender, mut query_receiver) = mpsc::channel(100);
Expand Down
12 changes: 10 additions & 2 deletions query-engine/connector-test-kit-rs/query-tests-setup/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,11 @@ fn run_relation_link_test_impl(

teardown_project(&datamodel, Default::default()).await.unwrap();
}
.with_subscriber(test_tracing_subscriber(&ENV_LOG_LEVEL, metrics_for_subscriber, log_tx)),
.with_subscriber(test_tracing_subscriber(
ENV_LOG_LEVEL.to_string(),
metrics_for_subscriber,
log_tx,
)),
);
}
}
Expand Down Expand Up @@ -282,7 +286,11 @@ pub fn run_connector_test_impl(

crate::teardown_project(&datamodel, db_schemas).await.unwrap();
}
.with_subscriber(test_tracing_subscriber(&ENV_LOG_LEVEL, metrics_for_subscriber, log_tx)),
.with_subscriber(test_tracing_subscriber(
ENV_LOG_LEVEL.to_string(),
metrics_for_subscriber,
log_tx,
)),
);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use query_core::telemetry::helpers as telemetry_helpers;
use query_engine_metrics::MetricRegistry;
use tracing_error::ErrorLayer;
use tracing_subscriber::{layer::Layered, prelude::*, EnvFilter, Layer, Registry};
use tracing_subscriber::{layer::Layered, prelude::*, Layer, Registry};

use crate::LogEmit;

Expand All @@ -27,8 +28,8 @@ type Sub = Layered<
>,
>;

pub fn test_tracing_subscriber(log_config: &str, metrics: MetricRegistry, log_tx: LogEmit) -> Sub {
let filter = create_env_filter(true, log_config);
pub fn test_tracing_subscriber(log_config: String, metrics: MetricRegistry, log_tx: LogEmit) -> Sub {
let filter = telemetry_helpers::env_filter(true, telemetry_helpers::QueryEngineLogLevel::Override(log_config));

let fmt_layer = tracing_subscriber::fmt::layer()
.with_writer(PrintWriter::new(log_tx))
Expand All @@ -40,28 +41,6 @@ pub fn test_tracing_subscriber(log_config: &str, metrics: MetricRegistry, log_tx
.with(ErrorLayer::default())
}

fn create_env_filter(log_queries: bool, qe_log_level: &str) -> EnvFilter {
let mut filter = EnvFilter::from_default_env()
.add_directive("tide=error".parse().unwrap())
.add_directive("tonic=error".parse().unwrap())
.add_directive("h2=error".parse().unwrap())
.add_directive("hyper=error".parse().unwrap())
.add_directive("tower=error".parse().unwrap());

filter = filter
.add_directive(format!("query_engine={}", &qe_log_level).parse().unwrap())
.add_directive(format!("query_core={}", &qe_log_level).parse().unwrap())
.add_directive(format!("query_connector={}", &qe_log_level).parse().unwrap())
.add_directive(format!("sql_query_connector={}", &qe_log_level).parse().unwrap())
.add_directive("mongodb_query_connector=debug".parse().unwrap());

if log_queries {
filter = filter.add_directive("quaint[{is_query}]=trace".parse().unwrap());
}

filter
}

/// This is a temporary implementation detail for `tracing` logs in tests.
/// Instead of going through `std::io::stderr`, it goes through the specific
/// local stderr handle used by `eprintln` and `dbg`, allowing logs to appear in
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
use crate::{ConnectorTag, RunnerInterface, TestError, TestResult, TxResult};
use query_core::{schema::QuerySchemaRef, TxId};
use query_engine::opt::PrismaOpt;
use query_engine::server::{routes, setup, State};
use query_engine::server::routes;
use query_engine::state::{setup, State};
use query_engine_metrics::MetricRegistry;
use request_handlers::{GQLBatchResponse, GQLError, GQLResponse, GraphQlBody, MultiQuery, PrismaResponse};

Expand All @@ -20,7 +21,7 @@ pub struct BinaryRunner {
impl RunnerInterface for BinaryRunner {
async fn load(datamodel: String, connector_tag: ConnectorTag, metrics: MetricRegistry) -> TestResult<Self> {
let opts = PrismaOpt::from_list(&["binary", "--enable-raw-queries", "--datamodel", &datamodel]);
let state = setup(&opts, metrics).await.unwrap();
let state = setup(&opts, false, Some(metrics)).await.unwrap();

let configuration = opts.configuration(true).unwrap();
let data_source = configuration.datasources.first().unwrap();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::{ConnectorTag, RunnerInterface, TestResult, TxResult};
use colored::Colorize;
use query_core::{executor, schema::QuerySchemaRef, schema_builder, QueryExecutor, TxId};
use query_core::{executor, schema::QuerySchemaRef, schema_builder, QueryExecutor, TransactionOptions, TxId};
use query_engine_metrics::MetricRegistry;
use request_handlers::{GraphQlBody, GraphQlHandler, MultiQuery};
use std::{env, sync::Arc};
Expand Down Expand Up @@ -82,15 +82,9 @@ impl RunnerInterface for DirectRunner {
valid_for_millis: u64,
isolation_level: Option<String>,
) -> TestResult<TxId> {
let id = self
.executor
.start_tx(
self.query_schema.clone(),
max_acquisition_millis,
valid_for_millis,
isolation_level,
)
.await?;
let tx_opts = TransactionOptions::new(max_acquisition_millis, valid_for_millis, isolation_level);

let id = self.executor.start_tx(self.query_schema.clone(), &tx_opts).await?;
Ok(id)
}

Expand Down
2 changes: 1 addition & 1 deletion query-engine/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ thiserror = "1.0"
tokio.workspace = true
tracing = { version = "0.1", features = ["attributes"] }
tracing-futures = "0.2"
tracing-subscriber = "0.3.11"
tracing-subscriber = {version = "0.3", features = ["env-filter"]}
tracing-opentelemetry = "0.17.4"
url = "2"
user-facing-errors = { path = "../../libs/user-facing-errors" }
Expand Down
23 changes: 11 additions & 12 deletions query-engine/core/src/executor/interpreting_executor.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use super::execute_operation::{execute_many_operations, execute_many_self_contained, execute_single_self_contained};
use crate::{
BatchDocumentTransaction, CoreError, OpenTx, Operation, QueryExecutor, ResponseData, TransactionActorManager,
TransactionError, TransactionManager, TxId,
TransactionError, TransactionManager, TransactionOptions, TxId,
};

use async_trait::async_trait;
Expand Down Expand Up @@ -143,37 +143,36 @@ impl<C> TransactionManager for InterpretingExecutor<C>
where
C: Connector + Send + Sync,
{
async fn start_tx(
&self,
query_schema: QuerySchemaRef,
max_acquisition_millis: u64,
valid_for_millis: u64,
isolation_level: Option<String>,
) -> crate::Result<TxId> {
async fn start_tx(&self, query_schema: QuerySchemaRef, tx_opts: &TransactionOptions) -> crate::Result<TxId> {
super::with_request_now(async move {
let id = TxId::default();
let id = if let Some(predefined_tx_id) = tx_opts.new_tx_id.clone() {
predefined_tx_id.into()
} else {
TxId::default()
};

trace!("[{}] Starting...", id);
let conn_span = info_span!(
"prisma:engine:connection",
user_facing = true,
"db.type" = self.connector.name()
);
let conn = time::timeout(
Duration::from_millis(max_acquisition_millis),
Duration::from_millis(tx_opts.max_acquisition_millis),
self.connector.get_connection(),
)
.instrument(conn_span)
.await;

let conn = conn.map_err(|_| TransactionError::AcquisitionTimeout)??;
let c_tx = OpenTx::start(conn, isolation_level).await?;
let c_tx = OpenTx::start(conn, tx_opts.isolation_level.clone()).await?;

self.itx_manager
.create_tx(
query_schema.clone(),
id.clone(),
c_tx,
Duration::from_millis(valid_for_millis),
Duration::from_millis(tx_opts.valid_for_millis),
)
.await;

Expand Down
50 changes: 41 additions & 9 deletions query-engine/core/src/executor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ mod pipeline;

pub use execute_operation::*;
pub use loader::*;
use serde::Deserialize;

use crate::{
query_document::Operation, response_ir::ResponseData, schema::QuerySchemaRef, BatchDocumentTransaction, TxId,
Expand Down Expand Up @@ -53,20 +54,51 @@ pub trait QueryExecutor: TransactionManager {
fn primary_connector(&self) -> &(dyn Connector + Send + Sync);
}

#[derive(Debug, Deserialize)]
pub struct TransactionOptions {
/// Maximum wait time for tx acquisition in milliseconds.
#[serde(rename(deserialize = "max_wait"))]
pub max_acquisition_millis: u64,

/// Time in milliseconds after which the transaction rolls back automatically.
#[serde(rename(deserialize = "timeout"))]
pub valid_for_millis: u64,

/// Isolation level to use for the transaction.
pub isolation_level: Option<String>,

/// An optional pre-defined transaction id. Some value might be provided in case we want to generate
/// a new id at the beginning of the transaction
#[serde(default)]
pub new_tx_id: Option<String>,
}

impl TransactionOptions {
pub fn new(max_acquisition_millis: u64, valid_for_millis: u64, isolation_level: Option<String>) -> Self {
Self {
max_acquisition_millis,
valid_for_millis,
isolation_level,
new_tx_id: None,
}
}

/// Generates a new transaction id before the transaction is started and returns a modified version
/// of self with the new predefined_id set.
pub fn with_new_transaction_id(&mut self) -> TxId {
let tx_id: TxId = Default::default();
self.new_tx_id = Some(tx_id.to_string());
tx_id
}
}
#[async_trait]
pub trait TransactionManager {
/// Starts a new transaction.
/// Returns ID of newly opened transaction.
/// Expected to throw an error if no transaction could be opened for `max_acquisition_millis` milliseconds.
/// The new transaction must only live for `valid_for_millis` milliseconds before it automatically rolls back.
/// Expected to throw an error if no transaction could be opened for `opts.max_acquisition_millis` milliseconds.
/// The new transaction must only live for `opts.valid_for_millis` milliseconds before it automatically rolls back.
/// This rollback mechanism is an implementation detail of the trait implementer.
async fn start_tx(
&self,
query_schema: QuerySchemaRef,
max_acquisition_millis: u64,
valid_for_millis: u64,
isolation_level: Option<String>,
) -> crate::Result<TxId>;
async fn start_tx(&self, query_schema: QuerySchemaRef, opts: &TransactionOptions) -> crate::Result<TxId>;

/// Commits a transaction.
async fn commit_tx(&self, tx_id: TxId) -> crate::Result<()>;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,22 +114,22 @@ impl TransactionActorManager {
&self,
tx_id: &TxId,
operation: Operation,
trace_id: Option<String>,
traceparent: Option<String>,
) -> crate::Result<ResponseData> {
let client = self.get_client(tx_id, "query").await?;

client.execute(operation, trace_id).await
client.execute(operation, traceparent).await
}

pub async fn batch_execute(
&self,
tx_id: &TxId,
operations: Vec<Operation>,
trace_id: Option<String>,
traceparent: Option<String>,
) -> crate::Result<Vec<crate::Result<ResponseData>>> {
let client = self.get_client(tx_id, "batch query").await?;

client.batch_execute(operations, trace_id).await
client.batch_execute(operations, traceparent).await
}

pub async fn commit_tx(&self, tx_id: &TxId) -> crate::Result<()> {
Expand Down
34 changes: 19 additions & 15 deletions query-engine/core/src/interactive_transactions/actors.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use super::{CachedTx, TransactionError, TxOpRequest, TxOpRequestMsg, TxOpResponse};
use crate::{
execute_many_operations, execute_single_operation, set_span_link_from_trace_id, ClosedTx, OpenTx, Operation,
ResponseData, TxId,
execute_many_operations, execute_single_operation, telemetry::helpers::set_span_link_from_traceparent, ClosedTx,
OpenTx, Operation, ResponseData, TxId,
};
use schema::QuerySchemaRef;
use std::{collections::HashMap, sync::Arc};
Expand Down Expand Up @@ -51,13 +51,13 @@ impl ITXServer {
// RunState is used to tell if the run loop should continue
async fn process_msg(&mut self, op: TxOpRequest) -> RunState {
match op.msg {
TxOpRequestMsg::Single(ref operation, trace_id) => {
let result = self.execute_single(&operation, trace_id).await;
TxOpRequestMsg::Single(ref operation, traceparent) => {
let result = self.execute_single(&operation, traceparent).await;
let _ = op.respond_to.send(TxOpResponse::Single(result));
RunState::Continue
}
TxOpRequestMsg::Batch(ref operations, trace_id) => {
let result = self.execute_batch(&operations, trace_id).await;
TxOpRequestMsg::Batch(ref operations, traceparent) => {
let result = self.execute_batch(&operations, traceparent).await;
let _ = op.respond_to.send(TxOpResponse::Batch(result));
RunState::Continue
}
Expand All @@ -74,16 +74,20 @@ impl ITXServer {
}
}

async fn execute_single(&mut self, operation: &Operation, trace_id: Option<String>) -> crate::Result<ResponseData> {
async fn execute_single(
&mut self,
operation: &Operation,
traceparent: Option<String>,
) -> crate::Result<ResponseData> {
let span = info_span!("prisma:engine:itx_query_builder", user_facing = true);
set_span_link_from_trace_id(&span, trace_id.clone());
set_span_link_from_traceparent(&span, traceparent.clone());

let conn = self.cached_tx.as_open()?;
execute_single_operation(
self.query_schema.clone(),
conn.as_connection_like(),
operation,
trace_id,
traceparent,
)
.instrument(span)
.await
Expand All @@ -92,7 +96,7 @@ impl ITXServer {
async fn execute_batch(
&mut self,
operations: &[Operation],
trace_id: Option<String>,
traceparent: Option<String>,
) -> crate::Result<Vec<crate::Result<ResponseData>>> {
let span = info_span!("prisma:engine:itx_execute", user_facing = true);

Expand All @@ -101,7 +105,7 @@ impl ITXServer {
self.query_schema.clone(),
conn.as_connection_like(),
operations,
trace_id,
traceparent,
)
.instrument(span)
.await
Expand Down Expand Up @@ -168,8 +172,8 @@ impl ITXClient {
}
}

pub async fn execute(&self, operation: Operation, trace_id: Option<String>) -> crate::Result<ResponseData> {
let msg_req = TxOpRequestMsg::Single(operation, trace_id);
pub async fn execute(&self, operation: Operation, traceparent: Option<String>) -> crate::Result<ResponseData> {
let msg_req = TxOpRequestMsg::Single(operation, traceparent);
let msg = self.send_and_receive(msg_req).await?;

if let TxOpResponse::Single(resp) = msg {
Expand All @@ -182,9 +186,9 @@ impl ITXClient {
pub async fn batch_execute(
&self,
operations: Vec<Operation>,
trace_id: Option<String>,
traceparent: Option<String>,
) -> crate::Result<Vec<crate::Result<ResponseData>>> {
let msg_req = TxOpRequestMsg::Batch(operations, trace_id);
let msg_req = TxOpRequestMsg::Batch(operations, traceparent);

let msg = self.send_and_receive(msg_req).await?;

Expand Down
Loading

0 comments on commit 4b6c971

Please sign in to comment.