Skip to content

Commit

Permalink
Address review feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
miguelff committed Jan 24, 2023
1 parent 51d48d0 commit dbd7ac1
Show file tree
Hide file tree
Showing 10 changed files with 26 additions and 48 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ impl RunnerInterface for DirectRunner {
) -> TestResult<TxId> {
let tx_opts = TransactionOptions::new(max_acquisition_millis, valid_for_millis, isolation_level);

let id = self.executor.start_tx(self.query_schema.clone(), &tx_opts).await?;
let id = self.executor.start_tx(self.query_schema.clone(), tx_opts).await?;
Ok(id)
}

Expand Down
14 changes: 6 additions & 8 deletions query-engine/core/src/executor/interpreting_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,13 +143,11 @@ impl<C> TransactionManager for InterpretingExecutor<C>
where
C: Connector + Send + Sync,
{
async fn start_tx(&self, query_schema: QuerySchemaRef, tx_opts: &TransactionOptions) -> crate::Result<TxId> {
async fn start_tx(&self, query_schema: QuerySchemaRef, tx_opts: TransactionOptions) -> crate::Result<TxId> {
super::with_request_now(async move {
let id = if let Some(predefined_tx_id) = tx_opts.new_tx_id.clone() {
predefined_tx_id.into()
} else {
TxId::default()
};
let isolation_level = tx_opts.isolation_level;
let valid_for_millis = tx_opts.valid_for_millis;
let id = tx_opts.new_tx_id.unwrap_or_default();

trace!("[{}] Starting...", id);
let conn_span = info_span!(
Expand All @@ -165,14 +163,14 @@ where
.await;

let conn = conn.map_err(|_| TransactionError::AcquisitionTimeout)??;
let c_tx = OpenTx::start(conn, tx_opts.isolation_level.clone()).await?;
let c_tx = OpenTx::start(conn, isolation_level).await?;

self.itx_manager
.create_tx(
query_schema.clone(),
id.clone(),
c_tx,
Duration::from_millis(tx_opts.valid_for_millis),
Duration::from_millis(valid_for_millis),
)
.await;

Expand Down
8 changes: 4 additions & 4 deletions query-engine/core/src/executor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,8 @@ pub struct TransactionOptions {

/// An optional pre-defined transaction id. Some value might be provided in case we want to generate
/// a new id at the beginning of the transaction
#[serde(default)]
pub new_tx_id: Option<String>,
#[serde(skip_deserializing)]
pub new_tx_id: Option<TxId>,
}

impl TransactionOptions {
Expand All @@ -87,7 +87,7 @@ impl TransactionOptions {
/// of self with the new predefined_id set.
pub fn with_new_transaction_id(&mut self) -> TxId {
let tx_id: TxId = Default::default();
self.new_tx_id = Some(tx_id.to_string());
self.new_tx_id = Some(tx_id.clone());
tx_id
}
}
Expand All @@ -98,7 +98,7 @@ pub trait TransactionManager {
/// Expected to throw an error if no transaction could be opened for `opts.max_acquisition_millis` milliseconds.
/// The new transaction must only live for `opts.valid_for_millis` milliseconds before it automatically rolls back.
/// This rollback mechanism is an implementation detail of the trait implementer.
async fn start_tx(&self, query_schema: QuerySchemaRef, opts: &TransactionOptions) -> crate::Result<TxId>;
async fn start_tx(&self, query_schema: QuerySchemaRef, opts: TransactionOptions) -> crate::Result<TxId>;

/// Commits a transaction.
async fn commit_tx(&self, tx_id: TxId) -> crate::Result<()>;
Expand Down
2 changes: 2 additions & 0 deletions query-engine/core/src/interactive_transactions/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ where
{
fn from(s: T) -> Self {
let contents = s.into();
// This postcondition is to ensure that the TxId is long enough as to be able to derive
// a TraceId from it.
assert!(
contents.len() >= MINIMUM_TX_ID_LENGTH,
"minimum length for a TxId ({}) is {}, but was {}",
Expand Down
8 changes: 2 additions & 6 deletions query-engine/core/src/telemetry/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use tracing_opentelemetry::OpenTelemetrySpanExt;
use tracing_subscriber::EnvFilter;

pub static SHOW_ALL_TRACES: Lazy<bool> = Lazy::new(|| match std::env::var("PRISMA_SHOW_ALL_TRACES") {
Ok(enabled) => enabled.to_lowercase() == *("true"),
Ok(enabled) => enabled.eq_ignore_ascii_case("true"),
Err(_) => false,
});

Expand All @@ -20,11 +20,7 @@ pub fn spans_to_json(spans: Vec<SpanData>) -> String {
"span": true,
"spans": json_spans
});

match serde_json::to_string(&span_result) {
Ok(json_string) => json_string,
Err(_) => "".to_string(),
}
serde_json::to_string(&span_result).unwrap_or_default()
}

// set the parent context and return the traceparent
Expand Down
2 changes: 1 addition & 1 deletion query-engine/query-engine-node-api/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -337,7 +337,7 @@ impl QueryEngine {
let tx_opts: TransactionOptions = serde_json::from_str(&input)?;
match engine
.executor()
.start_tx(engine.query_schema().clone(), &tx_opts)
.start_tx(engine.query_schema().clone(), tx_opts)
.instrument(span)
.await
{
Expand Down
2 changes: 1 addition & 1 deletion query-engine/query-engine/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ impl ContextBuilder {

impl PrismaContext {
/// Initializes a new Prisma context.
pub async fn new(
async fn new(
schema: psl::ValidatedSchema,
enable_raw_queries: bool,
metrics: MetricRegistry,
Expand Down
6 changes: 3 additions & 3 deletions query-engine/query-engine/src/logger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ pub struct Logger {
#[derive(Debug, Clone)]
enum TracingConfig {
// exposed means tracing will be exposed through an HTTP endpoint in a jaeger-compatible format
Exposed(String),
Http(String),
// stdout means that traces will be printed to standard output
Stdout,
// disabled means that tracing will be disabled
Expand Down Expand Up @@ -67,7 +67,7 @@ impl Logger {
};

self.tracing_config = match (enable_telemetry, endpoint) {
(true, Some(endpoint)) => TracingConfig::Exposed(endpoint),
(true, Some(endpoint)) => TracingConfig::Http(endpoint),
(true, None) => TracingConfig::Stdout,
_ => TracingConfig::Disabled,
};
Expand Down Expand Up @@ -100,7 +100,7 @@ impl Logger {
.with(self.metrics.clone());

match self.tracing_config {
TracingConfig::Exposed(ref endpoint) => {
TracingConfig::Http(ref endpoint) => {
// Opentelemetry is enabled, but capturing is disabled, there's an endpoint to export
// the traces to.
let resource = Resource::new(vec![KeyValue::new("service.name", self.service_name)]);
Expand Down
10 changes: 3 additions & 7 deletions query-engine/query-engine/src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,7 @@ async fn transaction_start_handler(state: State, req: Request<Body>) -> Result<R
let result = state
.cx
.executor
.start_tx(state.cx.query_schema().clone(), &tx_opts)
.start_tx(state.cx.query_schema().clone(), tx_opts)
.instrument(span)
.await;

Expand Down Expand Up @@ -369,19 +369,15 @@ fn traceparent(headers: &HeaderMap) -> Option<String> {

let is_valid_traceparent = |s: &String| s.split_terminator('-').count() >= 4;

match &value {
Some(str) if is_valid_traceparent(str) => value,
_ => None,
}
value.filter(is_valid_traceparent)
}

#[allow(clippy::bind_instead_of_map)]
fn transaction_id(headers: &HeaderMap) -> Option<TxId> {
const TRANSACTION_ID_HEADER: &str = "X-transaction-id";
headers
.get(TRANSACTION_ID_HEADER)
.and_then(|h| h.to_str().ok())
.and_then(|s| Some(TxId::from(s)))
.map(TxId::from)
}

/// If the client sends us a trace and span id, extracting a new context if the
Expand Down
20 changes: 3 additions & 17 deletions query-engine/query-engine/src/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use std::sync::Arc;
use tracing::Instrument;

//// Shared application state.
#[derive(Clone)]
pub struct State {
pub cx: Arc<PrismaContext>,
pub enable_playground: bool,
Expand Down Expand Up @@ -33,23 +34,8 @@ impl State {
}
}

impl Clone for State {
fn clone(&self) -> Self {
Self {
cx: self.cx.clone(),
enable_playground: self.enable_playground,
enable_debug_mode: self.enable_debug_mode,
enable_metrics: self.enable_metrics,
}
}
}

pub async fn setup(opts: &PrismaOpt, install_logger: bool, metrics: Option<MetricRegistry>) -> PrismaResult<State> {
let metrics = if metrics.is_none() {
MetricRegistry::new()
} else {
metrics.unwrap()
};
let metrics = metrics.unwrap_or_default();

let mut logger = Logger::new("prisma-engine-http");
logger.log_format(opts.log_format());
Expand All @@ -72,7 +58,7 @@ pub async fn setup(opts: &PrismaOpt, install_logger: bool, metrics: Option<Metri
let enable_metrics = config.preview_features().contains(PreviewFeature::Metrics) || opts.dataproxy_metric_override;
let span = tracing::info_span!("prisma:engine:connect");

let cx = PrismaContext::builder(datamodel) // opts.enable_raw_queries, metrics, logs_capture)
let cx = PrismaContext::builder(datamodel)
.set_metrics(metrics)
.enable_raw_queries(opts.enable_raw_queries)
.build()
Expand Down

0 comments on commit dbd7ac1

Please sign in to comment.