-
Notifications
You must be signed in to change notification settings - Fork 189
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(torii-graphql): event messages update & subscription #2227
Changes from all commits
c4b027c
c8ff43d
ac16715
9648106
5096673
e903e3d
b7d54fa
96c39df
5884f8b
c912aa5
da3a0a6
d77f452
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -3,23 +3,22 @@ use async_graphql::dynamic::{ | |||||
Field, FieldFuture, FieldValue, InputValue, SubscriptionField, SubscriptionFieldFuture, TypeRef, | ||||||
}; | ||||||
use async_graphql::{Name, Value}; | ||||||
use async_recursion::async_recursion; | ||||||
use sqlx::pool::PoolConnection; | ||||||
use sqlx::{Pool, Sqlite}; | ||||||
use tokio_stream::StreamExt; | ||||||
use torii_core::simple_broker::SimpleBroker; | ||||||
use torii_core::types::EventMessage; | ||||||
|
||||||
use super::entity::model_data_recursive_query; | ||||||
use super::inputs::keys_input::keys_argument; | ||||||
use super::{BasicObject, ResolvableObject, TypeMapping, ValueMapping}; | ||||||
use crate::constants::{ | ||||||
EVENT_ID_COLUMN, EVENT_MESSAGE_NAMES, EVENT_MESSAGE_TABLE, EVENT_MESSAGE_TYPE_NAME, ID_COLUMN, | ||||||
DATETIME_FORMAT, EVENT_ID_COLUMN, EVENT_MESSAGE_ID_COLUMN, EVENT_MESSAGE_NAMES, | ||||||
EVENT_MESSAGE_TABLE, EVENT_MESSAGE_TYPE_NAME, ID_COLUMN, | ||||||
}; | ||||||
use crate::mapping::ENTITY_TYPE_MAPPING; | ||||||
use crate::object::{resolve_many, resolve_one}; | ||||||
use crate::query::{type_mapping_query, value_mapping_from_row}; | ||||||
use crate::types::TypeData; | ||||||
use crate::utils::extract; | ||||||
use crate::query::type_mapping_query; | ||||||
use crate::utils; | ||||||
|
||||||
#[derive(Debug)] | ||||||
pub struct EventMessageObject; | ||||||
|
@@ -106,11 +105,15 @@ impl EventMessageObject { | |||||
(Name::new("eventId"), Value::from(entity.event_id)), | ||||||
( | ||||||
Name::new("createdAt"), | ||||||
Value::from(entity.created_at.format("%Y-%m-%d %H:%M:%S").to_string()), | ||||||
Value::from(entity.created_at.format(DATETIME_FORMAT).to_string()), | ||||||
), | ||||||
( | ||||||
Name::new("updatedAt"), | ||||||
Value::from(entity.updated_at.format("%Y-%m-%d %H:%M:%S").to_string()), | ||||||
Value::from(entity.updated_at.format(DATETIME_FORMAT).to_string()), | ||||||
), | ||||||
( | ||||||
Name::new("executedAt"), | ||||||
Value::from(entity.executed_at.format(DATETIME_FORMAT).to_string()), | ||||||
), | ||||||
]) | ||||||
} | ||||||
|
@@ -123,13 +126,13 @@ fn model_union_field() -> Field { | |||||
Value::Object(indexmap) => { | ||||||
let mut conn = ctx.data::<Pool<Sqlite>>()?.acquire().await?; | ||||||
|
||||||
let entity_id = extract::<String>(indexmap, "id")?; | ||||||
let entity_id = utils::extract::<String>(indexmap, "id")?; | ||||||
// fetch name from the models table | ||||||
// using the model id (hashed model name) | ||||||
let model_ids: Vec<(String, String)> = sqlx::query_as( | ||||||
"SELECT id, name | ||||||
let model_ids: Vec<(String, String, String)> = sqlx::query_as( | ||||||
"SELECT id, namespace, name | ||||||
FROM models | ||||||
WHERE id IN ( | ||||||
WHERE id IN ( | ||||||
SELECT model_id | ||||||
FROM event_model | ||||||
WHERE entity_id = ? | ||||||
|
@@ -140,20 +143,30 @@ fn model_union_field() -> Field { | |||||
.await?; | ||||||
|
||||||
let mut results: Vec<FieldValue<'_>> = Vec::new(); | ||||||
for (id, name) in model_ids { | ||||||
// the model id is used as the id for the model members | ||||||
for (id, namespace, name) in model_ids { | ||||||
// the model id in the model mmeebrs table is the hashed model name (id) | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||
let type_mapping = type_mapping_query(&mut conn, &id).await?; | ||||||
|
||||||
// but the model data tables use the unhashed model name as the table name | ||||||
let data = model_data_recursive_query( | ||||||
// but the table name for the model data is the unhashed model name | ||||||
let data: ValueMapping = match model_data_recursive_query( | ||||||
&mut conn, | ||||||
vec![name.clone()], | ||||||
EVENT_MESSAGE_ID_COLUMN, | ||||||
vec![format!("{namespace}-{name}")], | ||||||
&entity_id, | ||||||
&[], | ||||||
&type_mapping, | ||||||
false, | ||||||
) | ||||||
.await?; | ||||||
.await? | ||||||
{ | ||||||
Value::Object(map) => map, | ||||||
_ => unreachable!(), | ||||||
}; | ||||||
|
||||||
results.push(FieldValue::with_type(FieldValue::owned_any(data), name)); | ||||||
results.push(FieldValue::with_type( | ||||||
FieldValue::owned_any(data), | ||||||
utils::type_name_from_names(&namespace, &name), | ||||||
)) | ||||||
} | ||||||
|
||||||
Ok(Some(FieldValue::list(results))) | ||||||
|
@@ -163,33 +176,3 @@ fn model_union_field() -> Field { | |||||
}) | ||||||
}) | ||||||
} | ||||||
|
||||||
// TODO: flatten query | ||||||
#[async_recursion] | ||||||
pub async fn model_data_recursive_query( | ||||||
conn: &mut PoolConnection<Sqlite>, | ||||||
path_array: Vec<String>, | ||||||
entity_id: &str, | ||||||
type_mapping: &TypeMapping, | ||||||
) -> sqlx::Result<ValueMapping> { | ||||||
// For nested types, we need to remove prefix in path array | ||||||
let namespace = format!("{}_", path_array[0]); | ||||||
let table_name = &path_array.join("$").replace(&namespace, ""); | ||||||
let query = format!("SELECT * FROM {} WHERE event_message_id = '{}'", table_name, entity_id); | ||||||
let row = sqlx::query(&query).fetch_one(conn.as_mut()).await?; | ||||||
let mut value_mapping = value_mapping_from_row(&row, type_mapping, true)?; | ||||||
|
||||||
for (field_name, type_data) in type_mapping { | ||||||
if let TypeData::Nested((_, nested_mapping)) = type_data { | ||||||
let mut nested_path = path_array.clone(); | ||||||
nested_path.push(field_name.to_string()); | ||||||
|
||||||
let nested_values = | ||||||
model_data_recursive_query(conn, nested_path, entity_id, nested_mapping).await?; | ||||||
|
||||||
value_mapping.insert(Name::new(field_name), Value::Object(nested_values)); | ||||||
} | ||||||
} | ||||||
|
||||||
Ok(value_mapping) | ||||||
} |
Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
@@ -10,7 +10,8 @@ use super::inputs::where_input::{parse_where_argument, where_argument, WhereInpu | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
use super::inputs::InputObjectTrait; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
use super::{BasicObject, ResolvableObject, TypeMapping, ValueMapping}; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
use crate::constants::{ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
ENTITY_ID_COLUMN, ENTITY_TABLE, EVENT_ID_COLUMN, ID_COLUMN, INTERNAL_ENTITY_ID_KEY, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
ENTITY_ID_COLUMN, ENTITY_TABLE, ENTITY_TYPE_NAME, EVENT_ID_COLUMN, EVENT_MESSAGE_TABLE, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
EVENT_MESSAGE_TYPE_NAME, ID_COLUMN, INTERNAL_ENTITY_ID_KEY, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
}; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
use crate::mapping::ENTITY_TYPE_MAPPING; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
use crate::query::data::{count_rows, fetch_multiple_rows, fetch_single_row}; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
@@ -77,6 +78,7 @@ impl BasicObject for ModelDataObject { | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
// root object requires entity_field association | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
let mut root = objects.pop().unwrap(); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
root = root.field(entity_field()); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
root = root.field(event_message_field()); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
objects.push(root); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
objects | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
@@ -245,7 +247,7 @@ pub fn object(type_name: &str, type_mapping: &TypeMapping, path_array: Vec<Strin | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
fn entity_field() -> Field { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
Field::new("entity", TypeRef::named("World__Entity"), |ctx| { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
Field::new("entity", TypeRef::named(ENTITY_TYPE_NAME), |ctx| { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
FieldFuture::new(async move { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
match ctx.parent_value.try_to_value()? { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
Value::Object(indexmap) => { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
@@ -262,3 +264,23 @@ fn entity_field() -> Field { | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
}) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
}) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
fn event_message_field() -> Field { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
Field::new("eventMessage", TypeRef::named(EVENT_MESSAGE_TYPE_NAME), |ctx| { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
FieldFuture::new(async move { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
match ctx.parent_value.try_to_value()? { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
Value::Object(indexmap) => { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
let mut conn = ctx.data::<Pool<Sqlite>>()?.acquire().await?; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
let entity_id = utils::extract::<String>(indexmap, INTERNAL_ENTITY_ID_KEY)?; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
let data = | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
fetch_single_row(&mut conn, EVENT_MESSAGE_TABLE, ID_COLUMN, &entity_id) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
.await?; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
let event_message = value_mapping_from_row(&data, &ENTITY_TYPE_MAPPING, false)?; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
Ok(Some(Value::Object(event_message))) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
_ => Err("incorrect value, requires Value::Object".into()), | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
}) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
}) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
Comment on lines
+268
to
+286
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ohayo, sensei! The new The function is well-structured and follows best practices for asynchronous database access and error handling. Consider adding more detailed error messages to improve debugging. - _ => Err("incorrect value, requires Value::Object".into()),
+ _ => Err("Incorrect value type encountered. Expected Value::Object.".into()), Ensure that this function is covered by unit tests. Do you want me to generate the unit testing code or open a GitHub issue to track this task? Committable suggestion
Suggested change
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ohayo, sensei! Consider removing unused imports.
The
model_data_recursive_query
function is removed, but its import still exists. Removing unused imports helps maintain clean and efficient code.- use super::entity::model_data_recursive_query;
Committable suggestion