Skip to content

Commit

Permalink
refactor: share logic between event messages and entities
Browse files Browse the repository at this point in the history
  • Loading branch information
Larkooo committed Aug 5, 2024
1 parent 63d29df commit 66fbbdf
Show file tree
Hide file tree
Showing 6 changed files with 58 additions and 158 deletions.
1 change: 1 addition & 0 deletions crates/torii/graphql/src/constants.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ pub const METADATA_TABLE: &str = "metadata";
pub const ID_COLUMN: &str = "id";
pub const EVENT_ID_COLUMN: &str = "event_id";
pub const ENTITY_ID_COLUMN: &str = "entity_id";
pub const EVENT_MESSAGE_ID_COLUMN: &str = "event_message_id";
pub const JSON_COLUMN: &str = "json";
pub const TRANSACTION_HASH_COLUMN: &str = "transaction_hash";

Expand Down
11 changes: 8 additions & 3 deletions crates/torii/graphql/src/object/entity.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use torii_core::types::Entity;
use super::inputs::keys_input::keys_argument;
use super::{BasicObject, ResolvableObject, TypeMapping, ValueMapping};
use crate::constants::{
DATETIME_FORMAT, ENTITY_NAMES, ENTITY_TABLE, ENTITY_TYPE_NAME, EVENT_ID_COLUMN, ID_COLUMN,
DATETIME_FORMAT, ENTITY_ID_COLUMN, ENTITY_NAMES, ENTITY_TABLE, ENTITY_TYPE_NAME, EVENT_ID_COLUMN, ID_COLUMN
};
use crate::mapping::ENTITY_TYPE_MAPPING;
use crate::object::{resolve_many, resolve_one};
Expand Down Expand Up @@ -143,6 +143,7 @@ fn model_union_field() -> Field {
// but the table name for the model data is the unhashed model name
let data: ValueMapping = match model_data_recursive_query(
&mut conn,
ENTITY_ID_COLUMN,
vec![format!("{namespace}-{name}")],
&entity_id,
&[],
Expand Down Expand Up @@ -173,6 +174,7 @@ fn model_union_field() -> Field {
#[async_recursion]
pub async fn model_data_recursive_query(
conn: &mut PoolConnection<Sqlite>,
entity_id_column: &str,
path_array: Vec<String>,
entity_id: &str,
indexes: &[i64],
Expand All @@ -182,7 +184,8 @@ pub async fn model_data_recursive_query(
// For nested types, we need to remove prefix in path array
let namespace = format!("{}_", path_array[0]);
let table_name = &path_array.join("$").replace(&namespace, "");
let mut query = format!("SELECT * FROM [{}] WHERE entity_id = '{}' ", table_name, entity_id);
let mut query =
format!("SELECT * FROM [{}] WHERE {entity_id_column} = '{}' ", table_name, entity_id);
for (column_idx, index) in indexes.iter().enumerate() {
query.push_str(&format!("AND idx_{} = {} ", column_idx, index));
}
Expand All @@ -196,7 +199,7 @@ pub async fn model_data_recursive_query(
let mut nested_value_mappings = Vec::new();

for (idx, row) in rows.iter().enumerate() {
let mut nested_value_mapping = value_mapping_from_row(row, type_mapping, true)?;
let mut nested_value_mapping = value_mapping_from_row(row, entity_id_column, type_mapping, true)?;

for (field_name, type_data) in type_mapping {
if let TypeData::Nested((_, nested_mapping)) = type_data {
Expand All @@ -205,6 +208,7 @@ pub async fn model_data_recursive_query(

let nested_values = model_data_recursive_query(
conn,
entity_id_column,
nested_path,
entity_id,
&if is_list {
Expand All @@ -226,6 +230,7 @@ pub async fn model_data_recursive_query(

let data = match model_data_recursive_query(
conn,
entity_id_column,
nested_path,
entity_id,
// this might need to be changed to support 2d+ arrays
Expand Down
176 changes: 29 additions & 147 deletions crates/torii/graphql/src/object/event_message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,23 +3,18 @@ use async_graphql::dynamic::{
Field, FieldFuture, FieldValue, InputValue, SubscriptionField, SubscriptionFieldFuture, TypeRef,
};
use async_graphql::{Name, Value};
use async_recursion::async_recursion;
use sqlx::pool::PoolConnection;
use sqlx::{Pool, Sqlite};
use tokio_stream::StreamExt;
use torii_core::simple_broker::SimpleBroker;
use torii_core::types::EventMessage;

use super::entity::model_data_recursive_query;
use super::inputs::keys_input::keys_argument;
use super::{BasicObject, ResolvableObject, TypeMapping, ValueMapping};
use crate::constants::{
DATETIME_FORMAT, EVENT_ID_COLUMN, EVENT_MESSAGE_NAMES, EVENT_MESSAGE_TABLE,
EVENT_MESSAGE_TYPE_NAME, ID_COLUMN,
DATETIME_FORMAT, EVENT_ID_COLUMN, EVENT_MESSAGE_ID_COLUMN, EVENT_MESSAGE_NAMES, EVENT_MESSAGE_TABLE, EVENT_MESSAGE_TYPE_NAME, ID_COLUMN
};
use crate::mapping::ENTITY_TYPE_MAPPING;
use crate::object::{resolve_many, resolve_one};
use crate::query::{type_mapping_query, value_mapping_from_row};
use crate::types::TypeData;
use crate::query::type_mapping_query;
use crate::utils;

#[derive(Debug)]
Expand Down Expand Up @@ -66,35 +61,31 @@ impl ResolvableObject for EventMessageObject {
}

fn subscriptions(&self) -> Option<Vec<SubscriptionField>> {
Some(vec![
SubscriptionField::new(
"eventMessageUpdated",
TypeRef::named_nn(self.type_name()),
|ctx| {
SubscriptionFieldFuture::new(async move {
let id = match ctx.args.get("id") {
Some(id) => Some(id.string()?.to_string()),
None => None,
};
// if id is None, then subscribe to all entities
// if id is Some, then subscribe to only the entity with that id
Ok(SimpleBroker::<EventMessage>::subscribe().filter_map(
move |entity: EventMessage| {
if id.is_none() || id == Some(entity.id.clone()) {
Some(Ok(Value::Object(EventMessageObject::value_mapping(
entity,
))))
} else {
// id != entity.id , then don't send anything, still listening
None
}
},
))
})
},
)
.argument(InputValue::new("id", TypeRef::named(TypeRef::ID))),
])
Some(vec![SubscriptionField::new(
"eventMessageUpdated",
TypeRef::named_nn(self.type_name()),
|ctx| {
SubscriptionFieldFuture::new(async move {
let id = match ctx.args.get("id") {
Some(id) => Some(id.string()?.to_string()),
None => None,
};
// if id is None, then subscribe to all entities
// if id is Some, then subscribe to only the entity with that id
Ok(SimpleBroker::<EventMessage>::subscribe().filter_map(
move |entity: EventMessage| {
if id.is_none() || id == Some(entity.id.clone()) {
Some(Ok(Value::Object(EventMessageObject::value_mapping(entity))))
} else {
// id != entity.id , then don't send anything, still listening
None
}
},
))
})
},
)
.argument(InputValue::new("id", TypeRef::named(TypeRef::ID)))])
}
}

Expand Down Expand Up @@ -152,6 +143,7 @@ fn model_union_field() -> Field {
// but the table name for the model data is the unhashed model name
let data: ValueMapping = match model_data_recursive_query(
&mut conn,
EVENT_MESSAGE_ID_COLUMN,
vec![format!("{namespace}-{name}")],
&entity_id,
&[],
Expand All @@ -177,113 +169,3 @@ fn model_union_field() -> Field {
})
})
}

// TODO: flatten query
#[async_recursion]
pub async fn model_data_recursive_query(
conn: &mut PoolConnection<Sqlite>,
path_array: Vec<String>,
entity_id: &str,
indexes: &[i64],
type_mapping: &TypeMapping,
is_list: bool,
) -> sqlx::Result<Value> {
// For nested types, we need to remove prefix in path array
let namespace = format!("{}_", path_array[0]);
let table_name = &path_array.join("$").replace(&namespace, "");
let mut query =
format!("SELECT * FROM [{}] WHERE event_message_id = '{}' ", table_name, entity_id);
for (column_idx, index) in indexes.iter().enumerate() {
query.push_str(&format!("AND idx_{} = {} ", column_idx, index));
}

let rows = sqlx::query(&query).fetch_all(conn.as_mut()).await?;
if rows.is_empty() {
return Ok(Value::List(vec![]));
}

let value_mapping: Value;
let mut nested_value_mappings = Vec::new();

for (idx, row) in rows.iter().enumerate() {
let mut nested_value_mapping = value_mapping_from_row(row, type_mapping, true)?;

for (field_name, type_data) in type_mapping {
if let TypeData::Nested((_, nested_mapping)) = type_data {
let mut nested_path = path_array.clone();
nested_path.push(field_name.to_string());

let nested_values = model_data_recursive_query(
conn,
nested_path,
entity_id,
&if is_list {
let mut indexes = indexes.to_vec();
indexes.push(idx as i64);
indexes
} else {
indexes.to_vec()
},
nested_mapping,
false,
)
.await?;

nested_value_mapping.insert(Name::new(field_name), nested_values);
} else if let TypeData::List(inner) = type_data {
let mut nested_path = path_array.clone();
nested_path.push(field_name.to_string());

let data = match model_data_recursive_query(
conn,
nested_path,
entity_id,
// this might need to be changed to support 2d+ arrays
&if is_list {
let mut indexes = indexes.to_vec();
indexes.push(idx as i64);
indexes
} else {
indexes.to_vec()
},
&IndexMap::from([(Name::new("data"), *inner.clone())]),
true,
)
.await?
{
// map our list which uses a data field as a place holder
// for all elements to get the elemnt directly
Value::List(data) => data
.iter()
.map(|v| match v {
Value::Object(map) => map.get(&Name::new("data")).unwrap().clone(),
ty => unreachable!(
"Expected Value::Object for list \"data\" field, got {:?}",
ty
),
})
.collect(),
Value::Object(map) => map.get(&Name::new("data")).unwrap().clone(),
ty => {
unreachable!(
"Expected Value::List or Value::Object for list, got {:?}",
ty
);
}
};

nested_value_mapping.insert(Name::new(field_name), data);
}
}

nested_value_mappings.push(Value::Object(nested_value_mapping));
}

if is_list {
value_mapping = Value::List(nested_value_mappings);
} else {
value_mapping = nested_value_mappings.pop().unwrap();
}

Ok(value_mapping)
}
3 changes: 2 additions & 1 deletion crates/torii/graphql/src/object/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use self::connection::{
};
use self::inputs::keys_input::parse_keys_argument;
use self::inputs::order_input::parse_order_argument;
use crate::constants::ENTITY_ID_COLUMN;
use crate::query::data::{count_rows, fetch_multiple_rows, fetch_single_row};
use crate::query::value_mapping_from_row;
use crate::types::{TypeMapping, ValueMapping};
Expand Down Expand Up @@ -135,7 +136,7 @@ pub fn resolve_one(
let id: String =
extract::<String>(ctx.args.as_index_map(), &id_column.to_case(Case::Camel))?;
let data = fetch_single_row(&mut conn, &table_name, &id_column, &id).await?;
let model = value_mapping_from_row(&data, &type_mapping, false)?;
let model = value_mapping_from_row(&data, ENTITY_ID_COLUMN, &type_mapping, false)?;
Ok(Some(Value::Object(model)))
})
})
Expand Down
20 changes: 15 additions & 5 deletions crates/torii/graphql/src/object/model_data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ use super::inputs::where_input::{parse_where_argument, where_argument, WhereInpu
use super::inputs::InputObjectTrait;
use super::{BasicObject, ResolvableObject, TypeMapping, ValueMapping};
use crate::constants::{
ENTITY_ID_COLUMN, ENTITY_TABLE, ENTITY_TYPE_NAME, EVENT_ID_COLUMN, EVENT_MESSAGE_TABLE,
EVENT_MESSAGE_TYPE_NAME, ID_COLUMN, INTERNAL_ENTITY_ID_KEY,
ENTITY_ID_COLUMN, ENTITY_TABLE, ENTITY_TYPE_NAME, EVENT_ID_COLUMN, EVENT_MESSAGE_ID_COLUMN,
EVENT_MESSAGE_TABLE, EVENT_MESSAGE_TYPE_NAME, ID_COLUMN, INTERNAL_ENTITY_ID_KEY,
};
use crate::mapping::ENTITY_TYPE_MAPPING;
use crate::query::data::{count_rows, fetch_multiple_rows, fetch_single_row};
Expand Down Expand Up @@ -213,7 +213,7 @@ pub fn object(type_name: &str, type_mapping: &TypeMapping, path_array: Vec<Strin
&entity_id,
)
.await?;
let result = value_mapping_from_row(&data, &nested_mapping, true)?;
let result = value_mapping_from_row(&data, ENTITY_ID_COLUMN, &nested_mapping, true)?;

Ok(Some(Value::Object(result)))
}
Expand Down Expand Up @@ -255,7 +255,12 @@ fn entity_field() -> Field {
let entity_id = utils::extract::<String>(indexmap, INTERNAL_ENTITY_ID_KEY)?;
let data =
fetch_single_row(&mut conn, ENTITY_TABLE, ID_COLUMN, &entity_id).await?;
let entity = value_mapping_from_row(&data, &ENTITY_TYPE_MAPPING, false)?;
let entity = value_mapping_from_row(
&data,
ENTITY_ID_COLUMN,
&ENTITY_TYPE_MAPPING,
false,
)?;

Ok(Some(Value::Object(entity)))
}
Expand All @@ -275,7 +280,12 @@ fn event_message_field() -> Field {
let data =
fetch_single_row(&mut conn, EVENT_MESSAGE_TABLE, ID_COLUMN, &entity_id)
.await?;
let event_message = value_mapping_from_row(&data, &ENTITY_TYPE_MAPPING, false)?;
let event_message = value_mapping_from_row(
&data,
EVENT_MESSAGE_ID_COLUMN,
&ENTITY_TYPE_MAPPING,
false,
)?;

Ok(Some(Value::Object(event_message)))
}
Expand Down
5 changes: 3 additions & 2 deletions crates/torii/graphql/src/query/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use sqlx::sqlite::SqliteRow;
use sqlx::{Row, SqliteConnection};
use torii_core::sql::FELT_DELIMITER;

use crate::constants::{BOOLEAN_TRUE, ENTITY_ID_COLUMN, INTERNAL_ENTITY_ID_KEY};
use crate::constants::{BOOLEAN_TRUE, INTERNAL_ENTITY_ID_KEY};
use crate::object::model_data::ModelMember;
use crate::types::{TypeData, TypeMapping, ValueMapping};

Expand Down Expand Up @@ -167,6 +167,7 @@ fn remove_hex_leading_zeros(value: Value) -> Value {

pub fn value_mapping_from_row(
row: &SqliteRow,
entity_id_column: &str,
types: &TypeMapping,
is_external: bool,
) -> sqlx::Result<ValueMapping> {
Expand All @@ -193,7 +194,7 @@ pub fn value_mapping_from_row(
.collect::<sqlx::Result<ValueMapping>>()?;

// entity_id is not part of a model's type_mapping but needed to relate to parent entity
if let Ok(entity_id) = row.try_get::<String, &str>(ENTITY_ID_COLUMN) {
if let Ok(entity_id) = row.try_get::<String, &str>(entity_id_column) {
value_mapping.insert(Name::new(INTERNAL_ENTITY_ID_KEY), Value::from(entity_id));
}

Expand Down

0 comments on commit 66fbbdf

Please sign in to comment.