From c4b027cadfe1adfc6302f5c5825e3e3e36662f16 Mon Sep 17 00:00:00 2001 From: Nasr Date: Mon, 29 Jul 2024 16:32:36 +0200 Subject: [PATCH 01/12] feat(torii-graphql): event messages update & subscription --- .../torii/graphql/src/object/event_message.rs | 201 +++++++++++++----- 1 file changed, 144 insertions(+), 57 deletions(-) diff --git a/crates/torii/graphql/src/object/event_message.rs b/crates/torii/graphql/src/object/event_message.rs index b1b815537c..e093dfea68 100644 --- a/crates/torii/graphql/src/object/event_message.rs +++ b/crates/torii/graphql/src/object/event_message.rs @@ -8,18 +8,19 @@ use sqlx::pool::PoolConnection; use sqlx::{Pool, Sqlite}; use tokio_stream::StreamExt; use torii_core::simple_broker::SimpleBroker; -use torii_core::types::EventMessage; +use torii_core::types::Entity; use super::inputs::keys_input::keys_argument; use super::{BasicObject, ResolvableObject, TypeMapping, ValueMapping}; use crate::constants::{ - EVENT_ID_COLUMN, EVENT_MESSAGE_NAMES, EVENT_MESSAGE_TABLE, EVENT_MESSAGE_TYPE_NAME, ID_COLUMN, + DATETIME_FORMAT, EVENT_ID_COLUMN, EVENT_MESSAGE_NAMES, EVENT_MESSAGE_TABLE, + EVENT_MESSAGE_TYPE_NAME, ID_COLUMN, }; use crate::mapping::ENTITY_TYPE_MAPPING; use crate::object::{resolve_many, resolve_one}; use crate::query::{type_mapping_query, value_mapping_from_row}; use crate::types::TypeData; -use crate::utils::extract; +use crate::utils; #[derive(Debug)] pub struct EventMessageObject; @@ -65,40 +66,34 @@ impl ResolvableObject for EventMessageObject { } fn subscriptions(&self) -> Option> { - Some(vec![ - SubscriptionField::new( - "eventMessageUpdated", - TypeRef::named_nn(self.type_name()), - |ctx| { - SubscriptionFieldFuture::new(async move { - let id = match ctx.args.get("id") { - Some(id) => Some(id.string()?.to_string()), - None => None, - }; - // if id is None, then subscribe to all entities - // if id is Some, then subscribe to only the entity with that id - Ok(SimpleBroker::::subscribe().filter_map( - move |entity: EventMessage| { - if id.is_none() || id == Some(entity.id.clone()) { - Some(Ok(Value::Object(EventMessageObject::value_mapping( - entity, - )))) - } else { - // id != entity.id , then don't send anything, still listening - None - } - }, - )) - }) - }, - ) - .argument(InputValue::new("id", TypeRef::named(TypeRef::ID))), - ]) + Some(vec![SubscriptionField::new( + "entityUpdated", + TypeRef::named_nn(self.type_name()), + |ctx| { + SubscriptionFieldFuture::new(async move { + let id = match ctx.args.get("id") { + Some(id) => Some(id.string()?.to_string()), + None => None, + }; + // if id is None, then subscribe to all entities + // if id is Some, then subscribe to only the entity with that id + Ok(SimpleBroker::::subscribe().filter_map(move |entity: Entity| { + if id.is_none() || id == Some(entity.id.clone()) { + Some(Ok(Value::Object(EventMessageObject::value_mapping(entity)))) + } else { + // id != entity.id , then don't send anything, still listening + None + } + })) + }) + }, + ) + .argument(InputValue::new("id", TypeRef::named(TypeRef::ID)))]) } } impl EventMessageObject { - pub fn value_mapping(entity: EventMessage) -> ValueMapping { + pub fn value_mapping(entity: Entity) -> ValueMapping { let keys: Vec<&str> = entity.keys.split('/').filter(|&k| !k.is_empty()).collect(); IndexMap::from([ (Name::new("id"), Value::from(entity.id)), @@ -106,11 +101,15 @@ impl EventMessageObject { (Name::new("eventId"), Value::from(entity.event_id)), ( Name::new("createdAt"), - Value::from(entity.created_at.format("%Y-%m-%d %H:%M:%S").to_string()), + Value::from(entity.created_at.format(DATETIME_FORMAT).to_string()), ), ( Name::new("updatedAt"), - Value::from(entity.updated_at.format("%Y-%m-%d %H:%M:%S").to_string()), + Value::from(entity.updated_at.format(DATETIME_FORMAT).to_string()), + ), + ( + Name::new("executedAt"), + Value::from(entity.executed_at.format(DATETIME_FORMAT).to_string()), ), ]) } @@ -123,13 +122,13 @@ fn model_union_field() -> Field { Value::Object(indexmap) => { let mut conn = ctx.data::>()?.acquire().await?; - let entity_id = extract::(indexmap, "id")?; + let entity_id = utils::extract::(indexmap, "id")?; // fetch name from the models table // using the model id (hashed model name) - let model_ids: Vec<(String, String)> = sqlx::query_as( - "SELECT id, name + let model_ids: Vec<(String, String, String)> = sqlx::query_as( + "SELECT id, namespace, name FROM models - WHERE id IN ( + WHERE id IN ( SELECT model_id FROM event_model WHERE entity_id = ? @@ -140,20 +139,29 @@ fn model_union_field() -> Field { .await?; let mut results: Vec> = Vec::new(); - for (id, name) in model_ids { - // the model id is used as the id for the model members + for (id, namespace, name) in model_ids { + // the model id in the model mmeebrs table is the hashed model name (id) let type_mapping = type_mapping_query(&mut conn, &id).await?; - // but the model data tables use the unhashed model name as the table name - let data = model_data_recursive_query( + // but the table name for the model data is the unhashed model name + let data: ValueMapping = match model_data_recursive_query( &mut conn, - vec![name.clone()], + vec![format!("{namespace}-{name}")], &entity_id, + &[], &type_mapping, + false, ) - .await?; + .await? + { + Value::Object(map) => map, + _ => unreachable!(), + }; - results.push(FieldValue::with_type(FieldValue::owned_any(data), name)); + results.push(FieldValue::with_type( + FieldValue::owned_any(data), + utils::type_name_from_names(&namespace, &name), + )) } Ok(Some(FieldValue::list(results))) @@ -170,25 +178,104 @@ pub async fn model_data_recursive_query( conn: &mut PoolConnection, path_array: Vec, entity_id: &str, + indexes: &[i64], type_mapping: &TypeMapping, -) -> sqlx::Result { + is_list: bool, +) -> sqlx::Result { // For nested types, we need to remove prefix in path array let namespace = format!("{}_", path_array[0]); let table_name = &path_array.join("$").replace(&namespace, ""); - let query = format!("SELECT * FROM {} WHERE event_message_id = '{}'", table_name, entity_id); - let row = sqlx::query(&query).fetch_one(conn.as_mut()).await?; - let mut value_mapping = value_mapping_from_row(&row, type_mapping, true)?; + let mut query = format!("SELECT * FROM [{}] WHERE event_message_id = '{}' ", table_name, entity_id); + for (column_idx, index) in indexes.iter().enumerate() { + query.push_str(&format!("AND idx_{} = {} ", column_idx, index)); + } + + let rows = sqlx::query(&query).fetch_all(conn.as_mut()).await?; + if rows.is_empty() { + return Ok(Value::List(vec![])); + } - for (field_name, type_data) in type_mapping { - if let TypeData::Nested((_, nested_mapping)) = type_data { - let mut nested_path = path_array.clone(); - nested_path.push(field_name.to_string()); + let value_mapping: Value; + let mut nested_value_mappings = Vec::new(); - let nested_values = - model_data_recursive_query(conn, nested_path, entity_id, nested_mapping).await?; + for (idx, row) in rows.iter().enumerate() { + let mut nested_value_mapping = value_mapping_from_row(row, type_mapping, true)?; - value_mapping.insert(Name::new(field_name), Value::Object(nested_values)); + for (field_name, type_data) in type_mapping { + if let TypeData::Nested((_, nested_mapping)) = type_data { + let mut nested_path = path_array.clone(); + nested_path.push(field_name.to_string()); + + let nested_values = model_data_recursive_query( + conn, + nested_path, + entity_id, + &if is_list { + let mut indexes = indexes.to_vec(); + indexes.push(idx as i64); + indexes + } else { + indexes.to_vec() + }, + nested_mapping, + false, + ) + .await?; + + nested_value_mapping.insert(Name::new(field_name), nested_values); + } else if let TypeData::List(inner) = type_data { + let mut nested_path = path_array.clone(); + nested_path.push(field_name.to_string()); + + let data = match model_data_recursive_query( + conn, + nested_path, + entity_id, + // this might need to be changed to support 2d+ arrays + &if is_list { + let mut indexes = indexes.to_vec(); + indexes.push(idx as i64); + indexes + } else { + indexes.to_vec() + }, + &IndexMap::from([(Name::new("data"), *inner.clone())]), + true, + ) + .await? + { + // map our list which uses a data field as a place holder + // for all elements to get the elemnt directly + Value::List(data) => data + .iter() + .map(|v| match v { + Value::Object(map) => map.get(&Name::new("data")).unwrap().clone(), + ty => unreachable!( + "Expected Value::Object for list \"data\" field, got {:?}", + ty + ), + }) + .collect(), + Value::Object(map) => map.get(&Name::new("data")).unwrap().clone(), + ty => { + unreachable!( + "Expected Value::List or Value::Object for list, got {:?}", + ty + ); + } + }; + + nested_value_mapping.insert(Name::new(field_name), data); + } } + + nested_value_mappings.push(Value::Object(nested_value_mapping)); + } + + if is_list { + value_mapping = Value::List(nested_value_mappings); + } else { + value_mapping = nested_value_mappings.pop().unwrap(); } Ok(value_mapping) From c8ff43d0d16292675f4066788056858a5a5607d6 Mon Sep 17 00:00:00 2001 From: Nasr Date: Mon, 29 Jul 2024 16:35:03 +0200 Subject: [PATCH 02/12] refactor: correct type mapping --- crates/torii/graphql/src/object/event_message.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/crates/torii/graphql/src/object/event_message.rs b/crates/torii/graphql/src/object/event_message.rs index e093dfea68..e9ca83cb75 100644 --- a/crates/torii/graphql/src/object/event_message.rs +++ b/crates/torii/graphql/src/object/event_message.rs @@ -8,7 +8,7 @@ use sqlx::pool::PoolConnection; use sqlx::{Pool, Sqlite}; use tokio_stream::StreamExt; use torii_core::simple_broker::SimpleBroker; -use torii_core::types::Entity; +use torii_core::types::EventMessage; use super::inputs::keys_input::keys_argument; use super::{BasicObject, ResolvableObject, TypeMapping, ValueMapping}; @@ -67,7 +67,7 @@ impl ResolvableObject for EventMessageObject { fn subscriptions(&self) -> Option> { Some(vec![SubscriptionField::new( - "entityUpdated", + "eventMessageUpdated", TypeRef::named_nn(self.type_name()), |ctx| { SubscriptionFieldFuture::new(async move { @@ -77,7 +77,7 @@ impl ResolvableObject for EventMessageObject { }; // if id is None, then subscribe to all entities // if id is Some, then subscribe to only the entity with that id - Ok(SimpleBroker::::subscribe().filter_map(move |entity: Entity| { + Ok(SimpleBroker::::subscribe().filter_map(move |entity: EventMessage| { if id.is_none() || id == Some(entity.id.clone()) { Some(Ok(Value::Object(EventMessageObject::value_mapping(entity)))) } else { @@ -93,7 +93,7 @@ impl ResolvableObject for EventMessageObject { } impl EventMessageObject { - pub fn value_mapping(entity: Entity) -> ValueMapping { + pub fn value_mapping(entity: EventMessage) -> ValueMapping { let keys: Vec<&str> = entity.keys.split('/').filter(|&k| !k.is_empty()).collect(); IndexMap::from([ (Name::new("id"), Value::from(entity.id)), From ac1671586cdfb9801716a87344b24a08d0029a60 Mon Sep 17 00:00:00 2001 From: Nasr Date: Mon, 29 Jul 2024 19:10:51 +0200 Subject: [PATCH 03/12] feat: event message field to modeldata --- crates/torii/graphql/src/object/model_data.rs | 24 ++++++++++++++++++- 1 file changed, 23 insertions(+), 1 deletion(-) diff --git a/crates/torii/graphql/src/object/model_data.rs b/crates/torii/graphql/src/object/model_data.rs index f51541a9d0..53839d5066 100644 --- a/crates/torii/graphql/src/object/model_data.rs +++ b/crates/torii/graphql/src/object/model_data.rs @@ -10,7 +10,8 @@ use super::inputs::where_input::{parse_where_argument, where_argument, WhereInpu use super::inputs::InputObjectTrait; use super::{BasicObject, ResolvableObject, TypeMapping, ValueMapping}; use crate::constants::{ - ENTITY_ID_COLUMN, ENTITY_TABLE, EVENT_ID_COLUMN, ID_COLUMN, INTERNAL_ENTITY_ID_KEY, + ENTITY_ID_COLUMN, ENTITY_TABLE, EVENT_ID_COLUMN, EVENT_MESSAGE_TABLE, ID_COLUMN, + INTERNAL_ENTITY_ID_KEY, }; use crate::mapping::ENTITY_TYPE_MAPPING; use crate::query::data::{count_rows, fetch_multiple_rows, fetch_single_row}; @@ -77,6 +78,7 @@ impl BasicObject for ModelDataObject { // root object requires entity_field association let mut root = objects.pop().unwrap(); root = root.field(entity_field()); + root = root.field(event_message_field()); objects.push(root); objects @@ -262,3 +264,23 @@ fn entity_field() -> Field { }) }) } + +fn event_message_field() -> Field { + Field::new("eventMessage", TypeRef::named("EventMessage"), |ctx| { + FieldFuture::new(async move { + match ctx.parent_value.try_to_value()? { + Value::Object(indexmap) => { + let mut conn = ctx.data::>()?.acquire().await?; + let entity_id = utils::extract::(indexmap, INTERNAL_ENTITY_ID_KEY)?; + let data = + fetch_single_row(&mut conn, EVENT_MESSAGE_TABLE, ID_COLUMN, &entity_id) + .await?; + let event_message = value_mapping_from_row(&data, &ENTITY_TYPE_MAPPING, false)?; + + Ok(Some(Value::Object(event_message))) + } + _ => Err("incorrect value, requires Value::Object".into()), + } + }) + }) +} From 9648106b29291f577caf28336d9f5d352b6a9225 Mon Sep 17 00:00:00 2001 From: Nasr Date: Mon, 29 Jul 2024 19:14:03 +0200 Subject: [PATCH 04/12] refactor: use constant for type names --- crates/torii/graphql/src/object/model_data.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/crates/torii/graphql/src/object/model_data.rs b/crates/torii/graphql/src/object/model_data.rs index 53839d5066..ca54ee5584 100644 --- a/crates/torii/graphql/src/object/model_data.rs +++ b/crates/torii/graphql/src/object/model_data.rs @@ -10,8 +10,8 @@ use super::inputs::where_input::{parse_where_argument, where_argument, WhereInpu use super::inputs::InputObjectTrait; use super::{BasicObject, ResolvableObject, TypeMapping, ValueMapping}; use crate::constants::{ - ENTITY_ID_COLUMN, ENTITY_TABLE, EVENT_ID_COLUMN, EVENT_MESSAGE_TABLE, ID_COLUMN, - INTERNAL_ENTITY_ID_KEY, + ENTITY_ID_COLUMN, ENTITY_TABLE, ENTITY_TYPE_NAME, EVENT_ID_COLUMN, EVENT_MESSAGE_TABLE, + EVENT_MESSAGE_TYPE_NAME, ID_COLUMN, INTERNAL_ENTITY_ID_KEY, }; use crate::mapping::ENTITY_TYPE_MAPPING; use crate::query::data::{count_rows, fetch_multiple_rows, fetch_single_row}; @@ -247,7 +247,7 @@ pub fn object(type_name: &str, type_mapping: &TypeMapping, path_array: Vec Field { - Field::new("entity", TypeRef::named("World__Entity"), |ctx| { + Field::new("entity", TypeRef::named(ENTITY_TYPE_NAME), |ctx| { FieldFuture::new(async move { match ctx.parent_value.try_to_value()? { Value::Object(indexmap) => { @@ -266,7 +266,7 @@ fn entity_field() -> Field { } fn event_message_field() -> Field { - Field::new("eventMessage", TypeRef::named("EventMessage"), |ctx| { + Field::new("eventMessage", TypeRef::named(EVENT_MESSAGE_TYPE_NAME), |ctx| { FieldFuture::new(async move { match ctx.parent_value.try_to_value()? { Value::Object(indexmap) => { From 5096673023292c0eb9ecf074d5d4ef58a8bf0ec7 Mon Sep 17 00:00:00 2001 From: Nasr Date: Tue, 30 Jul 2024 13:09:37 -0400 Subject: [PATCH 05/12] fmt --- .../torii/graphql/src/object/event_message.rs | 55 +++++++++++-------- 1 file changed, 31 insertions(+), 24 deletions(-) diff --git a/crates/torii/graphql/src/object/event_message.rs b/crates/torii/graphql/src/object/event_message.rs index e9ca83cb75..82cc6e8e64 100644 --- a/crates/torii/graphql/src/object/event_message.rs +++ b/crates/torii/graphql/src/object/event_message.rs @@ -66,29 +66,35 @@ impl ResolvableObject for EventMessageObject { } fn subscriptions(&self) -> Option> { - Some(vec![SubscriptionField::new( - "eventMessageUpdated", - TypeRef::named_nn(self.type_name()), - |ctx| { - SubscriptionFieldFuture::new(async move { - let id = match ctx.args.get("id") { - Some(id) => Some(id.string()?.to_string()), - None => None, - }; - // if id is None, then subscribe to all entities - // if id is Some, then subscribe to only the entity with that id - Ok(SimpleBroker::::subscribe().filter_map(move |entity: EventMessage| { - if id.is_none() || id == Some(entity.id.clone()) { - Some(Ok(Value::Object(EventMessageObject::value_mapping(entity)))) - } else { - // id != entity.id , then don't send anything, still listening - None - } - })) - }) - }, - ) - .argument(InputValue::new("id", TypeRef::named(TypeRef::ID)))]) + Some(vec![ + SubscriptionField::new( + "eventMessageUpdated", + TypeRef::named_nn(self.type_name()), + |ctx| { + SubscriptionFieldFuture::new(async move { + let id = match ctx.args.get("id") { + Some(id) => Some(id.string()?.to_string()), + None => None, + }; + // if id is None, then subscribe to all entities + // if id is Some, then subscribe to only the entity with that id + Ok(SimpleBroker::::subscribe().filter_map( + move |entity: EventMessage| { + if id.is_none() || id == Some(entity.id.clone()) { + Some(Ok(Value::Object(EventMessageObject::value_mapping( + entity, + )))) + } else { + // id != entity.id , then don't send anything, still listening + None + } + }, + )) + }) + }, + ) + .argument(InputValue::new("id", TypeRef::named(TypeRef::ID))), + ]) } } @@ -185,7 +191,8 @@ pub async fn model_data_recursive_query( // For nested types, we need to remove prefix in path array let namespace = format!("{}_", path_array[0]); let table_name = &path_array.join("$").replace(&namespace, ""); - let mut query = format!("SELECT * FROM [{}] WHERE event_message_id = '{}' ", table_name, entity_id); + let mut query = + format!("SELECT * FROM [{}] WHERE event_message_id = '{}' ", table_name, entity_id); for (column_idx, index) in indexes.iter().enumerate() { query.push_str(&format!("AND idx_{} = {} ", column_idx, index)); } From e903e3d47f3ee9761dcc86cffa7a514a136938dd Mon Sep 17 00:00:00 2001 From: Nasr Date: Sun, 4 Aug 2024 20:40:36 -0400 Subject: [PATCH 06/12] refactor: share logic between event messages and entities --- crates/torii/graphql/src/constants.rs | 1 + crates/torii/graphql/src/object/entity.rs | 11 +- .../torii/graphql/src/object/event_message.rs | 176 +++--------------- crates/torii/graphql/src/object/mod.rs | 3 +- crates/torii/graphql/src/object/model_data.rs | 20 +- crates/torii/graphql/src/query/mod.rs | 5 +- 6 files changed, 58 insertions(+), 158 deletions(-) diff --git a/crates/torii/graphql/src/constants.rs b/crates/torii/graphql/src/constants.rs index a01031e5e2..e09c8de6d2 100644 --- a/crates/torii/graphql/src/constants.rs +++ b/crates/torii/graphql/src/constants.rs @@ -13,6 +13,7 @@ pub const METADATA_TABLE: &str = "metadata"; pub const ID_COLUMN: &str = "id"; pub const EVENT_ID_COLUMN: &str = "event_id"; pub const ENTITY_ID_COLUMN: &str = "entity_id"; +pub const EVENT_MESSAGE_ID_COLUMN: &str = "event_message_id"; pub const JSON_COLUMN: &str = "json"; pub const TRANSACTION_HASH_COLUMN: &str = "transaction_hash"; diff --git a/crates/torii/graphql/src/object/entity.rs b/crates/torii/graphql/src/object/entity.rs index 92737ba9c8..85c9463f91 100644 --- a/crates/torii/graphql/src/object/entity.rs +++ b/crates/torii/graphql/src/object/entity.rs @@ -13,7 +13,7 @@ use torii_core::types::Entity; use super::inputs::keys_input::keys_argument; use super::{BasicObject, ResolvableObject, TypeMapping, ValueMapping}; use crate::constants::{ - DATETIME_FORMAT, ENTITY_NAMES, ENTITY_TABLE, ENTITY_TYPE_NAME, EVENT_ID_COLUMN, ID_COLUMN, + DATETIME_FORMAT, ENTITY_ID_COLUMN, ENTITY_NAMES, ENTITY_TABLE, ENTITY_TYPE_NAME, EVENT_ID_COLUMN, ID_COLUMN }; use crate::mapping::ENTITY_TYPE_MAPPING; use crate::object::{resolve_many, resolve_one}; @@ -143,6 +143,7 @@ fn model_union_field() -> Field { // but the table name for the model data is the unhashed model name let data: ValueMapping = match model_data_recursive_query( &mut conn, + ENTITY_ID_COLUMN, vec![format!("{namespace}-{name}")], &entity_id, &[], @@ -173,6 +174,7 @@ fn model_union_field() -> Field { #[async_recursion] pub async fn model_data_recursive_query( conn: &mut PoolConnection, + entity_id_column: &str, path_array: Vec, entity_id: &str, indexes: &[i64], @@ -182,7 +184,8 @@ pub async fn model_data_recursive_query( // For nested types, we need to remove prefix in path array let namespace = format!("{}_", path_array[0]); let table_name = &path_array.join("$").replace(&namespace, ""); - let mut query = format!("SELECT * FROM [{}] WHERE entity_id = '{}' ", table_name, entity_id); + let mut query = + format!("SELECT * FROM [{}] WHERE {entity_id_column} = '{}' ", table_name, entity_id); for (column_idx, index) in indexes.iter().enumerate() { query.push_str(&format!("AND idx_{} = {} ", column_idx, index)); } @@ -196,7 +199,7 @@ pub async fn model_data_recursive_query( let mut nested_value_mappings = Vec::new(); for (idx, row) in rows.iter().enumerate() { - let mut nested_value_mapping = value_mapping_from_row(row, type_mapping, true)?; + let mut nested_value_mapping = value_mapping_from_row(row, entity_id_column, type_mapping, true)?; for (field_name, type_data) in type_mapping { if let TypeData::Nested((_, nested_mapping)) = type_data { @@ -205,6 +208,7 @@ pub async fn model_data_recursive_query( let nested_values = model_data_recursive_query( conn, + entity_id_column, nested_path, entity_id, &if is_list { @@ -226,6 +230,7 @@ pub async fn model_data_recursive_query( let data = match model_data_recursive_query( conn, + entity_id_column, nested_path, entity_id, // this might need to be changed to support 2d+ arrays diff --git a/crates/torii/graphql/src/object/event_message.rs b/crates/torii/graphql/src/object/event_message.rs index 82cc6e8e64..bd18358ae1 100644 --- a/crates/torii/graphql/src/object/event_message.rs +++ b/crates/torii/graphql/src/object/event_message.rs @@ -3,23 +3,18 @@ use async_graphql::dynamic::{ Field, FieldFuture, FieldValue, InputValue, SubscriptionField, SubscriptionFieldFuture, TypeRef, }; use async_graphql::{Name, Value}; -use async_recursion::async_recursion; -use sqlx::pool::PoolConnection; -use sqlx::{Pool, Sqlite}; -use tokio_stream::StreamExt; use torii_core::simple_broker::SimpleBroker; use torii_core::types::EventMessage; +use super::entity::model_data_recursive_query; use super::inputs::keys_input::keys_argument; use super::{BasicObject, ResolvableObject, TypeMapping, ValueMapping}; use crate::constants::{ - DATETIME_FORMAT, EVENT_ID_COLUMN, EVENT_MESSAGE_NAMES, EVENT_MESSAGE_TABLE, - EVENT_MESSAGE_TYPE_NAME, ID_COLUMN, + DATETIME_FORMAT, EVENT_ID_COLUMN, EVENT_MESSAGE_ID_COLUMN, EVENT_MESSAGE_NAMES, EVENT_MESSAGE_TABLE, EVENT_MESSAGE_TYPE_NAME, ID_COLUMN }; use crate::mapping::ENTITY_TYPE_MAPPING; use crate::object::{resolve_many, resolve_one}; -use crate::query::{type_mapping_query, value_mapping_from_row}; -use crate::types::TypeData; +use crate::query::type_mapping_query; use crate::utils; #[derive(Debug)] @@ -66,35 +61,31 @@ impl ResolvableObject for EventMessageObject { } fn subscriptions(&self) -> Option> { - Some(vec![ - SubscriptionField::new( - "eventMessageUpdated", - TypeRef::named_nn(self.type_name()), - |ctx| { - SubscriptionFieldFuture::new(async move { - let id = match ctx.args.get("id") { - Some(id) => Some(id.string()?.to_string()), - None => None, - }; - // if id is None, then subscribe to all entities - // if id is Some, then subscribe to only the entity with that id - Ok(SimpleBroker::::subscribe().filter_map( - move |entity: EventMessage| { - if id.is_none() || id == Some(entity.id.clone()) { - Some(Ok(Value::Object(EventMessageObject::value_mapping( - entity, - )))) - } else { - // id != entity.id , then don't send anything, still listening - None - } - }, - )) - }) - }, - ) - .argument(InputValue::new("id", TypeRef::named(TypeRef::ID))), - ]) + Some(vec![SubscriptionField::new( + "eventMessageUpdated", + TypeRef::named_nn(self.type_name()), + |ctx| { + SubscriptionFieldFuture::new(async move { + let id = match ctx.args.get("id") { + Some(id) => Some(id.string()?.to_string()), + None => None, + }; + // if id is None, then subscribe to all entities + // if id is Some, then subscribe to only the entity with that id + Ok(SimpleBroker::::subscribe().filter_map( + move |entity: EventMessage| { + if id.is_none() || id == Some(entity.id.clone()) { + Some(Ok(Value::Object(EventMessageObject::value_mapping(entity)))) + } else { + // id != entity.id , then don't send anything, still listening + None + } + }, + )) + }) + }, + ) + .argument(InputValue::new("id", TypeRef::named(TypeRef::ID)))]) } } @@ -152,6 +143,7 @@ fn model_union_field() -> Field { // but the table name for the model data is the unhashed model name let data: ValueMapping = match model_data_recursive_query( &mut conn, + EVENT_MESSAGE_ID_COLUMN, vec![format!("{namespace}-{name}")], &entity_id, &[], @@ -177,113 +169,3 @@ fn model_union_field() -> Field { }) }) } - -// TODO: flatten query -#[async_recursion] -pub async fn model_data_recursive_query( - conn: &mut PoolConnection, - path_array: Vec, - entity_id: &str, - indexes: &[i64], - type_mapping: &TypeMapping, - is_list: bool, -) -> sqlx::Result { - // For nested types, we need to remove prefix in path array - let namespace = format!("{}_", path_array[0]); - let table_name = &path_array.join("$").replace(&namespace, ""); - let mut query = - format!("SELECT * FROM [{}] WHERE event_message_id = '{}' ", table_name, entity_id); - for (column_idx, index) in indexes.iter().enumerate() { - query.push_str(&format!("AND idx_{} = {} ", column_idx, index)); - } - - let rows = sqlx::query(&query).fetch_all(conn.as_mut()).await?; - if rows.is_empty() { - return Ok(Value::List(vec![])); - } - - let value_mapping: Value; - let mut nested_value_mappings = Vec::new(); - - for (idx, row) in rows.iter().enumerate() { - let mut nested_value_mapping = value_mapping_from_row(row, type_mapping, true)?; - - for (field_name, type_data) in type_mapping { - if let TypeData::Nested((_, nested_mapping)) = type_data { - let mut nested_path = path_array.clone(); - nested_path.push(field_name.to_string()); - - let nested_values = model_data_recursive_query( - conn, - nested_path, - entity_id, - &if is_list { - let mut indexes = indexes.to_vec(); - indexes.push(idx as i64); - indexes - } else { - indexes.to_vec() - }, - nested_mapping, - false, - ) - .await?; - - nested_value_mapping.insert(Name::new(field_name), nested_values); - } else if let TypeData::List(inner) = type_data { - let mut nested_path = path_array.clone(); - nested_path.push(field_name.to_string()); - - let data = match model_data_recursive_query( - conn, - nested_path, - entity_id, - // this might need to be changed to support 2d+ arrays - &if is_list { - let mut indexes = indexes.to_vec(); - indexes.push(idx as i64); - indexes - } else { - indexes.to_vec() - }, - &IndexMap::from([(Name::new("data"), *inner.clone())]), - true, - ) - .await? - { - // map our list which uses a data field as a place holder - // for all elements to get the elemnt directly - Value::List(data) => data - .iter() - .map(|v| match v { - Value::Object(map) => map.get(&Name::new("data")).unwrap().clone(), - ty => unreachable!( - "Expected Value::Object for list \"data\" field, got {:?}", - ty - ), - }) - .collect(), - Value::Object(map) => map.get(&Name::new("data")).unwrap().clone(), - ty => { - unreachable!( - "Expected Value::List or Value::Object for list, got {:?}", - ty - ); - } - }; - - nested_value_mapping.insert(Name::new(field_name), data); - } - } - - nested_value_mappings.push(Value::Object(nested_value_mapping)); - } - - if is_list { - value_mapping = Value::List(nested_value_mappings); - } else { - value_mapping = nested_value_mappings.pop().unwrap(); - } - - Ok(value_mapping) -} diff --git a/crates/torii/graphql/src/object/mod.rs b/crates/torii/graphql/src/object/mod.rs index c1046ffbe4..4deb794999 100644 --- a/crates/torii/graphql/src/object/mod.rs +++ b/crates/torii/graphql/src/object/mod.rs @@ -21,6 +21,7 @@ use self::connection::{ }; use self::inputs::keys_input::parse_keys_argument; use self::inputs::order_input::parse_order_argument; +use crate::constants::ENTITY_ID_COLUMN; use crate::query::data::{count_rows, fetch_multiple_rows, fetch_single_row}; use crate::query::value_mapping_from_row; use crate::types::{TypeMapping, ValueMapping}; @@ -135,7 +136,7 @@ pub fn resolve_one( let id: String = extract::(ctx.args.as_index_map(), &id_column.to_case(Case::Camel))?; let data = fetch_single_row(&mut conn, &table_name, &id_column, &id).await?; - let model = value_mapping_from_row(&data, &type_mapping, false)?; + let model = value_mapping_from_row(&data, ENTITY_ID_COLUMN, &type_mapping, false)?; Ok(Some(Value::Object(model))) }) }) diff --git a/crates/torii/graphql/src/object/model_data.rs b/crates/torii/graphql/src/object/model_data.rs index ca54ee5584..3f0c5cdce8 100644 --- a/crates/torii/graphql/src/object/model_data.rs +++ b/crates/torii/graphql/src/object/model_data.rs @@ -10,8 +10,8 @@ use super::inputs::where_input::{parse_where_argument, where_argument, WhereInpu use super::inputs::InputObjectTrait; use super::{BasicObject, ResolvableObject, TypeMapping, ValueMapping}; use crate::constants::{ - ENTITY_ID_COLUMN, ENTITY_TABLE, ENTITY_TYPE_NAME, EVENT_ID_COLUMN, EVENT_MESSAGE_TABLE, - EVENT_MESSAGE_TYPE_NAME, ID_COLUMN, INTERNAL_ENTITY_ID_KEY, + ENTITY_ID_COLUMN, ENTITY_TABLE, ENTITY_TYPE_NAME, EVENT_ID_COLUMN, EVENT_MESSAGE_ID_COLUMN, + EVENT_MESSAGE_TABLE, EVENT_MESSAGE_TYPE_NAME, ID_COLUMN, INTERNAL_ENTITY_ID_KEY, }; use crate::mapping::ENTITY_TYPE_MAPPING; use crate::query::data::{count_rows, fetch_multiple_rows, fetch_single_row}; @@ -213,7 +213,7 @@ pub fn object(type_name: &str, type_mapping: &TypeMapping, path_array: Vec Field { let entity_id = utils::extract::(indexmap, INTERNAL_ENTITY_ID_KEY)?; let data = fetch_single_row(&mut conn, ENTITY_TABLE, ID_COLUMN, &entity_id).await?; - let entity = value_mapping_from_row(&data, &ENTITY_TYPE_MAPPING, false)?; + let entity = value_mapping_from_row( + &data, + ENTITY_ID_COLUMN, + &ENTITY_TYPE_MAPPING, + false, + )?; Ok(Some(Value::Object(entity))) } @@ -275,7 +280,12 @@ fn event_message_field() -> Field { let data = fetch_single_row(&mut conn, EVENT_MESSAGE_TABLE, ID_COLUMN, &entity_id) .await?; - let event_message = value_mapping_from_row(&data, &ENTITY_TYPE_MAPPING, false)?; + let event_message = value_mapping_from_row( + &data, + EVENT_MESSAGE_ID_COLUMN, + &ENTITY_TYPE_MAPPING, + false, + )?; Ok(Some(Value::Object(event_message))) } diff --git a/crates/torii/graphql/src/query/mod.rs b/crates/torii/graphql/src/query/mod.rs index 46a1558517..b8bd65d830 100644 --- a/crates/torii/graphql/src/query/mod.rs +++ b/crates/torii/graphql/src/query/mod.rs @@ -10,7 +10,7 @@ use sqlx::sqlite::SqliteRow; use sqlx::{Row, SqliteConnection}; use torii_core::sql::FELT_DELIMITER; -use crate::constants::{BOOLEAN_TRUE, ENTITY_ID_COLUMN, INTERNAL_ENTITY_ID_KEY}; +use crate::constants::{BOOLEAN_TRUE, INTERNAL_ENTITY_ID_KEY}; use crate::object::model_data::ModelMember; use crate::types::{TypeData, TypeMapping, ValueMapping}; @@ -167,6 +167,7 @@ fn remove_hex_leading_zeros(value: Value) -> Value { pub fn value_mapping_from_row( row: &SqliteRow, + entity_id_column: &str, types: &TypeMapping, is_external: bool, ) -> sqlx::Result { @@ -193,7 +194,7 @@ pub fn value_mapping_from_row( .collect::>()?; // entity_id is not part of a model's type_mapping but needed to relate to parent entity - if let Ok(entity_id) = row.try_get::(ENTITY_ID_COLUMN) { + if let Ok(entity_id) = row.try_get::(entity_id_column) { value_mapping.insert(Name::new(INTERNAL_ENTITY_ID_KEY), Value::from(entity_id)); } From b7d54fa3280851807fa53bb2a31ca2ab37e036e2 Mon Sep 17 00:00:00 2001 From: Nasr Date: Mon, 12 Aug 2024 20:36:27 -0400 Subject: [PATCH 07/12] fmt --- .../graphql/src/object/connection/mod.rs | 2 +- crates/torii/graphql/src/object/entity.rs | 6 +- .../torii/graphql/src/object/event_message.rs | 59 +++++++++++-------- .../torii/graphql/src/object/metadata/mod.rs | 2 +- crates/torii/graphql/src/object/model_data.rs | 7 ++- 5 files changed, 45 insertions(+), 31 deletions(-) diff --git a/crates/torii/graphql/src/object/connection/mod.rs b/crates/torii/graphql/src/object/connection/mod.rs index dc9931ac58..289810d0e2 100644 --- a/crates/torii/graphql/src/object/connection/mod.rs +++ b/crates/torii/graphql/src/object/connection/mod.rs @@ -136,7 +136,7 @@ pub fn connection_output( let primary_order = row.try_get::(id_column)?; let secondary_order = row.try_get_unchecked::(&order_field)?; let cursor = cursor::encode(&primary_order, &secondary_order); - let value_mapping = value_mapping_from_row(row, types, is_external)?; + let value_mapping = value_mapping_from_row(row, id_column, types, is_external)?; let mut edge = ValueMapping::new(); edge.insert(Name::new("node"), Value::Object(value_mapping)); diff --git a/crates/torii/graphql/src/object/entity.rs b/crates/torii/graphql/src/object/entity.rs index 85c9463f91..5f9be1be65 100644 --- a/crates/torii/graphql/src/object/entity.rs +++ b/crates/torii/graphql/src/object/entity.rs @@ -13,7 +13,8 @@ use torii_core::types::Entity; use super::inputs::keys_input::keys_argument; use super::{BasicObject, ResolvableObject, TypeMapping, ValueMapping}; use crate::constants::{ - DATETIME_FORMAT, ENTITY_ID_COLUMN, ENTITY_NAMES, ENTITY_TABLE, ENTITY_TYPE_NAME, EVENT_ID_COLUMN, ID_COLUMN + DATETIME_FORMAT, ENTITY_ID_COLUMN, ENTITY_NAMES, ENTITY_TABLE, ENTITY_TYPE_NAME, + EVENT_ID_COLUMN, ID_COLUMN, }; use crate::mapping::ENTITY_TYPE_MAPPING; use crate::object::{resolve_many, resolve_one}; @@ -199,7 +200,8 @@ pub async fn model_data_recursive_query( let mut nested_value_mappings = Vec::new(); for (idx, row) in rows.iter().enumerate() { - let mut nested_value_mapping = value_mapping_from_row(row, entity_id_column, type_mapping, true)?; + let mut nested_value_mapping = + value_mapping_from_row(row, entity_id_column, type_mapping, true)?; for (field_name, type_data) in type_mapping { if let TypeData::Nested((_, nested_mapping)) = type_data { diff --git a/crates/torii/graphql/src/object/event_message.rs b/crates/torii/graphql/src/object/event_message.rs index bd18358ae1..0e2cec609e 100644 --- a/crates/torii/graphql/src/object/event_message.rs +++ b/crates/torii/graphql/src/object/event_message.rs @@ -3,6 +3,8 @@ use async_graphql::dynamic::{ Field, FieldFuture, FieldValue, InputValue, SubscriptionField, SubscriptionFieldFuture, TypeRef, }; use async_graphql::{Name, Value}; +use sqlx::{Pool, Sqlite}; +use tokio_stream::StreamExt; use torii_core::simple_broker::SimpleBroker; use torii_core::types::EventMessage; @@ -10,7 +12,8 @@ use super::entity::model_data_recursive_query; use super::inputs::keys_input::keys_argument; use super::{BasicObject, ResolvableObject, TypeMapping, ValueMapping}; use crate::constants::{ - DATETIME_FORMAT, EVENT_ID_COLUMN, EVENT_MESSAGE_ID_COLUMN, EVENT_MESSAGE_NAMES, EVENT_MESSAGE_TABLE, EVENT_MESSAGE_TYPE_NAME, ID_COLUMN + DATETIME_FORMAT, EVENT_ID_COLUMN, EVENT_MESSAGE_ID_COLUMN, EVENT_MESSAGE_NAMES, + EVENT_MESSAGE_TABLE, EVENT_MESSAGE_TYPE_NAME, ID_COLUMN, }; use crate::mapping::ENTITY_TYPE_MAPPING; use crate::object::{resolve_many, resolve_one}; @@ -61,31 +64,35 @@ impl ResolvableObject for EventMessageObject { } fn subscriptions(&self) -> Option> { - Some(vec![SubscriptionField::new( - "eventMessageUpdated", - TypeRef::named_nn(self.type_name()), - |ctx| { - SubscriptionFieldFuture::new(async move { - let id = match ctx.args.get("id") { - Some(id) => Some(id.string()?.to_string()), - None => None, - }; - // if id is None, then subscribe to all entities - // if id is Some, then subscribe to only the entity with that id - Ok(SimpleBroker::::subscribe().filter_map( - move |entity: EventMessage| { - if id.is_none() || id == Some(entity.id.clone()) { - Some(Ok(Value::Object(EventMessageObject::value_mapping(entity)))) - } else { - // id != entity.id , then don't send anything, still listening - None - } - }, - )) - }) - }, - ) - .argument(InputValue::new("id", TypeRef::named(TypeRef::ID)))]) + Some(vec![ + SubscriptionField::new( + "eventMessageUpdated", + TypeRef::named_nn(self.type_name()), + |ctx| { + SubscriptionFieldFuture::new(async move { + let id = match ctx.args.get("id") { + Some(id) => Some(id.string()?.to_string()), + None => None, + }; + // if id is None, then subscribe to all entities + // if id is Some, then subscribe to only the entity with that id + Ok(SimpleBroker::::subscribe().filter_map( + move |entity: EventMessage| { + if id.is_none() || id == Some(entity.id.clone()) { + Some(Ok(Value::Object(EventMessageObject::value_mapping( + entity, + )))) + } else { + // id != entity.id , then don't send anything, still listening + None + } + }, + )) + }) + }, + ) + .argument(InputValue::new("id", TypeRef::named(TypeRef::ID))), + ]) } } diff --git a/crates/torii/graphql/src/object/metadata/mod.rs b/crates/torii/graphql/src/object/metadata/mod.rs index 07a7771380..70b00b234f 100644 --- a/crates/torii/graphql/src/object/metadata/mod.rs +++ b/crates/torii/graphql/src/object/metadata/mod.rs @@ -105,7 +105,7 @@ fn metadata_connection_output( .map(|row| { let order = row.try_get::(ID_COLUMN)?; let cursor = cursor::encode(&order, &order); - let mut value_mapping = value_mapping_from_row(row, row_types, false)?; + let mut value_mapping = value_mapping_from_row(row, "entity_id", row_types, false)?; value_mapping.insert(Name::new("worldAddress"), Value::from(world_address)); let json_str = row.try_get::(JSON_COLUMN)?; diff --git a/crates/torii/graphql/src/object/model_data.rs b/crates/torii/graphql/src/object/model_data.rs index 3f0c5cdce8..228a12bf95 100644 --- a/crates/torii/graphql/src/object/model_data.rs +++ b/crates/torii/graphql/src/object/model_data.rs @@ -213,7 +213,12 @@ pub fn object(type_name: &str, type_mapping: &TypeMapping, path_array: Vec Date: Mon, 12 Aug 2024 20:54:13 -0400 Subject: [PATCH 08/12] chore: remove executedAt for event message --- crates/torii/graphql/src/object/event_message.rs | 4 ---- 1 file changed, 4 deletions(-) diff --git a/crates/torii/graphql/src/object/event_message.rs b/crates/torii/graphql/src/object/event_message.rs index 0e2cec609e..ee85a2e8f1 100644 --- a/crates/torii/graphql/src/object/event_message.rs +++ b/crates/torii/graphql/src/object/event_message.rs @@ -111,10 +111,6 @@ impl EventMessageObject { Name::new("updatedAt"), Value::from(entity.updated_at.format(DATETIME_FORMAT).to_string()), ), - ( - Name::new("executedAt"), - Value::from(entity.executed_at.format(DATETIME_FORMAT).to_string()), - ), ]) } } From 5884f8bc6ef3a2d2079f93e356b478c6e45b91cb Mon Sep 17 00:00:00 2001 From: Nasr Date: Mon, 12 Aug 2024 21:01:12 -0400 Subject: [PATCH 09/12] refactor; more miantaible value mapping --- .../graphql/src/object/connection/mod.rs | 2 +- crates/torii/graphql/src/object/entity.rs | 3 +-- .../torii/graphql/src/object/metadata/mod.rs | 2 +- crates/torii/graphql/src/object/mod.rs | 3 +-- crates/torii/graphql/src/object/model_data.rs | 25 ++++--------------- crates/torii/graphql/src/query/mod.rs | 9 ++++--- 6 files changed, 15 insertions(+), 29 deletions(-) diff --git a/crates/torii/graphql/src/object/connection/mod.rs b/crates/torii/graphql/src/object/connection/mod.rs index 289810d0e2..dc9931ac58 100644 --- a/crates/torii/graphql/src/object/connection/mod.rs +++ b/crates/torii/graphql/src/object/connection/mod.rs @@ -136,7 +136,7 @@ pub fn connection_output( let primary_order = row.try_get::(id_column)?; let secondary_order = row.try_get_unchecked::(&order_field)?; let cursor = cursor::encode(&primary_order, &secondary_order); - let value_mapping = value_mapping_from_row(row, id_column, types, is_external)?; + let value_mapping = value_mapping_from_row(row, types, is_external)?; let mut edge = ValueMapping::new(); edge.insert(Name::new("node"), Value::Object(value_mapping)); diff --git a/crates/torii/graphql/src/object/entity.rs b/crates/torii/graphql/src/object/entity.rs index 5f9be1be65..e346e9f375 100644 --- a/crates/torii/graphql/src/object/entity.rs +++ b/crates/torii/graphql/src/object/entity.rs @@ -200,8 +200,7 @@ pub async fn model_data_recursive_query( let mut nested_value_mappings = Vec::new(); for (idx, row) in rows.iter().enumerate() { - let mut nested_value_mapping = - value_mapping_from_row(row, entity_id_column, type_mapping, true)?; + let mut nested_value_mapping = value_mapping_from_row(row, type_mapping, true)?; for (field_name, type_data) in type_mapping { if let TypeData::Nested((_, nested_mapping)) = type_data { diff --git a/crates/torii/graphql/src/object/metadata/mod.rs b/crates/torii/graphql/src/object/metadata/mod.rs index 70b00b234f..07a7771380 100644 --- a/crates/torii/graphql/src/object/metadata/mod.rs +++ b/crates/torii/graphql/src/object/metadata/mod.rs @@ -105,7 +105,7 @@ fn metadata_connection_output( .map(|row| { let order = row.try_get::(ID_COLUMN)?; let cursor = cursor::encode(&order, &order); - let mut value_mapping = value_mapping_from_row(row, "entity_id", row_types, false)?; + let mut value_mapping = value_mapping_from_row(row, row_types, false)?; value_mapping.insert(Name::new("worldAddress"), Value::from(world_address)); let json_str = row.try_get::(JSON_COLUMN)?; diff --git a/crates/torii/graphql/src/object/mod.rs b/crates/torii/graphql/src/object/mod.rs index 4deb794999..c1046ffbe4 100644 --- a/crates/torii/graphql/src/object/mod.rs +++ b/crates/torii/graphql/src/object/mod.rs @@ -21,7 +21,6 @@ use self::connection::{ }; use self::inputs::keys_input::parse_keys_argument; use self::inputs::order_input::parse_order_argument; -use crate::constants::ENTITY_ID_COLUMN; use crate::query::data::{count_rows, fetch_multiple_rows, fetch_single_row}; use crate::query::value_mapping_from_row; use crate::types::{TypeMapping, ValueMapping}; @@ -136,7 +135,7 @@ pub fn resolve_one( let id: String = extract::(ctx.args.as_index_map(), &id_column.to_case(Case::Camel))?; let data = fetch_single_row(&mut conn, &table_name, &id_column, &id).await?; - let model = value_mapping_from_row(&data, ENTITY_ID_COLUMN, &type_mapping, false)?; + let model = value_mapping_from_row(&data, &type_mapping, false)?; Ok(Some(Value::Object(model))) }) }) diff --git a/crates/torii/graphql/src/object/model_data.rs b/crates/torii/graphql/src/object/model_data.rs index 228a12bf95..ca54ee5584 100644 --- a/crates/torii/graphql/src/object/model_data.rs +++ b/crates/torii/graphql/src/object/model_data.rs @@ -10,8 +10,8 @@ use super::inputs::where_input::{parse_where_argument, where_argument, WhereInpu use super::inputs::InputObjectTrait; use super::{BasicObject, ResolvableObject, TypeMapping, ValueMapping}; use crate::constants::{ - ENTITY_ID_COLUMN, ENTITY_TABLE, ENTITY_TYPE_NAME, EVENT_ID_COLUMN, EVENT_MESSAGE_ID_COLUMN, - EVENT_MESSAGE_TABLE, EVENT_MESSAGE_TYPE_NAME, ID_COLUMN, INTERNAL_ENTITY_ID_KEY, + ENTITY_ID_COLUMN, ENTITY_TABLE, ENTITY_TYPE_NAME, EVENT_ID_COLUMN, EVENT_MESSAGE_TABLE, + EVENT_MESSAGE_TYPE_NAME, ID_COLUMN, INTERNAL_ENTITY_ID_KEY, }; use crate::mapping::ENTITY_TYPE_MAPPING; use crate::query::data::{count_rows, fetch_multiple_rows, fetch_single_row}; @@ -213,12 +213,7 @@ pub fn object(type_name: &str, type_mapping: &TypeMapping, path_array: Vec Field { let entity_id = utils::extract::(indexmap, INTERNAL_ENTITY_ID_KEY)?; let data = fetch_single_row(&mut conn, ENTITY_TABLE, ID_COLUMN, &entity_id).await?; - let entity = value_mapping_from_row( - &data, - ENTITY_ID_COLUMN, - &ENTITY_TYPE_MAPPING, - false, - )?; + let entity = value_mapping_from_row(&data, &ENTITY_TYPE_MAPPING, false)?; Ok(Some(Value::Object(entity))) } @@ -285,12 +275,7 @@ fn event_message_field() -> Field { let data = fetch_single_row(&mut conn, EVENT_MESSAGE_TABLE, ID_COLUMN, &entity_id) .await?; - let event_message = value_mapping_from_row( - &data, - EVENT_MESSAGE_ID_COLUMN, - &ENTITY_TYPE_MAPPING, - false, - )?; + let event_message = value_mapping_from_row(&data, &ENTITY_TYPE_MAPPING, false)?; Ok(Some(Value::Object(event_message))) } diff --git a/crates/torii/graphql/src/query/mod.rs b/crates/torii/graphql/src/query/mod.rs index b8bd65d830..caa6c5e948 100644 --- a/crates/torii/graphql/src/query/mod.rs +++ b/crates/torii/graphql/src/query/mod.rs @@ -10,7 +10,9 @@ use sqlx::sqlite::SqliteRow; use sqlx::{Row, SqliteConnection}; use torii_core::sql::FELT_DELIMITER; -use crate::constants::{BOOLEAN_TRUE, INTERNAL_ENTITY_ID_KEY}; +use crate::constants::{ + BOOLEAN_TRUE, ENTITY_ID_COLUMN, EVENT_MESSAGE_ID_COLUMN, INTERNAL_ENTITY_ID_KEY, +}; use crate::object::model_data::ModelMember; use crate::types::{TypeData, TypeMapping, ValueMapping}; @@ -167,7 +169,6 @@ fn remove_hex_leading_zeros(value: Value) -> Value { pub fn value_mapping_from_row( row: &SqliteRow, - entity_id_column: &str, types: &TypeMapping, is_external: bool, ) -> sqlx::Result { @@ -194,8 +195,10 @@ pub fn value_mapping_from_row( .collect::>()?; // entity_id is not part of a model's type_mapping but needed to relate to parent entity - if let Ok(entity_id) = row.try_get::(entity_id_column) { + if let Ok(entity_id) = row.try_get::(ENTITY_ID_COLUMN) { value_mapping.insert(Name::new(INTERNAL_ENTITY_ID_KEY), Value::from(entity_id)); + } else if let Ok(event_message_id) = row.try_get::(EVENT_MESSAGE_ID_COLUMN) { + value_mapping.insert(Name::new(INTERNAL_ENTITY_ID_KEY), Value::from(event_message_id)); } Ok(value_mapping) From c912aa52cc882a09c67dc274c1dbff4be431a7ad Mon Sep 17 00:00:00 2001 From: Nasr Date: Mon, 12 Aug 2024 21:06:04 -0400 Subject: [PATCH 10/12] chore: add back executed at --- .../torii/graphql/src/object/event_message.rs | 58 +++++++++---------- 1 file changed, 29 insertions(+), 29 deletions(-) diff --git a/crates/torii/graphql/src/object/event_message.rs b/crates/torii/graphql/src/object/event_message.rs index ee85a2e8f1..29734a0b7e 100644 --- a/crates/torii/graphql/src/object/event_message.rs +++ b/crates/torii/graphql/src/object/event_message.rs @@ -64,35 +64,31 @@ impl ResolvableObject for EventMessageObject { } fn subscriptions(&self) -> Option> { - Some(vec![ - SubscriptionField::new( - "eventMessageUpdated", - TypeRef::named_nn(self.type_name()), - |ctx| { - SubscriptionFieldFuture::new(async move { - let id = match ctx.args.get("id") { - Some(id) => Some(id.string()?.to_string()), - None => None, - }; - // if id is None, then subscribe to all entities - // if id is Some, then subscribe to only the entity with that id - Ok(SimpleBroker::::subscribe().filter_map( - move |entity: EventMessage| { - if id.is_none() || id == Some(entity.id.clone()) { - Some(Ok(Value::Object(EventMessageObject::value_mapping( - entity, - )))) - } else { - // id != entity.id , then don't send anything, still listening - None - } - }, - )) - }) - }, - ) - .argument(InputValue::new("id", TypeRef::named(TypeRef::ID))), - ]) + Some(vec![SubscriptionField::new( + "eventMessageUpdated", + TypeRef::named_nn(self.type_name()), + |ctx| { + SubscriptionFieldFuture::new(async move { + let id = match ctx.args.get("id") { + Some(id) => Some(id.string()?.to_string()), + None => None, + }; + // if id is None, then subscribe to all entities + // if id is Some, then subscribe to only the entity with that id + Ok(SimpleBroker::::subscribe().filter_map( + move |entity: EventMessage| { + if id.is_none() || id == Some(entity.id.clone()) { + Some(Ok(Value::Object(EventMessageObject::value_mapping(entity)))) + } else { + // id != entity.id , then don't send anything, still listening + None + } + }, + )) + }) + }, + ) + .argument(InputValue::new("id", TypeRef::named(TypeRef::ID)))]) } } @@ -111,6 +107,10 @@ impl EventMessageObject { Name::new("updatedAt"), Value::from(entity.updated_at.format(DATETIME_FORMAT).to_string()), ), + ( + Name::new("executedAt"), + Value::from(entity.executed_at.format(DATETIME_FORMAT).to_string()), + ), ]) } } From da3a0a66f2f2ad36aadbfa5e814542f0eb0fceac Mon Sep 17 00:00:00 2001 From: Nasr Date: Mon, 12 Aug 2024 21:08:28 -0400 Subject: [PATCH 11/12] fmt --- .../torii/graphql/src/object/event_message.rs | 54 ++++++++++--------- 1 file changed, 29 insertions(+), 25 deletions(-) diff --git a/crates/torii/graphql/src/object/event_message.rs b/crates/torii/graphql/src/object/event_message.rs index 29734a0b7e..0e2cec609e 100644 --- a/crates/torii/graphql/src/object/event_message.rs +++ b/crates/torii/graphql/src/object/event_message.rs @@ -64,31 +64,35 @@ impl ResolvableObject for EventMessageObject { } fn subscriptions(&self) -> Option> { - Some(vec![SubscriptionField::new( - "eventMessageUpdated", - TypeRef::named_nn(self.type_name()), - |ctx| { - SubscriptionFieldFuture::new(async move { - let id = match ctx.args.get("id") { - Some(id) => Some(id.string()?.to_string()), - None => None, - }; - // if id is None, then subscribe to all entities - // if id is Some, then subscribe to only the entity with that id - Ok(SimpleBroker::::subscribe().filter_map( - move |entity: EventMessage| { - if id.is_none() || id == Some(entity.id.clone()) { - Some(Ok(Value::Object(EventMessageObject::value_mapping(entity)))) - } else { - // id != entity.id , then don't send anything, still listening - None - } - }, - )) - }) - }, - ) - .argument(InputValue::new("id", TypeRef::named(TypeRef::ID)))]) + Some(vec![ + SubscriptionField::new( + "eventMessageUpdated", + TypeRef::named_nn(self.type_name()), + |ctx| { + SubscriptionFieldFuture::new(async move { + let id = match ctx.args.get("id") { + Some(id) => Some(id.string()?.to_string()), + None => None, + }; + // if id is None, then subscribe to all entities + // if id is Some, then subscribe to only the entity with that id + Ok(SimpleBroker::::subscribe().filter_map( + move |entity: EventMessage| { + if id.is_none() || id == Some(entity.id.clone()) { + Some(Ok(Value::Object(EventMessageObject::value_mapping( + entity, + )))) + } else { + // id != entity.id , then don't send anything, still listening + None + } + }, + )) + }) + }, + ) + .argument(InputValue::new("id", TypeRef::named(TypeRef::ID))), + ]) } } From d77f45210ad9cab70a5cc89f4f75d6fa49af3b75 Mon Sep 17 00:00:00 2001 From: glihm Date: Mon, 12 Aug 2024 21:31:00 -0400 Subject: [PATCH 12/12] fix: use transaction waiter to ensure replicable tests even with slow machines --- crates/torii/core/src/sql_test.rs | 29 +++++++++++++++-------------- 1 file changed, 15 insertions(+), 14 deletions(-) diff --git a/crates/torii/core/src/sql_test.rs b/crates/torii/core/src/sql_test.rs index 63c64f19c2..b2010c6241 100644 --- a/crates/torii/core/src/sql_test.rs +++ b/crates/torii/core/src/sql_test.rs @@ -94,13 +94,13 @@ async fn test_load_from_remote() { let world = WorldContract::new(strat.world_address, &account); - world + let res = world .grant_writer(&compute_bytearray_hash("dojo_examples"), &ContractAddress(actions_address)) .send_with_cfg(&TxnConfig::init_wait()) .await .unwrap(); - tokio::time::sleep(tokio::time::Duration::from_secs(1)).await; + TransactionWaiter::new(res.transaction_hash, &account.provider()).await.unwrap(); // spawn let tx = &account @@ -237,16 +237,16 @@ async fn test_load_from_remote_del() { let world = WorldContract::new(strat.world_address, &account); - world + let res = world .grant_writer(&compute_bytearray_hash("dojo_examples"), &ContractAddress(actions_address)) .send_with_cfg(&TxnConfig::init_wait()) .await .unwrap(); - tokio::time::sleep(tokio::time::Duration::from_secs(1)).await; + TransactionWaiter::new(res.transaction_hash, &account.provider()).await.unwrap(); // spawn - account + let res = account .execute_v1(vec![Call { to: actions_address, selector: get_selector_from_name("spawn").unwrap(), @@ -256,10 +256,10 @@ async fn test_load_from_remote_del() { .await .unwrap(); - tokio::time::sleep(tokio::time::Duration::from_secs(1)).await; + TransactionWaiter::new(res.transaction_hash, &account.provider()).await.unwrap(); // Set player config. - account + let res = account .execute_v1(vec![Call { to: actions_address, selector: get_selector_from_name("set_player_config").unwrap(), @@ -270,9 +270,9 @@ async fn test_load_from_remote_del() { .await .unwrap(); - tokio::time::sleep(tokio::time::Duration::from_secs(1)).await; + TransactionWaiter::new(res.transaction_hash, &account.provider()).await.unwrap(); - account + let res = account .execute_v1(vec![Call { to: actions_address, selector: get_selector_from_name("reset_player_config").unwrap(), @@ -282,7 +282,7 @@ async fn test_load_from_remote_del() { .await .unwrap(); - tokio::time::sleep(tokio::time::Duration::from_secs(2)).await; + TransactionWaiter::new(res.transaction_hash, &account.provider()).await.unwrap(); let world_reader = WorldContractReader::new(strat.world_address, account.provider()); @@ -349,15 +349,16 @@ async fn test_get_entity_keys() { let world = WorldContract::new(strat.world_address, &account); - world + let res = world .grant_writer(&compute_bytearray_hash("dojo_examples"), &ContractAddress(actions_address)) .send_with_cfg(&TxnConfig::init_wait()) .await .unwrap(); - tokio::time::sleep(tokio::time::Duration::from_secs(2)).await; + + TransactionWaiter::new(res.transaction_hash, &account.provider()).await.unwrap(); // spawn - account + let res = account .execute_v1(vec![Call { to: actions_address, selector: get_selector_from_name("spawn").unwrap(), @@ -367,7 +368,7 @@ async fn test_get_entity_keys() { .await .unwrap(); - tokio::time::sleep(tokio::time::Duration::from_secs(2)).await; + TransactionWaiter::new(res.transaction_hash, &account.provider()).await.unwrap(); let world_reader = WorldContractReader::new(strat.world_address, account.provider());