Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: (qe) Preparatory refactoring to introduce log capturing. #3617

Merged
merged 5 commits into from
Jan 24, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,12 @@ impl Actor {
pub async fn spawn() -> TestResult<Self> {
let (log_capture, log_tx) = TestLogCapture::new();
async fn with_logs<T>(fut: impl Future<Output = T>, log_tx: LogEmit) -> T {
fut.with_subscriber(test_tracing_subscriber(&ENV_LOG_LEVEL, setup_metrics(), log_tx))
.await
fut.with_subscriber(test_tracing_subscriber(
ENV_LOG_LEVEL.to_string(),
setup_metrics(),
log_tx,
))
.await
}

let (query_sender, mut query_receiver) = mpsc::channel(100);
Expand Down
12 changes: 10 additions & 2 deletions query-engine/connector-test-kit-rs/query-tests-setup/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,11 @@ fn run_relation_link_test_impl(

teardown_project(&datamodel, Default::default()).await.unwrap();
}
.with_subscriber(test_tracing_subscriber(&ENV_LOG_LEVEL, metrics_for_subscriber, log_tx)),
.with_subscriber(test_tracing_subscriber(
ENV_LOG_LEVEL.to_string(),
metrics_for_subscriber,
log_tx,
)),
);
}
}
Expand Down Expand Up @@ -282,7 +286,11 @@ pub fn run_connector_test_impl(

crate::teardown_project(&datamodel, db_schemas).await.unwrap();
}
.with_subscriber(test_tracing_subscriber(&ENV_LOG_LEVEL, metrics_for_subscriber, log_tx)),
.with_subscriber(test_tracing_subscriber(
ENV_LOG_LEVEL.to_string(),
metrics_for_subscriber,
log_tx,
)),
);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use query_core::telemetry::helpers as telemetry_helpers;
use query_engine_metrics::MetricRegistry;
use tracing_error::ErrorLayer;
use tracing_subscriber::{layer::Layered, prelude::*, EnvFilter, Layer, Registry};
use tracing_subscriber::{layer::Layered, prelude::*, Layer, Registry};

use crate::LogEmit;

Expand All @@ -27,8 +28,8 @@ type Sub = Layered<
>,
>;

pub fn test_tracing_subscriber(log_config: &str, metrics: MetricRegistry, log_tx: LogEmit) -> Sub {
let filter = create_env_filter(true, log_config);
pub fn test_tracing_subscriber(log_config: String, metrics: MetricRegistry, log_tx: LogEmit) -> Sub {
let filter = telemetry_helpers::env_filter(true, telemetry_helpers::QueryEngineLogLevel::Override(log_config));

let fmt_layer = tracing_subscriber::fmt::layer()
.with_writer(PrintWriter::new(log_tx))
Expand All @@ -40,28 +41,6 @@ pub fn test_tracing_subscriber(log_config: &str, metrics: MetricRegistry, log_tx
.with(ErrorLayer::default())
}

fn create_env_filter(log_queries: bool, qe_log_level: &str) -> EnvFilter {
let mut filter = EnvFilter::from_default_env()
.add_directive("tide=error".parse().unwrap())
.add_directive("tonic=error".parse().unwrap())
.add_directive("h2=error".parse().unwrap())
.add_directive("hyper=error".parse().unwrap())
.add_directive("tower=error".parse().unwrap());

filter = filter
.add_directive(format!("query_engine={}", &qe_log_level).parse().unwrap())
.add_directive(format!("query_core={}", &qe_log_level).parse().unwrap())
.add_directive(format!("query_connector={}", &qe_log_level).parse().unwrap())
.add_directive(format!("sql_query_connector={}", &qe_log_level).parse().unwrap())
.add_directive("mongodb_query_connector=debug".parse().unwrap());

if log_queries {
filter = filter.add_directive("quaint[{is_query}]=trace".parse().unwrap());
}

filter
}

/// This is a temporary implementation detail for `tracing` logs in tests.
/// Instead of going through `std::io::stderr`, it goes through the specific
/// local stderr handle used by `eprintln` and `dbg`, allowing logs to appear in
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
use crate::{ConnectorTag, RunnerInterface, TestError, TestResult, TxResult};
use query_core::{schema::QuerySchemaRef, TxId};
use query_engine::opt::PrismaOpt;
use query_engine::server::{routes, setup, State};
use query_engine::server::routes;
use query_engine::state::{setup, State};
use query_engine_metrics::MetricRegistry;
use request_handlers::{GQLBatchResponse, GQLError, GQLResponse, GraphQlBody, MultiQuery, PrismaResponse};

Expand All @@ -20,7 +21,7 @@ pub struct BinaryRunner {
impl RunnerInterface for BinaryRunner {
async fn load(datamodel: String, connector_tag: ConnectorTag, metrics: MetricRegistry) -> TestResult<Self> {
let opts = PrismaOpt::from_list(&["binary", "--enable-raw-queries", "--datamodel", &datamodel]);
let state = setup(&opts, metrics).await.unwrap();
let state = setup(&opts, false, Some(metrics)).await.unwrap();

let configuration = opts.configuration(true).unwrap();
let data_source = configuration.datasources.first().unwrap();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::{ConnectorTag, RunnerInterface, TestResult, TxResult};
use colored::Colorize;
use query_core::{executor, schema::QuerySchemaRef, schema_builder, QueryExecutor, TxId};
use query_core::{executor, schema::QuerySchemaRef, schema_builder, QueryExecutor, TransactionOptions, TxId};
use query_engine_metrics::MetricRegistry;
use request_handlers::{GraphQlBody, GraphQlHandler, MultiQuery};
use std::{env, sync::Arc};
Expand Down Expand Up @@ -82,15 +82,9 @@ impl RunnerInterface for DirectRunner {
valid_for_millis: u64,
isolation_level: Option<String>,
) -> TestResult<TxId> {
let id = self
.executor
.start_tx(
self.query_schema.clone(),
max_acquisition_millis,
valid_for_millis,
isolation_level,
)
.await?;
let tx_opts = TransactionOptions::new(max_acquisition_millis, valid_for_millis, isolation_level);

let id = self.executor.start_tx(self.query_schema.clone(), tx_opts).await?;
Ok(id)
}

Expand Down
2 changes: 1 addition & 1 deletion query-engine/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ thiserror = "1.0"
tokio.workspace = true
tracing = { version = "0.1", features = ["attributes"] }
tracing-futures = "0.2"
tracing-subscriber = "0.3.11"
tracing-subscriber = {version = "0.3", features = ["env-filter"]}
tracing-opentelemetry = "0.17.4"
url = "2"
user-facing-errors = { path = "../../libs/user-facing-errors" }
Expand Down
17 changes: 7 additions & 10 deletions query-engine/core/src/executor/interpreting_executor.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use super::execute_operation::{execute_many_operations, execute_many_self_contained, execute_single_self_contained};
use crate::{
BatchDocumentTransaction, CoreError, OpenTx, Operation, QueryExecutor, ResponseData, TransactionActorManager,
TransactionError, TransactionManager, TxId,
TransactionError, TransactionManager, TransactionOptions, TxId,
};

use async_trait::async_trait;
Expand Down Expand Up @@ -143,23 +143,20 @@ impl<C> TransactionManager for InterpretingExecutor<C>
where
C: Connector + Send + Sync,
{
async fn start_tx(
&self,
query_schema: QuerySchemaRef,
max_acquisition_millis: u64,
valid_for_millis: u64,
isolation_level: Option<String>,
) -> crate::Result<TxId> {
async fn start_tx(&self, query_schema: QuerySchemaRef, tx_opts: TransactionOptions) -> crate::Result<TxId> {
super::with_request_now(async move {
let id = TxId::default();
let isolation_level = tx_opts.isolation_level;
let valid_for_millis = tx_opts.valid_for_millis;
let id = tx_opts.new_tx_id.unwrap_or_default();

trace!("[{}] Starting...", id);
let conn_span = info_span!(
"prisma:engine:connection",
user_facing = true,
"db.type" = self.connector.name()
);
let conn = time::timeout(
Duration::from_millis(max_acquisition_millis),
Duration::from_millis(tx_opts.max_acquisition_millis),
self.connector.get_connection(),
)
.instrument(conn_span)
Expand Down
50 changes: 41 additions & 9 deletions query-engine/core/src/executor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ mod pipeline;

pub use execute_operation::*;
pub use loader::*;
use serde::Deserialize;

use crate::{
query_document::Operation, response_ir::ResponseData, schema::QuerySchemaRef, BatchDocumentTransaction, TxId,
Expand Down Expand Up @@ -53,20 +54,51 @@ pub trait QueryExecutor: TransactionManager {
fn primary_connector(&self) -> &(dyn Connector + Send + Sync);
}

#[derive(Debug, Deserialize)]
pub struct TransactionOptions {
/// Maximum wait time for tx acquisition in milliseconds.
#[serde(rename(deserialize = "max_wait"))]
pub max_acquisition_millis: u64,

/// Time in milliseconds after which the transaction rolls back automatically.
#[serde(rename(deserialize = "timeout"))]
pub valid_for_millis: u64,

/// Isolation level to use for the transaction.
pub isolation_level: Option<String>,

/// An optional pre-defined transaction id. Some value might be provided in case we want to generate
/// a new id at the beginning of the transaction
#[serde(skip_deserializing)]
pub new_tx_id: Option<TxId>,
}

impl TransactionOptions {
pub fn new(max_acquisition_millis: u64, valid_for_millis: u64, isolation_level: Option<String>) -> Self {
Self {
max_acquisition_millis,
valid_for_millis,
isolation_level,
new_tx_id: None,
}
}

/// Generates a new transaction id before the transaction is started and returns a modified version
/// of self with the new predefined_id set.
pub fn with_new_transaction_id(&mut self) -> TxId {
let tx_id: TxId = Default::default();
self.new_tx_id = Some(tx_id.clone());
tx_id
}
}
#[async_trait]
pub trait TransactionManager {
/// Starts a new transaction.
/// Returns ID of newly opened transaction.
/// Expected to throw an error if no transaction could be opened for `max_acquisition_millis` milliseconds.
/// The new transaction must only live for `valid_for_millis` milliseconds before it automatically rolls back.
/// Expected to throw an error if no transaction could be opened for `opts.max_acquisition_millis` milliseconds.
/// The new transaction must only live for `opts.valid_for_millis` milliseconds before it automatically rolls back.
/// This rollback mechanism is an implementation detail of the trait implementer.
async fn start_tx(
&self,
query_schema: QuerySchemaRef,
max_acquisition_millis: u64,
valid_for_millis: u64,
isolation_level: Option<String>,
) -> crate::Result<TxId>;
async fn start_tx(&self, query_schema: QuerySchemaRef, opts: TransactionOptions) -> crate::Result<TxId>;

/// Commits a transaction.
async fn commit_tx(&self, tx_id: TxId) -> crate::Result<()>;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,22 +114,22 @@ impl TransactionActorManager {
&self,
tx_id: &TxId,
operation: Operation,
trace_id: Option<String>,
traceparent: Option<String>,
) -> crate::Result<ResponseData> {
let client = self.get_client(tx_id, "query").await?;

client.execute(operation, trace_id).await
client.execute(operation, traceparent).await
}

pub async fn batch_execute(
&self,
tx_id: &TxId,
operations: Vec<Operation>,
trace_id: Option<String>,
traceparent: Option<String>,
) -> crate::Result<Vec<crate::Result<ResponseData>>> {
let client = self.get_client(tx_id, "batch query").await?;

client.batch_execute(operations, trace_id).await
client.batch_execute(operations, traceparent).await
}

pub async fn commit_tx(&self, tx_id: &TxId) -> crate::Result<()> {
Expand Down
34 changes: 19 additions & 15 deletions query-engine/core/src/interactive_transactions/actors.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use super::{CachedTx, TransactionError, TxOpRequest, TxOpRequestMsg, TxOpResponse};
use crate::{
execute_many_operations, execute_single_operation, set_span_link_from_trace_id, ClosedTx, OpenTx, Operation,
ResponseData, TxId,
execute_many_operations, execute_single_operation, telemetry::helpers::set_span_link_from_traceparent, ClosedTx,
OpenTx, Operation, ResponseData, TxId,
};
use schema::QuerySchemaRef;
use std::{collections::HashMap, sync::Arc};
Expand Down Expand Up @@ -51,13 +51,13 @@ impl ITXServer {
// RunState is used to tell if the run loop should continue
async fn process_msg(&mut self, op: TxOpRequest) -> RunState {
match op.msg {
TxOpRequestMsg::Single(ref operation, trace_id) => {
let result = self.execute_single(&operation, trace_id).await;
TxOpRequestMsg::Single(ref operation, traceparent) => {
let result = self.execute_single(&operation, traceparent).await;
let _ = op.respond_to.send(TxOpResponse::Single(result));
RunState::Continue
}
TxOpRequestMsg::Batch(ref operations, trace_id) => {
let result = self.execute_batch(&operations, trace_id).await;
TxOpRequestMsg::Batch(ref operations, traceparent) => {
let result = self.execute_batch(&operations, traceparent).await;
let _ = op.respond_to.send(TxOpResponse::Batch(result));
RunState::Continue
}
Expand All @@ -74,16 +74,20 @@ impl ITXServer {
}
}

async fn execute_single(&mut self, operation: &Operation, trace_id: Option<String>) -> crate::Result<ResponseData> {
async fn execute_single(
&mut self,
operation: &Operation,
traceparent: Option<String>,
) -> crate::Result<ResponseData> {
let span = info_span!("prisma:engine:itx_query_builder", user_facing = true);
set_span_link_from_trace_id(&span, trace_id.clone());
set_span_link_from_traceparent(&span, traceparent.clone());

let conn = self.cached_tx.as_open()?;
execute_single_operation(
self.query_schema.clone(),
conn.as_connection_like(),
operation,
trace_id,
traceparent,
)
.instrument(span)
.await
Expand All @@ -92,7 +96,7 @@ impl ITXServer {
async fn execute_batch(
&mut self,
operations: &[Operation],
trace_id: Option<String>,
traceparent: Option<String>,
) -> crate::Result<Vec<crate::Result<ResponseData>>> {
let span = info_span!("prisma:engine:itx_execute", user_facing = true);

Expand All @@ -101,7 +105,7 @@ impl ITXServer {
self.query_schema.clone(),
conn.as_connection_like(),
operations,
trace_id,
traceparent,
)
.instrument(span)
.await
Expand Down Expand Up @@ -168,8 +172,8 @@ impl ITXClient {
}
}

pub async fn execute(&self, operation: Operation, trace_id: Option<String>) -> crate::Result<ResponseData> {
let msg_req = TxOpRequestMsg::Single(operation, trace_id);
pub async fn execute(&self, operation: Operation, traceparent: Option<String>) -> crate::Result<ResponseData> {
let msg_req = TxOpRequestMsg::Single(operation, traceparent);
let msg = self.send_and_receive(msg_req).await?;

if let TxOpResponse::Single(resp) = msg {
Expand All @@ -182,9 +186,9 @@ impl ITXClient {
pub async fn batch_execute(
&self,
operations: Vec<Operation>,
trace_id: Option<String>,
traceparent: Option<String>,
) -> crate::Result<Vec<crate::Result<ResponseData>>> {
let msg_req = TxOpRequestMsg::Batch(operations, trace_id);
let msg_req = TxOpRequestMsg::Batch(operations, traceparent);

let msg = self.send_and_receive(msg_req).await?;

Expand Down
Loading