diff --git a/query-engine/connectors/sql-query-connector/src/cursor_condition.rs b/query-engine/connectors/sql-query-connector/src/cursor_condition.rs index aa3f267b3a29..97025f4a2298 100644 --- a/query-engine/connectors/sql-query-connector/src/cursor_condition.rs +++ b/query-engine/connectors/sql-query-connector/src/cursor_condition.rs @@ -118,7 +118,7 @@ struct CursorOrderForeignKey { /// OR `ModelA`.`modelB_id` IS NULL -- >>> Additional check for the nullable foreign key /// ) /// ``` -pub fn build( +pub(crate) fn build( query_arguments: &QueryArguments, model: &ModelRef, order_by_defs: &[OrderByDefinition], diff --git a/query-engine/connectors/sql-query-connector/src/database/connection.rs b/query-engine/connectors/sql-query-connector/src/database/connection.rs index 81f332247765..413045e0f901 100644 --- a/query-engine/connectors/sql-query-connector/src/database/connection.rs +++ b/query-engine/connectors/sql-query-connector/src/database/connection.rs @@ -95,7 +95,7 @@ where filter, &selected_fields.into(), aggr_selections, - trace_id, + trace_id.as_deref(), ) .await }) @@ -118,7 +118,7 @@ where &selected_fields.into(), aggr_selections, SqlInfo::from(&self.connection_info), - trace_id, + trace_id.as_deref(), ) .await }) @@ -132,7 +132,7 @@ where trace_id: Option, ) -> connector::Result> { catch(self.connection_info.clone(), async move { - read::get_related_m2m_record_ids(&self.inner, from_field, from_record_ids, trace_id).await + read::get_related_m2m_record_ids(&self.inner, from_field, from_record_ids, trace_id.as_deref()).await }) .await } @@ -154,7 +154,7 @@ where selections, group_by, having, - trace_id, + trace_id.as_deref(), ) .await }) @@ -174,7 +174,14 @@ where trace_id: Option, ) -> connector::Result { catch(self.connection_info.clone(), async move { - write::create_record(&self.inner, &self.connection_info.sql_family(), model, args, trace_id).await + write::create_record( + &self.inner, + &self.connection_info.sql_family(), + model, + args, + trace_id.as_deref(), + ) + .await }) .await } @@ -193,7 +200,7 @@ where model, args, skip_duplicates, - trace_id, + trace_id.as_deref(), ) .await }) @@ -208,7 +215,7 @@ where trace_id: Option, ) -> connector::Result { catch(self.connection_info.clone(), async move { - write::update_records(&self.inner, model, record_filter, args, trace_id).await + write::update_records(&self.inner, model, record_filter, args, trace_id.as_deref()).await }) .await } @@ -221,7 +228,7 @@ where trace_id: Option, ) -> connector::Result> { catch(self.connection_info.clone(), async move { - let mut res = write::update_record(&self.inner, model, record_filter, args, trace_id).await?; + let mut res = write::update_record(&self.inner, model, record_filter, args, trace_id.as_deref()).await?; Ok(res.pop()) }) .await @@ -234,7 +241,7 @@ where trace_id: Option, ) -> connector::Result { catch(self.connection_info.clone(), async move { - write::delete_records(&self.inner, model, record_filter, trace_id).await + write::delete_records(&self.inner, model, record_filter, trace_id.as_deref()).await }) .await } @@ -245,7 +252,7 @@ where trace_id: Option, ) -> connector::Result { catch(self.connection_info.clone(), async move { - native_upsert(&self.inner, upsert, trace_id).await + native_upsert(&self.inner, upsert, trace_id.as_deref()).await }) .await } @@ -270,7 +277,7 @@ where trace_id: Option, ) -> connector::Result<()> { catch(self.connection_info.clone(), async move { - write::m2m_disconnect(&self.inner, field, parent_id, child_ids, trace_id).await + write::m2m_disconnect(&self.inner, field, parent_id, child_ids, trace_id.as_deref()).await }) .await } diff --git a/query-engine/connectors/sql-query-connector/src/database/operations/read.rs b/query-engine/connectors/sql-query-connector/src/database/operations/read.rs index 458dd85d3843..0c0b16b7e6c8 100644 --- a/query-engine/connectors/sql-query-connector/src/database/operations/read.rs +++ b/query-engine/connectors/sql-query-connector/src/database/operations/read.rs @@ -11,21 +11,15 @@ use futures::stream::{FuturesUnordered, StreamExt}; use prisma_models::*; use quaint::ast::*; -pub async fn get_single_record( +pub(crate) async fn get_single_record( conn: &dyn QueryExt, model: &ModelRef, filter: &Filter, selected_fields: &ModelProjection, aggr_selections: &[RelAggregationSelection], - trace_id: Option, + trace_id: Option<&str>, ) -> crate::Result> { - let query = read::get_records( - model, - selected_fields.as_columns(), - aggr_selections, - filter, - trace_id.clone(), - ); + let query = read::get_records(model, selected_fields.as_columns(), aggr_selections, filter, trace_id); let mut field_names: Vec<_> = selected_fields.db_names().collect(); let mut aggr_field_names: Vec<_> = aggr_selections.iter().map(|aggr_sel| aggr_sel.db_alias()).collect(); @@ -61,7 +55,7 @@ pub async fn get_many_records( selected_fields: &ModelProjection, aggr_selections: &[RelAggregationSelection], sql_info: SqlInfo, - trace_id: Option, + trace_id: Option<&str>, ) -> crate::Result { let reversed = query_arguments.needs_reversed_order(); @@ -112,15 +106,9 @@ pub async fn get_many_records( let mut futures = FuturesUnordered::new(); for args in batches.into_iter() { - let query = read::get_records( - model, - selected_fields.as_columns(), - aggr_selections, - args, - trace_id.clone(), - ); - - futures.push(conn.filter(query.into(), meta.as_slice(), trace_id.clone())); + let query = read::get_records(model, selected_fields.as_columns(), aggr_selections, args, trace_id); + + futures.push(conn.filter(query.into(), meta.as_slice(), trace_id)); } while let Some(result) = futures.next().await { @@ -139,7 +127,7 @@ pub async fn get_many_records( selected_fields.as_columns(), aggr_selections, query_arguments, - trace_id.clone(), + trace_id, ); for item in conn.filter(query.into(), meta.as_slice(), trace_id).await?.into_iter() { @@ -159,7 +147,7 @@ pub async fn get_related_m2m_record_ids( conn: &dyn QueryExt, from_field: &RelationFieldRef, from_record_ids: &[SelectionResult], - trace_id: Option, + trace_id: Option<&str>, ) -> crate::Result> { let mut idents = vec![]; idents.extend(ModelProjection::from(from_field.model().primary_identifier()).type_identifiers_with_arities()); @@ -231,7 +219,7 @@ pub async fn aggregate( selections: Vec, group_by: Vec, having: Option, - trace_id: Option, + trace_id: Option<&str>, ) -> crate::Result> { if !group_by.is_empty() { group_by_aggregate(conn, model, query_arguments, selections, group_by, having, trace_id).await @@ -247,9 +235,9 @@ async fn plain_aggregate( model: &ModelRef, query_arguments: QueryArguments, selections: Vec, - trace_id: Option, + trace_id: Option<&str>, ) -> crate::Result> { - let query = read::aggregate(model, &selections, query_arguments, trace_id.clone()); + let query = read::aggregate(model, &selections, query_arguments, trace_id); let idents: Vec<_> = selections .iter() @@ -274,9 +262,9 @@ async fn group_by_aggregate( selections: Vec, group_by: Vec, having: Option, - trace_id: Option, + trace_id: Option<&str>, ) -> crate::Result> { - let query = read::group_by_aggregate(model, query_arguments, &selections, group_by, having, trace_id.clone()); + let query = read::group_by_aggregate(model, query_arguments, &selections, group_by, having, trace_id); let idents: Vec<_> = selections .iter() diff --git a/query-engine/connectors/sql-query-connector/src/database/operations/upsert.rs b/query-engine/connectors/sql-query-connector/src/database/operations/upsert.rs index 91beb7577c98..1ce49c5753b1 100644 --- a/query-engine/connectors/sql-query-connector/src/database/operations/upsert.rs +++ b/query-engine/connectors/sql-query-connector/src/database/operations/upsert.rs @@ -1,20 +1,19 @@ -use connector_interface::NativeUpsert; -use prisma_models::{ModelProjection, Record, SingleRecord}; -use quaint::prelude::{OnConflict, Query}; - use crate::{ column_metadata, filter_conversion::AliasedCondition, model_extensions::AsColumns, - query_builder::{build_update_and_set_query, create_record}, + query_builder::write::{build_update_and_set_query, create_record}, query_ext::QueryExt, row::ToSqlRow, }; +use connector_interface::NativeUpsert; +use prisma_models::{ModelProjection, Record, SingleRecord}; +use quaint::prelude::{OnConflict, Query}; -pub async fn native_upsert( +pub(crate) async fn native_upsert( conn: &dyn QueryExt, upsert: NativeUpsert, - trace_id: Option, + trace_id: Option<&str>, ) -> crate::Result { let selected_fields: ModelProjection = upsert.selected_fields().into(); let field_names: Vec<_> = selected_fields.db_names().collect(); diff --git a/query-engine/connectors/sql-query-connector/src/database/operations/write.rs b/query-engine/connectors/sql-query-connector/src/database/operations/write.rs index b01d70af78d8..ea27f08bff3a 100644 --- a/query-engine/connectors/sql-query-connector/src/database/operations/write.rs +++ b/query-engine/connectors/sql-query-connector/src/database/operations/write.rs @@ -21,7 +21,7 @@ use user_facing_errors::query_engine::DatabaseConstraint; async fn generate_id( conn: &dyn QueryExt, primary_key: &FieldSelection, - trace_id: Option, + trace_id: Option<&str>, args: &WriteArgs, ) -> crate::Result> { // Go through all the values and generate a select statement with the correct MySQL function @@ -67,12 +67,12 @@ pub async fn create_record( sql_family: &SqlFamily, model: &ModelRef, mut args: WriteArgs, - trace_id: Option, + trace_id: Option<&str>, ) -> crate::Result { let pk = model.primary_identifier(); let returned_id = if *sql_family == SqlFamily::Mysql { - generate_id(conn, &pk, trace_id.clone(), &args).await? + generate_id(conn, &pk, trace_id, &args).await? } else { args.as_record_projection(pk.clone().into()) }; @@ -159,7 +159,7 @@ pub async fn create_records( model: &ModelRef, args: Vec, skip_duplicates: bool, - trace_id: Option, + trace_id: Option<&str>, ) -> crate::Result { if args.is_empty() { return Ok(0); @@ -199,7 +199,7 @@ async fn create_many_nonempty( args: Vec, skip_duplicates: bool, affected_fields: HashSet, - trace_id: Option, + trace_id: Option<&str>, ) -> crate::Result { let batches = if let Some(max_params) = sql_info.max_bind_values { // We need to split inserts if they are above a parameter threshold, as well as split based on number of rows. @@ -273,7 +273,7 @@ async fn create_many_nonempty( let mut count = 0; for batch in partitioned_batches { - let stmt = write::create_records_nonempty(model, batch, skip_duplicates, &affected_fields, trace_id.clone()); + let stmt = write::create_records_nonempty(model, batch, skip_duplicates, &affected_fields, trace_id); count += conn.execute(stmt.into()).await?; } @@ -286,7 +286,7 @@ async fn create_many_empty( model: &ModelRef, num_records: usize, skip_duplicates: bool, - trace_id: Option, + trace_id: Option<&str>, ) -> crate::Result { let stmt = write::create_records_empty(model, skip_duplicates, trace_id); let mut count = 0; @@ -306,16 +306,14 @@ pub async fn update_record( model: &ModelRef, record_filter: RecordFilter, args: WriteArgs, - trace_id: Option, + trace_id: Option<&str>, ) -> crate::Result> { let id_args = pick_args(&model.primary_identifier().into(), &args); // This is to match the behaviour expected but it seems a bit strange to me // This comes across as if the update happened even if it didn't if args.args.is_empty() { - let ids: Vec = conn - .filter_selectors(model, record_filter.clone(), trace_id.clone()) - .await?; + let ids: Vec = conn.filter_selectors(model, record_filter.clone(), trace_id).await?; return Ok(ids); } @@ -332,10 +330,10 @@ async fn update_records_from_ids_and_filter( model: &ModelRef, record_filter: RecordFilter, args: WriteArgs, - trace_id: Option, + trace_id: Option<&str>, ) -> crate::Result<(usize, Vec)> { let filter_condition = record_filter.clone().filter.aliased_condition_from(None, false); - let ids: Vec = conn.filter_selectors(model, record_filter, trace_id.clone()).await?; + let ids: Vec = conn.filter_selectors(model, record_filter, trace_id).await?; if ids.is_empty() { return Ok((0, Vec::new())); @@ -365,7 +363,7 @@ async fn update_records_from_filter( model: &ModelRef, record_filter: RecordFilter, args: WriteArgs, - trace_id: Option, + trace_id: Option<&str>, ) -> crate::Result { let update = build_update_and_set_query(model, args, trace_id); let filter_condition = record_filter.clone().filter.aliased_condition_from(None, false); @@ -385,7 +383,7 @@ pub async fn update_records( model: &ModelRef, record_filter: RecordFilter, args: WriteArgs, - trace_id: Option, + trace_id: Option<&str>, ) -> crate::Result { if args.args.is_empty() { return Ok(0); @@ -400,14 +398,14 @@ pub async fn update_records( } /// Delete multiple records in `conn`, defined in the `Filter`. Result is the number of items deleted. -pub async fn delete_records( +pub(crate) async fn delete_records( conn: &dyn QueryExt, model: &ModelRef, record_filter: RecordFilter, - trace_id: Option, + trace_id: Option<&str>, ) -> crate::Result { let filter_condition = record_filter.clone().filter.aliased_condition_from(None, false); - let ids = conn.filter_selectors(model, record_filter, trace_id.clone()).await?; + let ids = conn.filter_selectors(model, record_filter, trace_id).await?; let ids: Vec<&SelectionResult> = ids.iter().collect(); let count = ids.len(); @@ -428,7 +426,7 @@ pub async fn delete_records( /// Connect relations defined in `child_ids` to a parent defined in `parent_id`. /// The relation information is in the `RelationFieldRef`. -pub async fn m2m_connect( +pub(crate) async fn m2m_connect( conn: &dyn QueryExt, field: &RelationFieldRef, parent_id: &SelectionResult, @@ -442,12 +440,12 @@ pub async fn m2m_connect( /// Disconnect relations defined in `child_ids` to a parent defined in `parent_id`. /// The relation information is in the `RelationFieldRef`. -pub async fn m2m_disconnect( +pub(crate) async fn m2m_disconnect( conn: &dyn QueryExt, field: &RelationFieldRef, parent_id: &SelectionResult, child_ids: &[SelectionResult], - trace_id: Option, + trace_id: Option<&str>, ) -> crate::Result<()> { let query = write::delete_relation_table_records(field, parent_id, child_ids, trace_id); conn.delete(query).await?; @@ -457,7 +455,7 @@ pub async fn m2m_disconnect( /// Execute a plain SQL query with the given parameters, returning the number of /// affected rows. -pub async fn execute_raw( +pub(crate) async fn execute_raw( conn: &dyn QueryExt, features: psl::PreviewFeatures, inputs: HashMap, diff --git a/query-engine/connectors/sql-query-connector/src/database/transaction.rs b/query-engine/connectors/sql-query-connector/src/database/transaction.rs index 9f7517d7f8fc..1406d00056cf 100644 --- a/query-engine/connectors/sql-query-connector/src/database/transaction.rs +++ b/query-engine/connectors/sql-query-connector/src/database/transaction.rs @@ -78,7 +78,7 @@ impl<'tx> ReadOperations for SqlConnectorTransaction<'tx> { filter, &selected_fields.into(), aggr_selections, - trace_id, + trace_id.as_deref(), ) .await }) @@ -101,7 +101,7 @@ impl<'tx> ReadOperations for SqlConnectorTransaction<'tx> { &selected_fields.into(), aggr_selections, SqlInfo::from(&self.connection_info), - trace_id, + trace_id.as_deref(), ) .await }) @@ -115,7 +115,7 @@ impl<'tx> ReadOperations for SqlConnectorTransaction<'tx> { trace_id: Option, ) -> connector::Result> { catch(self.connection_info.clone(), async move { - read::get_related_m2m_record_ids(&self.inner, from_field, from_record_ids, trace_id).await + read::get_related_m2m_record_ids(&self.inner, from_field, from_record_ids, trace_id.as_deref()).await }) .await } @@ -137,7 +137,7 @@ impl<'tx> ReadOperations for SqlConnectorTransaction<'tx> { selections, group_by, having, - trace_id, + trace_id.as_deref(), ) .await }) @@ -154,7 +154,14 @@ impl<'tx> WriteOperations for SqlConnectorTransaction<'tx> { trace_id: Option, ) -> connector::Result { catch(self.connection_info.clone(), async move { - write::create_record(&self.inner, &self.connection_info.sql_family(), model, args, trace_id).await + write::create_record( + &self.inner, + &self.connection_info.sql_family(), + model, + args, + trace_id.as_deref(), + ) + .await }) .await } @@ -173,7 +180,7 @@ impl<'tx> WriteOperations for SqlConnectorTransaction<'tx> { model, args, skip_duplicates, - trace_id, + trace_id.as_deref(), ) .await }) @@ -188,7 +195,7 @@ impl<'tx> WriteOperations for SqlConnectorTransaction<'tx> { trace_id: Option, ) -> connector::Result { catch(self.connection_info.clone(), async move { - write::update_records(&self.inner, model, record_filter, args, trace_id).await + write::update_records(&self.inner, model, record_filter, args, trace_id.as_deref()).await }) .await } @@ -201,7 +208,7 @@ impl<'tx> WriteOperations for SqlConnectorTransaction<'tx> { trace_id: Option, ) -> connector::Result> { catch(self.connection_info.clone(), async move { - let mut res = write::update_record(&self.inner, model, record_filter, args, trace_id).await?; + let mut res = write::update_record(&self.inner, model, record_filter, args, trace_id.as_deref()).await?; Ok(res.pop()) }) .await @@ -214,7 +221,7 @@ impl<'tx> WriteOperations for SqlConnectorTransaction<'tx> { trace_id: Option, ) -> connector::Result { catch(self.connection_info.clone(), async move { - write::delete_records(&self.inner, model, record_filter, trace_id).await + write::delete_records(&self.inner, model, record_filter, trace_id.as_deref()).await }) .await } @@ -225,7 +232,7 @@ impl<'tx> WriteOperations for SqlConnectorTransaction<'tx> { trace_id: Option, ) -> connector::Result { catch(self.connection_info.clone(), async move { - native_upsert(&self.inner, upsert, trace_id).await + native_upsert(&self.inner, upsert, trace_id.as_deref()).await }) .await } @@ -250,7 +257,7 @@ impl<'tx> WriteOperations for SqlConnectorTransaction<'tx> { trace_id: Option, ) -> connector::Result<()> { catch(self.connection_info.clone(), async move { - write::m2m_disconnect(&self.inner, field, parent_id, child_ids, trace_id).await + write::m2m_disconnect(&self.inner, field, parent_id, child_ids, trace_id.as_deref()).await }) .await } diff --git a/query-engine/connectors/sql-query-connector/src/ordering.rs b/query-engine/connectors/sql-query-connector/src/ordering.rs index c8b672414270..3d40f07b6ebb 100644 --- a/query-engine/connectors/sql-query-connector/src/ordering.rs +++ b/query-engine/connectors/sql-query-connector/src/ordering.rs @@ -8,7 +8,7 @@ static ORDER_JOIN_PREFIX: &str = "orderby_"; static ORDER_AGGREGATOR_ALIAS: &str = "orderby_aggregator"; #[derive(Debug, Clone)] -pub struct OrderByDefinition { +pub(crate) struct OrderByDefinition { /// Final column identifier to be used for the scalar field to order by pub(crate) order_column: Expression<'static>, /// Defines ordering for an `ORDER BY` statement. @@ -18,14 +18,14 @@ pub struct OrderByDefinition { } #[derive(Debug, Default)] -pub struct OrderByBuilder { +pub(crate) struct OrderByBuilder { // Used to generate unique join alias join_counter: usize, } impl OrderByBuilder { /// Builds all expressions for an `ORDER BY` clause based on the query arguments. - pub fn build(&mut self, query_arguments: &QueryArguments) -> Vec { + pub(crate) fn build(&mut self, query_arguments: &QueryArguments) -> Vec { let needs_reversed_order = query_arguments.needs_reversed_order(); // The index is used to differentiate potentially separate relations to the same model. diff --git a/query-engine/connectors/sql-query-connector/src/query_arguments_ext.rs b/query-engine/connectors/sql-query-connector/src/query_arguments_ext.rs index 45885dd52d51..b0319b28b22c 100644 --- a/query-engine/connectors/sql-query-connector/src/query_arguments_ext.rs +++ b/query-engine/connectors/sql-query-connector/src/query_arguments_ext.rs @@ -1,6 +1,6 @@ use connector_interface::QueryArguments; -pub trait QueryArgumentsExt { +pub(crate) trait QueryArgumentsExt { /// If we need to take rows before a cursor position, then we need to reverse the order in SQL. fn needs_reversed_order(&self) -> bool; } diff --git a/query-engine/connectors/sql-query-connector/src/query_builder/mod.rs b/query-engine/connectors/sql-query-connector/src/query_builder/mod.rs index 2124294a2f26..0a305e476ef3 100644 --- a/query-engine/connectors/sql-query-connector/src/query_builder/mod.rs +++ b/query-engine/connectors/sql-query-connector/src/query_builder/mod.rs @@ -1,8 +1,5 @@ -pub mod read; -pub mod write; - -pub use read::*; -pub use write::*; +pub(crate) mod read; +pub(crate) mod write; use crate::model_extensions::SelectionResultExt; use prisma_models::SelectionResult; diff --git a/query-engine/connectors/sql-query-connector/src/query_builder/read.rs b/query-engine/connectors/sql-query-connector/src/query_builder/read.rs index 2a2fe48c93fb..a36513b0c61a 100644 --- a/query-engine/connectors/sql-query-connector/src/query_builder/read.rs +++ b/query-engine/connectors/sql-query-connector/src/query_builder/read.rs @@ -8,12 +8,12 @@ use prisma_models::*; use quaint::ast::*; use tracing::Span; -pub trait SelectDefinition { +pub(crate) trait SelectDefinition { fn into_select( self, _: &ModelRef, aggr_selections: &[RelAggregationSelection], - trace_id: Option, + trace_id: Option<&str>, ) -> (Select<'static>, Vec>); } @@ -22,7 +22,7 @@ impl SelectDefinition for Filter { self, model: &ModelRef, aggr_selections: &[RelAggregationSelection], - trace_id: Option, + trace_id: Option<&str>, ) -> (Select<'static>, Vec>) { let args = QueryArguments::from((model.clone(), self)); args.into_select(model, aggr_selections, trace_id) @@ -34,7 +34,7 @@ impl SelectDefinition for &Filter { self, model: &ModelRef, aggr_selections: &[RelAggregationSelection], - trace_id: Option, + trace_id: Option<&str>, ) -> (Select<'static>, Vec>) { self.clone().into_select(model, aggr_selections, trace_id) } @@ -45,7 +45,7 @@ impl SelectDefinition for Select<'static> { self, _: &ModelRef, _: &[RelAggregationSelection], - _trace_id: Option, + _trace_id: Option<&str>, ) -> (Select<'static>, Vec>) { (self, vec![]) } @@ -56,7 +56,7 @@ impl SelectDefinition for QueryArguments { self, model: &ModelRef, aggr_selections: &[RelAggregationSelection], - trace_id: Option, + trace_id: Option<&str>, ) -> (Select<'static>, Vec>) { let order_by_definitions = OrderByBuilder::default().build(&self); let (table_opt, cursor_condition) = cursor_condition::build(&self, &model, &order_by_definitions); @@ -111,17 +111,17 @@ impl SelectDefinition for QueryArguments { } } -pub fn get_records( +pub(crate) fn get_records( model: &ModelRef, columns: impl Iterator>, aggr_selections: &[RelAggregationSelection], query: T, - trace_id: Option, + trace_id: Option<&str>, ) -> Select<'static> where T: SelectDefinition, { - let (select, additional_selection_set) = query.into_select(model, aggr_selections, trace_id.clone()); + let (select, additional_selection_set) = query.into_select(model, aggr_selections, trace_id); let select = columns.fold(select, |acc, col| acc.column(col)); let select = select.append_trace(&Span::current()).add_trace_id(trace_id); @@ -157,14 +157,14 @@ where /// ``` /// Important note: Do not use the AsColumn trait here as we need to construct column references that are relative, /// not absolute - e.g. `SELECT "field" FROM (...)` NOT `SELECT "full"."path"."to"."field" FROM (...)`. -pub fn aggregate( +pub(crate) fn aggregate( model: &ModelRef, selections: &[AggregationSelection], args: QueryArguments, - trace_id: Option, + trace_id: Option<&str>, ) -> Select<'static> { let columns = extract_columns(model, &selections); - let sub_query = get_records(model, columns.into_iter(), &[], args, trace_id.clone()); + let sub_query = get_records(model, columns.into_iter(), &[], args, trace_id); let sub_table = Table::from(sub_query).alias("sub"); selections.iter().fold( @@ -205,15 +205,15 @@ pub fn aggregate( ) } -pub fn group_by_aggregate( +pub(crate) fn group_by_aggregate( model: &ModelRef, args: QueryArguments, selections: &[AggregationSelection], group_by: Vec, having: Option, - trace_id: Option, + trace_id: Option<&str>, ) -> Select<'static> { - let (base_query, _) = args.into_select(model, &[], trace_id.clone()); + let (base_query, _) = args.into_select(model, &[], trace_id); let select_query = selections.iter().fold(base_query, |select, next_op| match next_op { AggregationSelection::Field(field) => select.column(field.as_column()), diff --git a/query-engine/connectors/sql-query-connector/src/query_builder/write.rs b/query-engine/connectors/sql-query-connector/src/query_builder/write.rs index 882ec3502305..8bb3dbac06c3 100644 --- a/query-engine/connectors/sql-query-connector/src/query_builder/write.rs +++ b/query-engine/connectors/sql-query-connector/src/query_builder/write.rs @@ -1,5 +1,4 @@ -use crate::model_extensions::*; -use crate::sql_trace::SqlTraceComment; +use crate::{model_extensions::*, sql_trace::SqlTraceComment}; use connector_interface::{DatasourceFieldName, ScalarWriteOperation, WriteArgs}; use itertools::Itertools; use prisma_models::*; @@ -9,7 +8,7 @@ use tracing::Span; /// `INSERT` a new record to the database. Resulting an `INSERT` ast and an /// optional `RecordProjection` if available from the arguments or model. -pub(crate) fn create_record(model: &ModelRef, mut args: WriteArgs, trace_id: Option) -> Insert<'static> { +pub(crate) fn create_record(model: &ModelRef, mut args: WriteArgs, trace_id: Option<&str>) -> Insert<'static> { let fields: Vec<_> = model .fields() .scalar() @@ -39,12 +38,12 @@ pub(crate) fn create_record(model: &ModelRef, mut args: WriteArgs, trace_id: Opt /// where each `WriteArg` in the Vec is one row. /// Requires `affected_fields` to be non-empty to produce valid SQL. #[allow(clippy::mutable_key_type)] -pub fn create_records_nonempty( +pub(crate) fn create_records_nonempty( model: &ModelRef, args: Vec, skip_duplicates: bool, affected_fields: &HashSet, - trace_id: Option, + trace_id: Option<&str>, ) -> Insert<'static> { // We need to bring all write args into a uniform shape. // The easiest way to do this is to take go over all fields of the batch and apply the following: @@ -87,7 +86,7 @@ pub fn create_records_nonempty( } /// `INSERT` empty records statement. -pub fn create_records_empty(model: &ModelRef, skip_duplicates: bool, trace_id: Option) -> Insert<'static> { +pub(crate) fn create_records_empty(model: &ModelRef, skip_duplicates: bool, trace_id: Option<&str>) -> Insert<'static> { let insert: Insert<'static> = Insert::single_into(model.as_table()).into(); let insert = insert.append_trace(&Span::current()).add_trace_id(trace_id); @@ -98,7 +97,7 @@ pub fn create_records_empty(model: &ModelRef, skip_duplicates: bool, trace_id: O } } -pub fn build_update_and_set_query(model: &ModelRef, args: WriteArgs, trace_id: Option) -> Update<'static> { +pub(crate) fn build_update_and_set_query(model: &ModelRef, args: WriteArgs, trace_id: Option<&str>) -> Update<'static> { let scalar_fields = model.fields().scalar(); let table = model.as_table(); let query = args @@ -153,7 +152,7 @@ pub fn build_update_and_set_query(model: &ModelRef, args: WriteArgs, trace_id: O query.append_trace(&Span::current()).add_trace_id(trace_id) } -pub fn chunk_update_with_ids( +pub(crate) fn chunk_update_with_ids( update: Update<'static>, model: &ModelRef, ids: &[&SelectionResult], @@ -168,11 +167,11 @@ pub fn chunk_update_with_ids( Ok(query) } -pub fn delete_many( +pub(crate) fn delete_many( model: &ModelRef, ids: &[&SelectionResult], filter_condition: ConditionTree<'static>, - trace_id: Option, + trace_id: Option<&str>, ) -> Vec> { let columns: Vec<_> = ModelProjection::from(model.primary_identifier()).as_columns().collect(); @@ -180,11 +179,11 @@ pub fn delete_many( Delete::from_table(model.as_table()) .so_that(conditions.and(filter_condition.clone())) .append_trace(&Span::current()) - .add_trace_id(trace_id.clone()) + .add_trace_id(trace_id) }) } -pub fn create_relation_table_records( +pub(crate) fn create_relation_table_records( field: &RelationFieldRef, parent_id: &SelectionResult, child_ids: &[SelectionResult], @@ -208,11 +207,11 @@ pub fn create_relation_table_records( insert.build().on_conflict(OnConflict::DoNothing).into() } -pub fn delete_relation_table_records( +pub(crate) fn delete_relation_table_records( parent_field: &RelationFieldRef, parent_id: &SelectionResult, child_ids: &[SelectionResult], - trace_id: Option, + trace_id: Option<&str>, ) -> Delete<'static> { let relation = parent_field.relation(); diff --git a/query-engine/connectors/sql-query-connector/src/query_ext.rs b/query-engine/connectors/sql-query-connector/src/query_ext.rs index a98481eca10b..7499e824b201 100644 --- a/query-engine/connectors/sql-query-connector/src/query_ext.rs +++ b/query-engine/connectors/sql-query-connector/src/query_ext.rs @@ -36,7 +36,7 @@ pub trait QueryExt: Queryable + Send + Sync { &self, q: Query<'_>, idents: &[ColumnMetadata<'_>], - trace_id: Option, + trace_id: Option<&str>, ) -> crate::Result> { let span = info_span!("filter read query"); @@ -118,12 +118,7 @@ pub trait QueryExt: Queryable + Send + Sync { } /// Select one row from the database. - async fn find( - &self, - q: Select<'_>, - meta: &[ColumnMetadata<'_>], - trace_id: Option, - ) -> crate::Result { + async fn find(&self, q: Select<'_>, meta: &[ColumnMetadata<'_>], trace_id: Option<&str>) -> crate::Result { self.filter(q.limit(1).into(), meta, trace_id) .await? .into_iter() @@ -137,7 +132,7 @@ pub trait QueryExt: Queryable + Send + Sync { &self, model: &ModelRef, record_filter: RecordFilter, - trace_id: Option, + trace_id: Option<&str>, ) -> crate::Result> { if let Some(selectors) = record_filter.selectors { Ok(selectors) @@ -151,7 +146,7 @@ pub trait QueryExt: Queryable + Send + Sync { &self, model: &ModelRef, filter: Filter, - trace_id: Option, + trace_id: Option<&str>, ) -> crate::Result> { let model_id: ModelProjection = model.primary_identifier().into(); let id_cols: Vec> = model_id.as_columns().collect(); @@ -159,7 +154,7 @@ pub trait QueryExt: Queryable + Send + Sync { let select = Select::from_table(model.as_table()) .columns(id_cols) .append_trace(&Span::current()) - .add_trace_id(trace_id.clone()) + .add_trace_id(trace_id) .so_that(filter.aliased_condition_from(None, false)); self.select_ids(select, model_id, trace_id).await @@ -169,7 +164,7 @@ pub trait QueryExt: Queryable + Send + Sync { &self, select: Select<'_>, model_id: ModelProjection, - trace_id: Option, + trace_id: Option<&str>, ) -> crate::Result> { let idents: Vec<_> = model_id .fields() diff --git a/query-engine/connectors/sql-query-connector/src/sql_trace.rs b/query-engine/connectors/sql-query-connector/src/sql_trace.rs index 29cab8198215..7f2ab235c35e 100644 --- a/query-engine/connectors/sql-query-connector/src/sql_trace.rs +++ b/query-engine/connectors/sql-query-connector/src/sql_trace.rs @@ -13,7 +13,7 @@ pub fn trace_parent_to_string(context: &SpanContext) -> String { pub trait SqlTraceComment: Sized { fn append_trace(self, span: &Span) -> Self; - fn add_trace_id(self, trace_id: Option) -> Self; + fn add_trace_id(self, trace_id: Option<&str>) -> Self; } macro_rules! sql_trace { @@ -31,7 +31,7 @@ macro_rules! sql_trace { } } // Temporary method to pass the traceid in an operation - fn add_trace_id(self, trace_id: Option) -> Self { + fn add_trace_id(self, trace_id: Option<&str>) -> Self { if let Some(traceparent) = trace_id { if should_sample(&traceparent) { self.comment(format!("traceparent={}", traceparent))