Skip to content

Commit

Permalink
Address review feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
miguelff committed Jan 24, 2023
1 parent 8cdb1dd commit 5e8228a
Show file tree
Hide file tree
Showing 9 changed files with 21 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ impl RunnerInterface for DirectRunner {
) -> TestResult<TxId> {
let tx_opts = TransactionOptions::new(max_acquisition_millis, valid_for_millis, isolation_level);

let id = self.executor.start_tx(self.query_schema.clone(), &tx_opts).await?;
let id = self.executor.start_tx(self.query_schema.clone(), tx_opts).await?;
Ok(id)
}

Expand Down
6 changes: 3 additions & 3 deletions query-engine/core/src/executor/interpreting_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,9 +143,9 @@ impl<C> TransactionManager for InterpretingExecutor<C>
where
C: Connector + Send + Sync,
{
async fn start_tx(&self, query_schema: QuerySchemaRef, tx_opts: &TransactionOptions) -> crate::Result<TxId> {
async fn start_tx(&self, query_schema: QuerySchemaRef, tx_opts: TransactionOptions) -> crate::Result<TxId> {
super::with_request_now(async move {
let id = if let Some(predefined_tx_id) = tx_opts.new_tx_id.clone() {
let id = if let Some(predefined_tx_id) = tx_opts.new_tx_id {
predefined_tx_id.into()
} else {
TxId::default()
Expand All @@ -165,7 +165,7 @@ where
.await;

let conn = conn.map_err(|_| TransactionError::AcquisitionTimeout)??;
let c_tx = OpenTx::start(conn, tx_opts.isolation_level.clone()).await?;
let c_tx = OpenTx::start(conn, tx_opts.isolation_level).await?;

self.itx_manager
.create_tx(
Expand Down
8 changes: 4 additions & 4 deletions query-engine/core/src/executor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,8 @@ pub struct TransactionOptions {

/// An optional pre-defined transaction id. Some value might be provided in case we want to generate
/// a new id at the beginning of the transaction
#[serde(default)]
pub new_tx_id: Option<String>,
#[serde(skip_deserializing)]
pub new_tx_id: Option<TxId>,
}

impl TransactionOptions {
Expand All @@ -87,7 +87,7 @@ impl TransactionOptions {
/// of self with the new predefined_id set.
pub fn with_new_transaction_id(&mut self) -> TxId {
let tx_id: TxId = Default::default();
self.new_tx_id = Some(tx_id.to_string());
self.new_tx_id = Some(tx_id.clone());
tx_id
}
}
Expand All @@ -98,7 +98,7 @@ pub trait TransactionManager {
/// Expected to throw an error if no transaction could be opened for `opts.max_acquisition_millis` milliseconds.
/// The new transaction must only live for `opts.valid_for_millis` milliseconds before it automatically rolls back.
/// This rollback mechanism is an implementation detail of the trait implementer.
async fn start_tx(&self, query_schema: QuerySchemaRef, opts: &TransactionOptions) -> crate::Result<TxId>;
async fn start_tx(&self, query_schema: QuerySchemaRef, opts: TransactionOptions) -> crate::Result<TxId>;

/// Commits a transaction.
async fn commit_tx(&self, tx_id: TxId) -> crate::Result<()>;
Expand Down
8 changes: 2 additions & 6 deletions query-engine/core/src/telemetry/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use tracing_opentelemetry::OpenTelemetrySpanExt;
use tracing_subscriber::EnvFilter;

pub static SHOW_ALL_TRACES: Lazy<bool> = Lazy::new(|| match std::env::var("PRISMA_SHOW_ALL_TRACES") {
Ok(enabled) => enabled.to_lowercase() == *("true"),
Ok(enabled) => enabled.eq_ignore_ascii_case("true"),
Err(_) => false,
});

Expand All @@ -20,11 +20,7 @@ pub fn spans_to_json(spans: Vec<SpanData>) -> String {
"span": true,
"spans": json_spans
});

match serde_json::to_string(&span_result) {
Ok(json_string) => json_string,
Err(_) => "".to_string(),
}
serde_json::to_string(&span_result).unwrap_or_default()
}

// set the parent context and return the traceparent
Expand Down
2 changes: 1 addition & 1 deletion query-engine/query-engine-node-api/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -337,7 +337,7 @@ impl QueryEngine {
let tx_opts: TransactionOptions = serde_json::from_str(&input)?;
match engine
.executor()
.start_tx(engine.query_schema().clone(), &tx_opts)
.start_tx(engine.query_schema().clone(), tx_opts)
.instrument(span)
.await
{
Expand Down
2 changes: 1 addition & 1 deletion query-engine/query-engine/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ impl ContextBuilder {

impl PrismaContext {
/// Initializes a new Prisma context.
pub async fn new(
async fn new(
schema: psl::ValidatedSchema,
enable_raw_queries: bool,
metrics: MetricRegistry,
Expand Down
6 changes: 3 additions & 3 deletions query-engine/query-engine/src/logger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ pub struct Logger {
#[derive(Debug, Clone)]
enum TracingConfig {
// exposed means tracing will be exposed through an HTTP endpoint in a jaeger-compatible format
Exposed(String),
Http(String),
// stdout means that traces will be printed to standard output
Stdout,
// disabled means that tracing will be disabled
Expand Down Expand Up @@ -67,7 +67,7 @@ impl Logger {
};

self.tracing_config = match (enable_telemetry, endpoint) {
(true, Some(endpoint)) => TracingConfig::Exposed(endpoint),
(true, Some(endpoint)) => TracingConfig::Http(endpoint),
(true, None) => TracingConfig::Stdout,
_ => TracingConfig::Disabled,
};
Expand Down Expand Up @@ -100,7 +100,7 @@ impl Logger {
.with(self.metrics.clone());

match self.tracing_config {
TracingConfig::Exposed(ref endpoint) => {
TracingConfig::Http(ref endpoint) => {
// Opentelemetry is enabled, but capturing is disabled, there's an endpoint to export
// the traces to.
let resource = Resource::new(vec![KeyValue::new("service.name", self.service_name)]);
Expand Down
10 changes: 3 additions & 7 deletions query-engine/query-engine/src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,7 @@ async fn transaction_start_handler(state: State, req: Request<Body>) -> Result<R
let result = state
.cx
.executor
.start_tx(state.cx.query_schema().clone(), &tx_opts)
.start_tx(state.cx.query_schema().clone(), tx_opts)
.instrument(span)
.await;

Expand Down Expand Up @@ -369,19 +369,15 @@ fn traceparent(headers: &HeaderMap) -> Option<String> {

let is_valid_traceparent = |s: &String| s.split_terminator('-').count() >= 4;

match &value {
Some(str) if is_valid_traceparent(str) => value,
_ => None,
}
value.filter(is_valid_traceparent)
}

#[allow(clippy::bind_instead_of_map)]
fn transaction_id(headers: &HeaderMap) -> Option<TxId> {
const TRANSACTION_ID_HEADER: &str = "X-transaction-id";
headers
.get(TRANSACTION_ID_HEADER)
.and_then(|h| h.to_str().ok())
.and_then(|s| Some(TxId::from(s)))
.map(TxId::from)
}

/// If the client sends us a trace and span id, extracting a new context if the
Expand Down
20 changes: 3 additions & 17 deletions query-engine/query-engine/src/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use std::sync::Arc;
use tracing::Instrument;

//// Shared application state.
#[derive(Clone)]
pub struct State {
pub cx: Arc<PrismaContext>,
pub enable_playground: bool,
Expand Down Expand Up @@ -33,23 +34,8 @@ impl State {
}
}

impl Clone for State {
fn clone(&self) -> Self {
Self {
cx: self.cx.clone(),
enable_playground: self.enable_playground,
enable_debug_mode: self.enable_debug_mode,
enable_metrics: self.enable_metrics,
}
}
}

pub async fn setup(opts: &PrismaOpt, install_logger: bool, metrics: Option<MetricRegistry>) -> PrismaResult<State> {
let metrics = if metrics.is_none() {
MetricRegistry::new()
} else {
metrics.unwrap()
};
let metrics = metrics.unwrap_or_default();

let mut logger = Logger::new("prisma-engine-http");
logger.log_format(opts.log_format());
Expand All @@ -72,7 +58,7 @@ pub async fn setup(opts: &PrismaOpt, install_logger: bool, metrics: Option<Metri
let enable_metrics = config.preview_features().contains(PreviewFeature::Metrics) || opts.dataproxy_metric_override;
let span = tracing::info_span!("prisma:engine:connect");

let cx = PrismaContext::builder(datamodel) // opts.enable_raw_queries, metrics, logs_capture)
let cx = PrismaContext::builder(datamodel)
.set_metrics(metrics)
.enable_raw_queries(opts.enable_raw_queries)
.build()
Expand Down

0 comments on commit 5e8228a

Please sign in to comment.