From ad36b5f473269f882ba73449283d94d8103a0f7d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=20Houl=C3=A9?= Date: Fri, 6 Jan 2023 08:43:14 +0100 Subject: [PATCH] sql-query-connector: take trace ids by reference This commit is limited to sql-query-connector; it should not interfere with https://github.com/prisma/prisma-engines/pull/3505. In general, we do not need ownership of the trace id in sql-query-connector, since we only re-render it in comments. Working with a reference is easier (it is `Copy`, etc.), and it already saves us string copies and allocations with this commit. The other purpose of this PR relates to yesterday's discussion about introducing a `QueryContext<'_>` type struct to hold things like the trace id and connection info. Experience from designing similar context structs in the schema team showed it is much easier to do if everything in the context can be a reference. On the side, I could not resist making a few public items crate-public, to make the public surface of the crate smaller and clearer. 
--- .../src/database/connection.rs | 29 ++++++++----- .../src/database/operations/read.rs | 40 +++++++----------- .../src/database/operations/upsert.rs | 13 +++--- .../src/database/operations/write.rs | 42 +++++++++---------- .../src/database/transaction.rs | 29 ++++++++----- .../sql-query-connector/src/ordering.rs | 6 +-- .../src/query_arguments_ext.rs | 2 +- .../src/query_builder/mod.rs | 7 +--- .../src/query_builder/read.rs | 30 ++++++------- .../src/query_builder/write.rs | 27 ++++++------ .../sql-query-connector/src/query_ext.rs | 17 +++----- .../sql-query-connector/src/sql_trace.rs | 4 +- 12 files changed, 118 insertions(+), 128 deletions(-) diff --git a/query-engine/connectors/sql-query-connector/src/database/connection.rs b/query-engine/connectors/sql-query-connector/src/database/connection.rs index 81f332247765..413045e0f901 100644 --- a/query-engine/connectors/sql-query-connector/src/database/connection.rs +++ b/query-engine/connectors/sql-query-connector/src/database/connection.rs @@ -95,7 +95,7 @@ where filter, &selected_fields.into(), aggr_selections, - trace_id, + trace_id.as_deref(), ) .await }) @@ -118,7 +118,7 @@ where &selected_fields.into(), aggr_selections, SqlInfo::from(&self.connection_info), - trace_id, + trace_id.as_deref(), ) .await }) @@ -132,7 +132,7 @@ where trace_id: Option, ) -> connector::Result> { catch(self.connection_info.clone(), async move { - read::get_related_m2m_record_ids(&self.inner, from_field, from_record_ids, trace_id).await + read::get_related_m2m_record_ids(&self.inner, from_field, from_record_ids, trace_id.as_deref()).await }) .await } @@ -154,7 +154,7 @@ where selections, group_by, having, - trace_id, + trace_id.as_deref(), ) .await }) @@ -174,7 +174,14 @@ where trace_id: Option, ) -> connector::Result { catch(self.connection_info.clone(), async move { - write::create_record(&self.inner, &self.connection_info.sql_family(), model, args, trace_id).await + write::create_record( + &self.inner, + 
&self.connection_info.sql_family(), + model, + args, + trace_id.as_deref(), + ) + .await }) .await } @@ -193,7 +200,7 @@ where model, args, skip_duplicates, - trace_id, + trace_id.as_deref(), ) .await }) @@ -208,7 +215,7 @@ where trace_id: Option, ) -> connector::Result { catch(self.connection_info.clone(), async move { - write::update_records(&self.inner, model, record_filter, args, trace_id).await + write::update_records(&self.inner, model, record_filter, args, trace_id.as_deref()).await }) .await } @@ -221,7 +228,7 @@ where trace_id: Option, ) -> connector::Result> { catch(self.connection_info.clone(), async move { - let mut res = write::update_record(&self.inner, model, record_filter, args, trace_id).await?; + let mut res = write::update_record(&self.inner, model, record_filter, args, trace_id.as_deref()).await?; Ok(res.pop()) }) .await @@ -234,7 +241,7 @@ where trace_id: Option, ) -> connector::Result { catch(self.connection_info.clone(), async move { - write::delete_records(&self.inner, model, record_filter, trace_id).await + write::delete_records(&self.inner, model, record_filter, trace_id.as_deref()).await }) .await } @@ -245,7 +252,7 @@ where trace_id: Option, ) -> connector::Result { catch(self.connection_info.clone(), async move { - native_upsert(&self.inner, upsert, trace_id).await + native_upsert(&self.inner, upsert, trace_id.as_deref()).await }) .await } @@ -270,7 +277,7 @@ where trace_id: Option, ) -> connector::Result<()> { catch(self.connection_info.clone(), async move { - write::m2m_disconnect(&self.inner, field, parent_id, child_ids, trace_id).await + write::m2m_disconnect(&self.inner, field, parent_id, child_ids, trace_id.as_deref()).await }) .await } diff --git a/query-engine/connectors/sql-query-connector/src/database/operations/read.rs b/query-engine/connectors/sql-query-connector/src/database/operations/read.rs index 458dd85d3843..0c0b16b7e6c8 100644 --- a/query-engine/connectors/sql-query-connector/src/database/operations/read.rs +++ 
b/query-engine/connectors/sql-query-connector/src/database/operations/read.rs @@ -11,21 +11,15 @@ use futures::stream::{FuturesUnordered, StreamExt}; use prisma_models::*; use quaint::ast::*; -pub async fn get_single_record( +pub(crate) async fn get_single_record( conn: &dyn QueryExt, model: &ModelRef, filter: &Filter, selected_fields: &ModelProjection, aggr_selections: &[RelAggregationSelection], - trace_id: Option, + trace_id: Option<&str>, ) -> crate::Result> { - let query = read::get_records( - model, - selected_fields.as_columns(), - aggr_selections, - filter, - trace_id.clone(), - ); + let query = read::get_records(model, selected_fields.as_columns(), aggr_selections, filter, trace_id); let mut field_names: Vec<_> = selected_fields.db_names().collect(); let mut aggr_field_names: Vec<_> = aggr_selections.iter().map(|aggr_sel| aggr_sel.db_alias()).collect(); @@ -61,7 +55,7 @@ pub async fn get_many_records( selected_fields: &ModelProjection, aggr_selections: &[RelAggregationSelection], sql_info: SqlInfo, - trace_id: Option, + trace_id: Option<&str>, ) -> crate::Result { let reversed = query_arguments.needs_reversed_order(); @@ -112,15 +106,9 @@ pub async fn get_many_records( let mut futures = FuturesUnordered::new(); for args in batches.into_iter() { - let query = read::get_records( - model, - selected_fields.as_columns(), - aggr_selections, - args, - trace_id.clone(), - ); - - futures.push(conn.filter(query.into(), meta.as_slice(), trace_id.clone())); + let query = read::get_records(model, selected_fields.as_columns(), aggr_selections, args, trace_id); + + futures.push(conn.filter(query.into(), meta.as_slice(), trace_id)); } while let Some(result) = futures.next().await { @@ -139,7 +127,7 @@ pub async fn get_many_records( selected_fields.as_columns(), aggr_selections, query_arguments, - trace_id.clone(), + trace_id, ); for item in conn.filter(query.into(), meta.as_slice(), trace_id).await?.into_iter() { @@ -159,7 +147,7 @@ pub async fn 
get_related_m2m_record_ids( conn: &dyn QueryExt, from_field: &RelationFieldRef, from_record_ids: &[SelectionResult], - trace_id: Option, + trace_id: Option<&str>, ) -> crate::Result> { let mut idents = vec![]; idents.extend(ModelProjection::from(from_field.model().primary_identifier()).type_identifiers_with_arities()); @@ -231,7 +219,7 @@ pub async fn aggregate( selections: Vec, group_by: Vec, having: Option, - trace_id: Option, + trace_id: Option<&str>, ) -> crate::Result> { if !group_by.is_empty() { group_by_aggregate(conn, model, query_arguments, selections, group_by, having, trace_id).await @@ -247,9 +235,9 @@ async fn plain_aggregate( model: &ModelRef, query_arguments: QueryArguments, selections: Vec, - trace_id: Option, + trace_id: Option<&str>, ) -> crate::Result> { - let query = read::aggregate(model, &selections, query_arguments, trace_id.clone()); + let query = read::aggregate(model, &selections, query_arguments, trace_id); let idents: Vec<_> = selections .iter() @@ -274,9 +262,9 @@ async fn group_by_aggregate( selections: Vec, group_by: Vec, having: Option, - trace_id: Option, + trace_id: Option<&str>, ) -> crate::Result> { - let query = read::group_by_aggregate(model, query_arguments, &selections, group_by, having, trace_id.clone()); + let query = read::group_by_aggregate(model, query_arguments, &selections, group_by, having, trace_id); let idents: Vec<_> = selections .iter() diff --git a/query-engine/connectors/sql-query-connector/src/database/operations/upsert.rs b/query-engine/connectors/sql-query-connector/src/database/operations/upsert.rs index 91beb7577c98..1ce49c5753b1 100644 --- a/query-engine/connectors/sql-query-connector/src/database/operations/upsert.rs +++ b/query-engine/connectors/sql-query-connector/src/database/operations/upsert.rs @@ -1,20 +1,19 @@ -use connector_interface::NativeUpsert; -use prisma_models::{ModelProjection, Record, SingleRecord}; -use quaint::prelude::{OnConflict, Query}; - use crate::{ column_metadata, 
filter_conversion::AliasedCondition, model_extensions::AsColumns, - query_builder::{build_update_and_set_query, create_record}, + query_builder::write::{build_update_and_set_query, create_record}, query_ext::QueryExt, row::ToSqlRow, }; +use connector_interface::NativeUpsert; +use prisma_models::{ModelProjection, Record, SingleRecord}; +use quaint::prelude::{OnConflict, Query}; -pub async fn native_upsert( +pub(crate) async fn native_upsert( conn: &dyn QueryExt, upsert: NativeUpsert, - trace_id: Option, + trace_id: Option<&str>, ) -> crate::Result { let selected_fields: ModelProjection = upsert.selected_fields().into(); let field_names: Vec<_> = selected_fields.db_names().collect(); diff --git a/query-engine/connectors/sql-query-connector/src/database/operations/write.rs b/query-engine/connectors/sql-query-connector/src/database/operations/write.rs index b01d70af78d8..ea27f08bff3a 100644 --- a/query-engine/connectors/sql-query-connector/src/database/operations/write.rs +++ b/query-engine/connectors/sql-query-connector/src/database/operations/write.rs @@ -21,7 +21,7 @@ use user_facing_errors::query_engine::DatabaseConstraint; async fn generate_id( conn: &dyn QueryExt, primary_key: &FieldSelection, - trace_id: Option, + trace_id: Option<&str>, args: &WriteArgs, ) -> crate::Result> { // Go through all the values and generate a select statement with the correct MySQL function @@ -67,12 +67,12 @@ pub async fn create_record( sql_family: &SqlFamily, model: &ModelRef, mut args: WriteArgs, - trace_id: Option, + trace_id: Option<&str>, ) -> crate::Result { let pk = model.primary_identifier(); let returned_id = if *sql_family == SqlFamily::Mysql { - generate_id(conn, &pk, trace_id.clone(), &args).await? + generate_id(conn, &pk, trace_id, &args).await? 
} else { args.as_record_projection(pk.clone().into()) }; @@ -159,7 +159,7 @@ pub async fn create_records( model: &ModelRef, args: Vec, skip_duplicates: bool, - trace_id: Option, + trace_id: Option<&str>, ) -> crate::Result { if args.is_empty() { return Ok(0); @@ -199,7 +199,7 @@ async fn create_many_nonempty( args: Vec, skip_duplicates: bool, affected_fields: HashSet, - trace_id: Option, + trace_id: Option<&str>, ) -> crate::Result { let batches = if let Some(max_params) = sql_info.max_bind_values { // We need to split inserts if they are above a parameter threshold, as well as split based on number of rows. @@ -273,7 +273,7 @@ async fn create_many_nonempty( let mut count = 0; for batch in partitioned_batches { - let stmt = write::create_records_nonempty(model, batch, skip_duplicates, &affected_fields, trace_id.clone()); + let stmt = write::create_records_nonempty(model, batch, skip_duplicates, &affected_fields, trace_id); count += conn.execute(stmt.into()).await?; } @@ -286,7 +286,7 @@ async fn create_many_empty( model: &ModelRef, num_records: usize, skip_duplicates: bool, - trace_id: Option, + trace_id: Option<&str>, ) -> crate::Result { let stmt = write::create_records_empty(model, skip_duplicates, trace_id); let mut count = 0; @@ -306,16 +306,14 @@ pub async fn update_record( model: &ModelRef, record_filter: RecordFilter, args: WriteArgs, - trace_id: Option, + trace_id: Option<&str>, ) -> crate::Result> { let id_args = pick_args(&model.primary_identifier().into(), &args); // This is to match the behaviour expected but it seems a bit strange to me // This comes across as if the update happened even if it didn't if args.args.is_empty() { - let ids: Vec = conn - .filter_selectors(model, record_filter.clone(), trace_id.clone()) - .await?; + let ids: Vec = conn.filter_selectors(model, record_filter.clone(), trace_id).await?; return Ok(ids); } @@ -332,10 +330,10 @@ async fn update_records_from_ids_and_filter( model: &ModelRef, record_filter: RecordFilter, args: 
WriteArgs, - trace_id: Option, + trace_id: Option<&str>, ) -> crate::Result<(usize, Vec)> { let filter_condition = record_filter.clone().filter.aliased_condition_from(None, false); - let ids: Vec = conn.filter_selectors(model, record_filter, trace_id.clone()).await?; + let ids: Vec = conn.filter_selectors(model, record_filter, trace_id).await?; if ids.is_empty() { return Ok((0, Vec::new())); @@ -365,7 +363,7 @@ async fn update_records_from_filter( model: &ModelRef, record_filter: RecordFilter, args: WriteArgs, - trace_id: Option, + trace_id: Option<&str>, ) -> crate::Result { let update = build_update_and_set_query(model, args, trace_id); let filter_condition = record_filter.clone().filter.aliased_condition_from(None, false); @@ -385,7 +383,7 @@ pub async fn update_records( model: &ModelRef, record_filter: RecordFilter, args: WriteArgs, - trace_id: Option, + trace_id: Option<&str>, ) -> crate::Result { if args.args.is_empty() { return Ok(0); @@ -400,14 +398,14 @@ pub async fn update_records( } /// Delete multiple records in `conn`, defined in the `Filter`. Result is the number of items deleted. -pub async fn delete_records( +pub(crate) async fn delete_records( conn: &dyn QueryExt, model: &ModelRef, record_filter: RecordFilter, - trace_id: Option, + trace_id: Option<&str>, ) -> crate::Result { let filter_condition = record_filter.clone().filter.aliased_condition_from(None, false); - let ids = conn.filter_selectors(model, record_filter, trace_id.clone()).await?; + let ids = conn.filter_selectors(model, record_filter, trace_id).await?; let ids: Vec<&SelectionResult> = ids.iter().collect(); let count = ids.len(); @@ -428,7 +426,7 @@ pub async fn delete_records( /// Connect relations defined in `child_ids` to a parent defined in `parent_id`. /// The relation information is in the `RelationFieldRef`. 
-pub async fn m2m_connect( +pub(crate) async fn m2m_connect( conn: &dyn QueryExt, field: &RelationFieldRef, parent_id: &SelectionResult, @@ -442,12 +440,12 @@ pub async fn m2m_connect( /// Disconnect relations defined in `child_ids` to a parent defined in `parent_id`. /// The relation information is in the `RelationFieldRef`. -pub async fn m2m_disconnect( +pub(crate) async fn m2m_disconnect( conn: &dyn QueryExt, field: &RelationFieldRef, parent_id: &SelectionResult, child_ids: &[SelectionResult], - trace_id: Option, + trace_id: Option<&str>, ) -> crate::Result<()> { let query = write::delete_relation_table_records(field, parent_id, child_ids, trace_id); conn.delete(query).await?; @@ -457,7 +455,7 @@ pub async fn m2m_disconnect( /// Execute a plain SQL query with the given parameters, returning the number of /// affected rows. -pub async fn execute_raw( +pub(crate) async fn execute_raw( conn: &dyn QueryExt, features: psl::PreviewFeatures, inputs: HashMap, diff --git a/query-engine/connectors/sql-query-connector/src/database/transaction.rs b/query-engine/connectors/sql-query-connector/src/database/transaction.rs index 9f7517d7f8fc..1406d00056cf 100644 --- a/query-engine/connectors/sql-query-connector/src/database/transaction.rs +++ b/query-engine/connectors/sql-query-connector/src/database/transaction.rs @@ -78,7 +78,7 @@ impl<'tx> ReadOperations for SqlConnectorTransaction<'tx> { filter, &selected_fields.into(), aggr_selections, - trace_id, + trace_id.as_deref(), ) .await }) @@ -101,7 +101,7 @@ impl<'tx> ReadOperations for SqlConnectorTransaction<'tx> { &selected_fields.into(), aggr_selections, SqlInfo::from(&self.connection_info), - trace_id, + trace_id.as_deref(), ) .await }) @@ -115,7 +115,7 @@ impl<'tx> ReadOperations for SqlConnectorTransaction<'tx> { trace_id: Option, ) -> connector::Result> { catch(self.connection_info.clone(), async move { - read::get_related_m2m_record_ids(&self.inner, from_field, from_record_ids, trace_id).await + 
read::get_related_m2m_record_ids(&self.inner, from_field, from_record_ids, trace_id.as_deref()).await }) .await } @@ -137,7 +137,7 @@ impl<'tx> ReadOperations for SqlConnectorTransaction<'tx> { selections, group_by, having, - trace_id, + trace_id.as_deref(), ) .await }) @@ -154,7 +154,14 @@ impl<'tx> WriteOperations for SqlConnectorTransaction<'tx> { trace_id: Option, ) -> connector::Result { catch(self.connection_info.clone(), async move { - write::create_record(&self.inner, &self.connection_info.sql_family(), model, args, trace_id).await + write::create_record( + &self.inner, + &self.connection_info.sql_family(), + model, + args, + trace_id.as_deref(), + ) + .await }) .await } @@ -173,7 +180,7 @@ impl<'tx> WriteOperations for SqlConnectorTransaction<'tx> { model, args, skip_duplicates, - trace_id, + trace_id.as_deref(), ) .await }) @@ -188,7 +195,7 @@ impl<'tx> WriteOperations for SqlConnectorTransaction<'tx> { trace_id: Option, ) -> connector::Result { catch(self.connection_info.clone(), async move { - write::update_records(&self.inner, model, record_filter, args, trace_id).await + write::update_records(&self.inner, model, record_filter, args, trace_id.as_deref()).await }) .await } @@ -201,7 +208,7 @@ impl<'tx> WriteOperations for SqlConnectorTransaction<'tx> { trace_id: Option, ) -> connector::Result> { catch(self.connection_info.clone(), async move { - let mut res = write::update_record(&self.inner, model, record_filter, args, trace_id).await?; + let mut res = write::update_record(&self.inner, model, record_filter, args, trace_id.as_deref()).await?; Ok(res.pop()) }) .await @@ -214,7 +221,7 @@ impl<'tx> WriteOperations for SqlConnectorTransaction<'tx> { trace_id: Option, ) -> connector::Result { catch(self.connection_info.clone(), async move { - write::delete_records(&self.inner, model, record_filter, trace_id).await + write::delete_records(&self.inner, model, record_filter, trace_id.as_deref()).await }) .await } @@ -225,7 +232,7 @@ impl<'tx> WriteOperations 
for SqlConnectorTransaction<'tx> { trace_id: Option, ) -> connector::Result { catch(self.connection_info.clone(), async move { - native_upsert(&self.inner, upsert, trace_id).await + native_upsert(&self.inner, upsert, trace_id.as_deref()).await }) .await } @@ -250,7 +257,7 @@ impl<'tx> WriteOperations for SqlConnectorTransaction<'tx> { trace_id: Option, ) -> connector::Result<()> { catch(self.connection_info.clone(), async move { - write::m2m_disconnect(&self.inner, field, parent_id, child_ids, trace_id).await + write::m2m_disconnect(&self.inner, field, parent_id, child_ids, trace_id.as_deref()).await }) .await } diff --git a/query-engine/connectors/sql-query-connector/src/ordering.rs b/query-engine/connectors/sql-query-connector/src/ordering.rs index c8b672414270..3d40f07b6ebb 100644 --- a/query-engine/connectors/sql-query-connector/src/ordering.rs +++ b/query-engine/connectors/sql-query-connector/src/ordering.rs @@ -8,7 +8,7 @@ static ORDER_JOIN_PREFIX: &str = "orderby_"; static ORDER_AGGREGATOR_ALIAS: &str = "orderby_aggregator"; #[derive(Debug, Clone)] -pub struct OrderByDefinition { +pub(crate) struct OrderByDefinition { /// Final column identifier to be used for the scalar field to order by pub(crate) order_column: Expression<'static>, /// Defines ordering for an `ORDER BY` statement. @@ -18,14 +18,14 @@ pub struct OrderByDefinition { } #[derive(Debug, Default)] -pub struct OrderByBuilder { +pub(crate) struct OrderByBuilder { // Used to generate unique join alias join_counter: usize, } impl OrderByBuilder { /// Builds all expressions for an `ORDER BY` clause based on the query arguments. - pub fn build(&mut self, query_arguments: &QueryArguments) -> Vec { + pub(crate) fn build(&mut self, query_arguments: &QueryArguments) -> Vec { let needs_reversed_order = query_arguments.needs_reversed_order(); // The index is used to differentiate potentially separate relations to the same model. 
diff --git a/query-engine/connectors/sql-query-connector/src/query_arguments_ext.rs b/query-engine/connectors/sql-query-connector/src/query_arguments_ext.rs index 45885dd52d51..b0319b28b22c 100644 --- a/query-engine/connectors/sql-query-connector/src/query_arguments_ext.rs +++ b/query-engine/connectors/sql-query-connector/src/query_arguments_ext.rs @@ -1,6 +1,6 @@ use connector_interface::QueryArguments; -pub trait QueryArgumentsExt { +pub(crate) trait QueryArgumentsExt { /// If we need to take rows before a cursor position, then we need to reverse the order in SQL. fn needs_reversed_order(&self) -> bool; } diff --git a/query-engine/connectors/sql-query-connector/src/query_builder/mod.rs b/query-engine/connectors/sql-query-connector/src/query_builder/mod.rs index 2124294a2f26..0a305e476ef3 100644 --- a/query-engine/connectors/sql-query-connector/src/query_builder/mod.rs +++ b/query-engine/connectors/sql-query-connector/src/query_builder/mod.rs @@ -1,8 +1,5 @@ -pub mod read; -pub mod write; - -pub use read::*; -pub use write::*; +pub(crate) mod read; +pub(crate) mod write; use crate::model_extensions::SelectionResultExt; use prisma_models::SelectionResult; diff --git a/query-engine/connectors/sql-query-connector/src/query_builder/read.rs b/query-engine/connectors/sql-query-connector/src/query_builder/read.rs index 2a2fe48c93fb..a36513b0c61a 100644 --- a/query-engine/connectors/sql-query-connector/src/query_builder/read.rs +++ b/query-engine/connectors/sql-query-connector/src/query_builder/read.rs @@ -8,12 +8,12 @@ use prisma_models::*; use quaint::ast::*; use tracing::Span; -pub trait SelectDefinition { +pub(crate) trait SelectDefinition { fn into_select( self, _: &ModelRef, aggr_selections: &[RelAggregationSelection], - trace_id: Option, + trace_id: Option<&str>, ) -> (Select<'static>, Vec>); } @@ -22,7 +22,7 @@ impl SelectDefinition for Filter { self, model: &ModelRef, aggr_selections: &[RelAggregationSelection], - trace_id: Option, + trace_id: Option<&str>, ) -> 
(Select<'static>, Vec>) { let args = QueryArguments::from((model.clone(), self)); args.into_select(model, aggr_selections, trace_id) @@ -34,7 +34,7 @@ impl SelectDefinition for &Filter { self, model: &ModelRef, aggr_selections: &[RelAggregationSelection], - trace_id: Option, + trace_id: Option<&str>, ) -> (Select<'static>, Vec>) { self.clone().into_select(model, aggr_selections, trace_id) } @@ -45,7 +45,7 @@ impl SelectDefinition for Select<'static> { self, _: &ModelRef, _: &[RelAggregationSelection], - _trace_id: Option, + _trace_id: Option<&str>, ) -> (Select<'static>, Vec>) { (self, vec![]) } @@ -56,7 +56,7 @@ impl SelectDefinition for QueryArguments { self, model: &ModelRef, aggr_selections: &[RelAggregationSelection], - trace_id: Option, + trace_id: Option<&str>, ) -> (Select<'static>, Vec>) { let order_by_definitions = OrderByBuilder::default().build(&self); let (table_opt, cursor_condition) = cursor_condition::build(&self, &model, &order_by_definitions); @@ -111,17 +111,17 @@ impl SelectDefinition for QueryArguments { } } -pub fn get_records( +pub(crate) fn get_records( model: &ModelRef, columns: impl Iterator>, aggr_selections: &[RelAggregationSelection], query: T, - trace_id: Option, + trace_id: Option<&str>, ) -> Select<'static> where T: SelectDefinition, { - let (select, additional_selection_set) = query.into_select(model, aggr_selections, trace_id.clone()); + let (select, additional_selection_set) = query.into_select(model, aggr_selections, trace_id); let select = columns.fold(select, |acc, col| acc.column(col)); let select = select.append_trace(&Span::current()).add_trace_id(trace_id); @@ -157,14 +157,14 @@ where /// ``` /// Important note: Do not use the AsColumn trait here as we need to construct column references that are relative, /// not absolute - e.g. `SELECT "field" FROM (...)` NOT `SELECT "full"."path"."to"."field" FROM (...)`. 
-pub fn aggregate( +pub(crate) fn aggregate( model: &ModelRef, selections: &[AggregationSelection], args: QueryArguments, - trace_id: Option, + trace_id: Option<&str>, ) -> Select<'static> { let columns = extract_columns(model, &selections); - let sub_query = get_records(model, columns.into_iter(), &[], args, trace_id.clone()); + let sub_query = get_records(model, columns.into_iter(), &[], args, trace_id); let sub_table = Table::from(sub_query).alias("sub"); selections.iter().fold( @@ -205,15 +205,15 @@ pub fn aggregate( ) } -pub fn group_by_aggregate( +pub(crate) fn group_by_aggregate( model: &ModelRef, args: QueryArguments, selections: &[AggregationSelection], group_by: Vec, having: Option, - trace_id: Option, + trace_id: Option<&str>, ) -> Select<'static> { - let (base_query, _) = args.into_select(model, &[], trace_id.clone()); + let (base_query, _) = args.into_select(model, &[], trace_id); let select_query = selections.iter().fold(base_query, |select, next_op| match next_op { AggregationSelection::Field(field) => select.column(field.as_column()), diff --git a/query-engine/connectors/sql-query-connector/src/query_builder/write.rs b/query-engine/connectors/sql-query-connector/src/query_builder/write.rs index 882ec3502305..8bb3dbac06c3 100644 --- a/query-engine/connectors/sql-query-connector/src/query_builder/write.rs +++ b/query-engine/connectors/sql-query-connector/src/query_builder/write.rs @@ -1,5 +1,4 @@ -use crate::model_extensions::*; -use crate::sql_trace::SqlTraceComment; +use crate::{model_extensions::*, sql_trace::SqlTraceComment}; use connector_interface::{DatasourceFieldName, ScalarWriteOperation, WriteArgs}; use itertools::Itertools; use prisma_models::*; @@ -9,7 +8,7 @@ use tracing::Span; /// `INSERT` a new record to the database. Resulting an `INSERT` ast and an /// optional `RecordProjection` if available from the arguments or model. 
-pub(crate) fn create_record(model: &ModelRef, mut args: WriteArgs, trace_id: Option) -> Insert<'static> { +pub(crate) fn create_record(model: &ModelRef, mut args: WriteArgs, trace_id: Option<&str>) -> Insert<'static> { let fields: Vec<_> = model .fields() .scalar() @@ -39,12 +38,12 @@ pub(crate) fn create_record(model: &ModelRef, mut args: WriteArgs, trace_id: Opt /// where each `WriteArg` in the Vec is one row. /// Requires `affected_fields` to be non-empty to produce valid SQL. #[allow(clippy::mutable_key_type)] -pub fn create_records_nonempty( +pub(crate) fn create_records_nonempty( model: &ModelRef, args: Vec, skip_duplicates: bool, affected_fields: &HashSet, - trace_id: Option, + trace_id: Option<&str>, ) -> Insert<'static> { // We need to bring all write args into a uniform shape. // The easiest way to do this is to take go over all fields of the batch and apply the following: @@ -87,7 +86,7 @@ pub fn create_records_nonempty( } /// `INSERT` empty records statement. -pub fn create_records_empty(model: &ModelRef, skip_duplicates: bool, trace_id: Option) -> Insert<'static> { +pub(crate) fn create_records_empty(model: &ModelRef, skip_duplicates: bool, trace_id: Option<&str>) -> Insert<'static> { let insert: Insert<'static> = Insert::single_into(model.as_table()).into(); let insert = insert.append_trace(&Span::current()).add_trace_id(trace_id); @@ -98,7 +97,7 @@ pub fn create_records_empty(model: &ModelRef, skip_duplicates: bool, trace_id: O } } -pub fn build_update_and_set_query(model: &ModelRef, args: WriteArgs, trace_id: Option) -> Update<'static> { +pub(crate) fn build_update_and_set_query(model: &ModelRef, args: WriteArgs, trace_id: Option<&str>) -> Update<'static> { let scalar_fields = model.fields().scalar(); let table = model.as_table(); let query = args @@ -153,7 +152,7 @@ pub fn build_update_and_set_query(model: &ModelRef, args: WriteArgs, trace_id: O query.append_trace(&Span::current()).add_trace_id(trace_id) } -pub fn chunk_update_with_ids( 
+pub(crate) fn chunk_update_with_ids( update: Update<'static>, model: &ModelRef, ids: &[&SelectionResult], @@ -168,11 +167,11 @@ pub fn chunk_update_with_ids( Ok(query) } -pub fn delete_many( +pub(crate) fn delete_many( model: &ModelRef, ids: &[&SelectionResult], filter_condition: ConditionTree<'static>, - trace_id: Option, + trace_id: Option<&str>, ) -> Vec> { let columns: Vec<_> = ModelProjection::from(model.primary_identifier()).as_columns().collect(); @@ -180,11 +179,11 @@ pub fn delete_many( Delete::from_table(model.as_table()) .so_that(conditions.and(filter_condition.clone())) .append_trace(&Span::current()) - .add_trace_id(trace_id.clone()) + .add_trace_id(trace_id) }) } -pub fn create_relation_table_records( +pub(crate) fn create_relation_table_records( field: &RelationFieldRef, parent_id: &SelectionResult, child_ids: &[SelectionResult], @@ -208,11 +207,11 @@ pub fn create_relation_table_records( insert.build().on_conflict(OnConflict::DoNothing).into() } -pub fn delete_relation_table_records( +pub(crate) fn delete_relation_table_records( parent_field: &RelationFieldRef, parent_id: &SelectionResult, child_ids: &[SelectionResult], - trace_id: Option, + trace_id: Option<&str>, ) -> Delete<'static> { let relation = parent_field.relation(); diff --git a/query-engine/connectors/sql-query-connector/src/query_ext.rs b/query-engine/connectors/sql-query-connector/src/query_ext.rs index a98481eca10b..7499e824b201 100644 --- a/query-engine/connectors/sql-query-connector/src/query_ext.rs +++ b/query-engine/connectors/sql-query-connector/src/query_ext.rs @@ -36,7 +36,7 @@ pub trait QueryExt: Queryable + Send + Sync { &self, q: Query<'_>, idents: &[ColumnMetadata<'_>], - trace_id: Option, + trace_id: Option<&str>, ) -> crate::Result> { let span = info_span!("filter read query"); @@ -118,12 +118,7 @@ pub trait QueryExt: Queryable + Send + Sync { } /// Select one row from the database. 
- async fn find( - &self, - q: Select<'_>, - meta: &[ColumnMetadata<'_>], - trace_id: Option, - ) -> crate::Result { + async fn find(&self, q: Select<'_>, meta: &[ColumnMetadata<'_>], trace_id: Option<&str>) -> crate::Result { self.filter(q.limit(1).into(), meta, trace_id) .await? .into_iter() @@ -137,7 +132,7 @@ pub trait QueryExt: Queryable + Send + Sync { &self, model: &ModelRef, record_filter: RecordFilter, - trace_id: Option, + trace_id: Option<&str>, ) -> crate::Result> { if let Some(selectors) = record_filter.selectors { Ok(selectors) @@ -151,7 +146,7 @@ pub trait QueryExt: Queryable + Send + Sync { &self, model: &ModelRef, filter: Filter, - trace_id: Option, + trace_id: Option<&str>, ) -> crate::Result> { let model_id: ModelProjection = model.primary_identifier().into(); let id_cols: Vec> = model_id.as_columns().collect(); @@ -159,7 +154,7 @@ pub trait QueryExt: Queryable + Send + Sync { let select = Select::from_table(model.as_table()) .columns(id_cols) .append_trace(&Span::current()) - .add_trace_id(trace_id.clone()) + .add_trace_id(trace_id) .so_that(filter.aliased_condition_from(None, false)); self.select_ids(select, model_id, trace_id).await @@ -169,7 +164,7 @@ pub trait QueryExt: Queryable + Send + Sync { &self, select: Select<'_>, model_id: ModelProjection, - trace_id: Option, + trace_id: Option<&str>, ) -> crate::Result> { let idents: Vec<_> = model_id .fields() diff --git a/query-engine/connectors/sql-query-connector/src/sql_trace.rs b/query-engine/connectors/sql-query-connector/src/sql_trace.rs index 29cab8198215..7f2ab235c35e 100644 --- a/query-engine/connectors/sql-query-connector/src/sql_trace.rs +++ b/query-engine/connectors/sql-query-connector/src/sql_trace.rs @@ -13,7 +13,7 @@ pub fn trace_parent_to_string(context: &SpanContext) -> String { pub trait SqlTraceComment: Sized { fn append_trace(self, span: &Span) -> Self; - fn add_trace_id(self, trace_id: Option) -> Self; + fn add_trace_id(self, trace_id: Option<&str>) -> Self; } macro_rules! 
sql_trace { @@ -31,7 +31,7 @@ macro_rules! sql_trace { } } // Temporary method to pass the traceid in an operation - fn add_trace_id(self, trace_id: Option) -> Self { + fn add_trace_id(self, trace_id: Option<&str>) -> Self { if let Some(traceparent) = trace_id { if should_sample(&traceparent) { self.comment(format!("traceparent={}", traceparent))