diff --git a/crates/torii/core/src/executor.rs b/crates/torii/core/src/executor.rs index 5ede6bb7e2..4e551b5e4f 100644 --- a/crates/torii/core/src/executor.rs +++ b/crates/torii/core/src/executor.rs @@ -477,12 +477,13 @@ impl<'c> Executor<'c> { sqlx::query( "INSERT INTO event_messages_historical (id, keys, event_id, data, \ - executed_at) VALUES (?, ?, ?, ?, ?) RETURNING *", + model_id, executed_at) VALUES (?, ?, ?, ?, ?, ?) RETURNING *", ) .bind(em_query.entity_id.clone()) .bind(em_query.keys_str.clone()) .bind(em_query.event_id.clone()) .bind(data) + .bind(em_query.model_id.clone()) .bind(em_query.block_timestamp.clone()) .fetch_one(&mut **tx) .await?; @@ -501,6 +502,7 @@ impl<'c> Executor<'c> { let mut event_message = EventMessageUpdated::from_row(&event_messages_row)?; event_message.updated_model = Some(em_query.ty); + event_message.historical = em_query.is_historical; let optimistic_event_message = OptimisticEventMessage { id: event_message.id.clone(), @@ -510,6 +512,7 @@ impl<'c> Executor<'c> { created_at: event_message.created_at, updated_at: event_message.updated_at, updated_model: event_message.updated_model.clone(), + historical: event_message.historical, }; SimpleBroker::publish(optimistic_event_message); diff --git a/crates/torii/core/src/sql/utils.rs b/crates/torii/core/src/sql/utils.rs index 66b345dd1a..9b7d2dad33 100644 --- a/crates/torii/core/src/sql/utils.rs +++ b/crates/torii/core/src/sql/utils.rs @@ -1,5 +1,6 @@ use std::cmp::Ordering; use std::ops::{Add, AddAssign, Sub, SubAssign}; +use std::str::FromStr; use starknet::core::types::U256; use starknet_crypto::Felt; @@ -28,6 +29,10 @@ pub fn sql_string_to_u256(sql_string: &str) -> U256 { U256::from(crypto_bigint::U256::from_be_hex(sql_string)) } +pub fn sql_string_to_felts(sql_string: &str) -> Vec { + sql_string.split(FELT_DELIMITER).map(|felt| Felt::from_str(felt).unwrap()).collect() +} + // type used to do calculation on inmemory balances #[derive(Debug, Clone, Copy)] pub struct I256 { diff --git a/crates/torii/core/src/types.rs b/crates/torii/core/src/types.rs index 254a095517..a9ecf79a0d 100644 --- a/crates/torii/core/src/types.rs +++ b/crates/torii/core/src/types.rs @@ -79,6 +79,8 @@ pub struct EventMessage { // this should never be None #[sqlx(skip)] pub updated_model: Option, + #[sqlx(skip)] + pub historical: bool, } #[derive(FromRow, Deserialize, Debug, Clone)] @@ -94,6 +96,8 @@ pub struct OptimisticEventMessage { // this should never be None #[sqlx(skip)] pub updated_model: Option, + #[sqlx(skip)] + pub historical: bool, } #[derive(FromRow, Deserialize, Debug, Clone)] diff --git a/crates/torii/grpc/proto/world.proto b/crates/torii/grpc/proto/world.proto index fa3e6ef385..4898c44b8e 100644 --- a/crates/torii/grpc/proto/world.proto +++ b/crates/torii/grpc/proto/world.proto @@ -29,13 +29,13 @@ service World { rpc RetrieveEntitiesStreaming (RetrieveEntitiesRequest) returns (stream RetrieveEntitiesStreamingResponse); // Subscribe to entity updates. - rpc SubscribeEventMessages (SubscribeEntitiesRequest) returns (stream SubscribeEntityResponse); + rpc SubscribeEventMessages (SubscribeEventMessagesRequest) returns (stream SubscribeEntityResponse); // Update entity subscription - rpc UpdateEventMessagesSubscription (UpdateEntitiesSubscriptionRequest) returns (google.protobuf.Empty); + rpc UpdateEventMessagesSubscription (UpdateEventMessagesSubscriptionRequest) returns (google.protobuf.Empty); // Retrieve entities - rpc RetrieveEventMessages (RetrieveEntitiesRequest) returns (RetrieveEntitiesResponse); + rpc RetrieveEventMessages (RetrieveEventMessagesRequest) returns (RetrieveEntitiesResponse); // Retrieve events rpc RetrieveEvents (RetrieveEventsRequest) returns (RetrieveEventsResponse); @@ -81,11 +81,22 @@ message SubscribeEntitiesRequest { repeated types.EntityKeysClause clauses = 1; } +message SubscribeEventMessagesRequest { + repeated types.EntityKeysClause clauses = 1; + bool historical = 2; +} + message UpdateEntitiesSubscriptionRequest { uint64 subscription_id = 1; repeated types.EntityKeysClause clauses = 2; } +message UpdateEventMessagesSubscriptionRequest { + uint64 subscription_id = 1; + repeated types.EntityKeysClause clauses = 2; + bool historical = 3; +} + message SubscribeEntityResponse { types.Entity entity = 1; uint64 subscription_id = 2; @@ -96,6 +107,13 @@ message RetrieveEntitiesRequest { types.Query query = 1; } +message RetrieveEventMessagesRequest { + // The event messages to retrieve + types.Query query = 1; + // Should we retrieve historical event messages? + bool historical = 2; +} + message RetrieveEntitiesResponse { repeated types.Entity entities = 1; uint32 total_count = 2; diff --git a/crates/torii/grpc/src/client.rs b/crates/torii/grpc/src/client.rs index 0ca8c463e5..6499dde2dd 100644 --- a/crates/torii/grpc/src/client.rs +++ b/crates/torii/grpc/src/client.rs @@ -9,11 +9,12 @@ use tonic::codec::CompressionEncoding; use tonic::transport::Endpoint; use crate::proto::world::{ - world_client, RetrieveEntitiesRequest, RetrieveEntitiesResponse, RetrieveEventsRequest, - RetrieveEventsResponse, SubscribeEntitiesRequest, SubscribeEntityResponse, - SubscribeEventsRequest, SubscribeEventsResponse, SubscribeIndexerRequest, - SubscribeIndexerResponse, SubscribeModelsRequest, SubscribeModelsResponse, - UpdateEntitiesSubscriptionRequest, WorldMetadataRequest, + world_client, RetrieveEntitiesRequest, RetrieveEntitiesResponse, RetrieveEventMessagesRequest, + RetrieveEventsRequest, RetrieveEventsResponse, SubscribeEntitiesRequest, + SubscribeEntityResponse, SubscribeEventMessagesRequest, SubscribeEventsRequest, + SubscribeEventsResponse, SubscribeIndexerRequest, SubscribeIndexerResponse, + SubscribeModelsRequest, SubscribeModelsResponse, UpdateEntitiesSubscriptionRequest, + UpdateEventMessagesSubscriptionRequest, WorldMetadataRequest, }; use crate::types::schema::{Entity, SchemaError}; use crate::types::{EntityKeysClause, Event, EventQuery, IndexerUpdate, ModelKeysClause, Query}; @@ -96,8 +97,9 @@ impl WorldClient { pub async fn retrieve_event_messages( &mut self, query: Query, + historical: bool, ) -> Result { - let request = RetrieveEntitiesRequest { query: Some(query.into()) }; + let request = RetrieveEventMessagesRequest { query: Some(query.into()), historical }; self.inner .retrieve_event_messages(request) .await @@ -172,11 +174,12 @@ impl WorldClient { pub async fn subscribe_event_messages( &mut self, clauses: Vec, + historical: bool, ) -> Result { let clauses = clauses.into_iter().map(|c| c.into()).collect(); let stream = self .inner - .subscribe_event_messages(SubscribeEntitiesRequest { clauses }) + .subscribe_event_messages(SubscribeEventMessagesRequest { clauses, historical }) .await .map_err(Error::Grpc) .map(|res| res.into_inner())?; @@ -194,12 +197,14 @@ impl WorldClient { &mut self, subscription_id: u64, clauses: Vec, + historical: bool, ) -> Result<(), Error> { let clauses = clauses.into_iter().map(|c| c.into()).collect(); self.inner - .update_event_messages_subscription(UpdateEntitiesSubscriptionRequest { + .update_event_messages_subscription(UpdateEventMessagesSubscriptionRequest { subscription_id, clauses, + historical, }) .await .map_err(Error::Grpc) diff --git a/crates/torii/grpc/src/server/mod.rs b/crates/torii/grpc/src/server/mod.rs index b1fb797836..4e710713e7 100644 --- a/crates/torii/grpc/src/server/mod.rs +++ b/crates/torii/grpc/src/server/mod.rs @@ -42,6 +42,7 @@ use tonic_web::GrpcWebLayer; use torii_core::error::{Error, ParseError, QueryError}; use torii_core::model::{build_sql_query, map_row_to_ty}; use torii_core::sql::cache::ModelCache; +use torii_core::sql::utils::sql_string_to_felts; use tower_http::cors::{AllowOrigin, CorsLayer}; use self::subscriptions::entity::EntityManager; @@ -52,8 +53,9 @@ use crate::proto::types::member_value::ValueType; use crate::proto::types::LogicalOperator; use crate::proto::world::world_server::WorldServer; use crate::proto::world::{ - RetrieveEntitiesStreamingResponse, SubscribeEntitiesRequest, SubscribeEntityResponse, - SubscribeEventsResponse, SubscribeIndexerRequest, SubscribeIndexerResponse, + RetrieveEntitiesStreamingResponse, RetrieveEventMessagesRequest, SubscribeEntitiesRequest, + SubscribeEntityResponse, SubscribeEventMessagesRequest, SubscribeEventsResponse, + SubscribeIndexerRequest, SubscribeIndexerResponse, UpdateEventMessagesSubscriptionRequest, WorldMetadataRequest, WorldMetadataResponse, }; use crate::proto::{self}; @@ -68,6 +70,8 @@ pub(crate) static EVENT_MESSAGES_TABLE: &str = "event_messages"; pub(crate) static EVENT_MESSAGES_MODEL_RELATION_TABLE: &str = "event_model"; pub(crate) static EVENT_MESSAGES_ENTITY_RELATION_COLUMN: &str = "event_message_id"; +pub(crate) static EVENT_MESSAGES_HISTORICAL_TABLE: &str = "event_messages_historical"; + impl From for Error { fn from(err: SchemaError) -> Self { match err { @@ -312,6 +316,44 @@ impl DojoWorld { Ok(all_entities) } + async fn fetch_historical_event_messages( + &self, + query: &str, + keys_pattern: Option<&str>, + limit: Option, + offset: Option, + ) -> Result, Error> { + let db_entities: Vec<(String, String, String, String)> = if keys_pattern.is_some() { + sqlx::query_as(query) + .bind(keys_pattern.unwrap()) + .bind(limit) + .bind(offset) + .fetch_all(&self.pool) + .await? + } else { + sqlx::query_as(query).bind(limit).bind(offset).fetch_all(&self.pool).await? + }; + + let mut entities = HashMap::new(); + for (id, data, model_id, _) in db_entities { + let hashed_keys = + Felt::from_str(&id).map_err(ParseError::FromStr)?.to_bytes_be().to_vec(); + let model = self + .model_cache + .model(&Felt::from_str(&model_id).map_err(ParseError::FromStr)?) + .await?; + let mut schema = model.schema; + schema.deserialize(&mut sql_string_to_felts(&data))?; + + let entity = entities + .entry(id) + .or_insert_with(|| proto::types::Entity { hashed_keys, models: vec![] }); + entity.models.push(schema.as_struct().unwrap().clone().into()); + } + + Ok(entities.into_values().collect()) + } + #[allow(clippy::too_many_arguments)] pub(crate) async fn query_by_hashed_keys( &self, @@ -353,8 +395,20 @@ impl DojoWorld { } // Query to get entity IDs and their model IDs - let mut query = format!( - r#" + let mut query = if table == EVENT_MESSAGES_HISTORICAL_TABLE { + format!( + r#" + SELECT {table}.id, {table}.data, {table}.model_id, group_concat({model_relation_table}.model_id) as model_ids + FROM {table} + JOIN {model_relation_table} ON {table}.id = {model_relation_table}.entity_id + {filter_ids} + GROUP BY {table}.event_id + ORDER BY {table}.event_id DESC + "# + ) + } else { + format!( + r#" SELECT {table}.id, group_concat({model_relation_table}.model_id) as model_ids FROM {table} JOIN {model_relation_table} ON {table}.id = {model_relation_table}.entity_id @@ -362,7 +416,8 @@ impl DojoWorld { GROUP BY {table}.id ORDER BY {table}.event_id DESC "# - ); + ) + }; if limit.is_some() { query += " LIMIT ?" @@ -372,6 +427,12 @@ impl DojoWorld { query += " OFFSET ?" } + if table == EVENT_MESSAGES_HISTORICAL_TABLE { + let entities = + self.fetch_historical_event_messages(&query, None, limit, offset).await?; + return Ok((entities, total_count)); + } + let db_entities: Vec<(String, String)> = sqlx::query_as(&query).bind(limit).bind(offset).fetch_all(&self.pool).await?; @@ -446,15 +507,27 @@ impl DojoWorld { return Ok((Vec::new(), 0)); } - let mut models_query = format!( - r#" - SELECT {table}.id, group_concat({model_relation_table}.model_id) as model_ids - FROM {table} - JOIN {model_relation_table} ON {table}.id = {model_relation_table}.entity_id - WHERE {table}.keys REGEXP ? - GROUP BY {table}.id - "# - ); + let mut models_query = if table == EVENT_MESSAGES_HISTORICAL_TABLE { + format!( + r#" + SELECT {table}.id, {table}.data, {table}.model_id, group_concat({model_relation_table}.model_id) as model_ids + FROM {table} + JOIN {model_relation_table} ON {table}.id = {model_relation_table}.entity_id + WHERE {table}.keys REGEXP ? + GROUP BY {table}.event_id + "# + ) + } else { + format!( + r#" + SELECT {table}.id, group_concat({model_relation_table}.model_id) as model_ids + FROM {table} + JOIN {model_relation_table} ON {table}.id = {model_relation_table}.entity_id + WHERE {table}.keys REGEXP ? + GROUP BY {table}.id + "# + ) + }; if !keys_clause.models.is_empty() { // filter by models @@ -485,6 +558,13 @@ impl DojoWorld { models_query += " OFFSET ?"; } + if table == EVENT_MESSAGES_HISTORICAL_TABLE { + let entities = self + .fetch_historical_event_messages(&models_query, Some(&keys_pattern), limit, offset) + .await?; + return Ok((entities, total_count)); + } + let db_entities: Vec<(String, String)> = sqlx::query_as(&models_query) .bind(&keys_pattern) .bind(limit) @@ -838,9 +918,10 @@ impl DojoWorld { async fn subscribe_event_messages( &self, clauses: Vec, + historical: bool, ) -> Result>, Error> { self.event_message_manager - .add_subscriber(clauses.into_iter().map(|keys| keys.into()).collect()) + .add_subscriber(clauses.into_iter().map(|keys| keys.into()).collect(), historical) .await } @@ -1195,11 +1276,11 @@ impl proto::world::world_server::World for DojoWorld { async fn subscribe_event_messages( &self, - request: Request, + request: Request, ) -> ServiceResult { - let SubscribeEntitiesRequest { clauses } = request.into_inner(); + let SubscribeEventMessagesRequest { clauses, historical } = request.into_inner(); let rx = self - .subscribe_event_messages(clauses) + .subscribe_event_messages(clauses, historical) .await .map_err(|e| Status::internal(e.to_string()))?; @@ -1208,13 +1289,15 @@ impl proto::world::world_server::World for DojoWorld { async fn update_event_messages_subscription( &self, - request: Request, + request: Request, ) -> ServiceResult<()> { - let UpdateEntitiesSubscriptionRequest { subscription_id, clauses } = request.into_inner(); + let UpdateEventMessagesSubscriptionRequest { subscription_id, clauses, historical } = + request.into_inner(); self.event_message_manager .update_subscriber( subscription_id, clauses.into_iter().map(|keys| keys.into()).collect(), + historical, ) .await; @@ -1223,16 +1306,14 @@ impl proto::world::world_server::World for DojoWorld { async fn retrieve_event_messages( &self, - request: Request, + request: Request, ) -> Result, Status> { - let query = request - .into_inner() - .query - .ok_or_else(|| Status::invalid_argument("Missing query argument"))?; + let RetrieveEventMessagesRequest { query, historical } = request.into_inner(); + let query = query.ok_or_else(|| Status::invalid_argument("Missing query argument"))?; let entities = self .retrieve_entities( - EVENT_MESSAGES_TABLE, + if historical { EVENT_MESSAGES_HISTORICAL_TABLE } else { EVENT_MESSAGES_TABLE }, EVENT_MESSAGES_MODEL_RELATION_TABLE, EVENT_MESSAGES_ENTITY_RELATION_COLUMN, query, diff --git a/crates/torii/grpc/src/server/subscriptions/event_message.rs b/crates/torii/grpc/src/server/subscriptions/event_message.rs index c0aa33edfe..1fb578ed47 100644 --- a/crates/torii/grpc/src/server/subscriptions/event_message.rs +++ b/crates/torii/grpc/src/server/subscriptions/event_message.rs @@ -9,7 +9,9 @@ use futures::Stream; use futures_util::StreamExt; use rand::Rng; use starknet::core::types::Felt; -use tokio::sync::mpsc::{channel, unbounded_channel, Receiver, UnboundedReceiver, UnboundedSender}; +use tokio::sync::mpsc::{ + channel, unbounded_channel, Receiver, Sender, UnboundedReceiver, UnboundedSender, +}; use tokio::sync::RwLock; use torii_core::error::{Error, ParseError}; use torii_core::simple_broker::SimpleBroker; @@ -17,7 +19,6 @@ use torii_core::sql::FELT_DELIMITER; use torii_core::types::OptimisticEventMessage; use tracing::{error, trace}; -use super::entity::EntitiesSubscriber; use super::match_entity_keys; use crate::proto; use crate::proto::world::SubscribeEntityResponse; @@ -25,15 +26,26 @@ use crate::types::EntityKeysClause; pub(crate) const LOG_TARGET: &str = "torii::grpc::server::subscriptions::event_message"; +#[derive(Debug)] +pub struct EventMessageSubscriber { + /// Entity ids that the subscriber is interested in + pub(crate) clauses: Vec, + /// Whether the subscriber is interested in historical event messages + pub(crate) historical: bool, + /// The channel to send the response back to the subscriber. + pub(crate) sender: Sender>, +} + #[derive(Debug, Default)] pub struct EventMessageManager { - subscribers: RwLock>, + subscribers: RwLock>, } impl EventMessageManager { pub async fn add_subscriber( &self, clauses: Vec, + historical: bool, ) -> Result>, Error> { let subscription_id = rand::thread_rng().gen::(); let (sender, receiver) = channel(1); @@ -46,12 +58,17 @@ impl EventMessageManager { self.subscribers .write() .await - .insert(subscription_id, EntitiesSubscriber { clauses, sender }); + .insert(subscription_id, EventMessageSubscriber { clauses, historical, sender }); Ok(receiver) } - pub async fn update_subscriber(&self, id: u64, clauses: Vec) { + pub async fn update_subscriber( + &self, + id: u64, + clauses: Vec, + historical: bool, + ) { let sender = { let subscribers = self.subscribers.read().await; if let Some(subscriber) = subscribers.get(&id) { @@ -61,7 +78,10 @@ impl EventMessageManager { } }; - self.subscribers.write().await.insert(id, EntitiesSubscriber { clauses, sender }); + self.subscribers + .write() + .await + .insert(id, EventMessageSubscriber { clauses, historical, sender }); } pub(super) async fn remove_subscriber(&self, id: u64) { @@ -115,6 +135,11 @@ impl Service { .map_err(ParseError::FromStr)?; for (idx, sub) in subs.subscribers.read().await.iter() { + // Check if the subscriber is interested in this historical or non-historical event + if sub.historical != entity.historical { + continue; + } + // Check if the subscriber is interested in this entity // If we have a clause of hashed keys, then check that the id of the entity // is in the list of hashed keys. diff --git a/crates/torii/migrations/20241028234131_event_message_historical.sql b/crates/torii/migrations/20241028234131_event_message_historical.sql index 0a5f0fe26d..c0ce4a9e97 100644 --- a/crates/torii/migrations/20241028234131_event_message_historical.sql +++ b/crates/torii/migrations/20241028234131_event_message_historical.sql @@ -8,6 +8,8 @@ CREATE TABLE event_messages_historical ( event_id TEXT NOT NULL, -- The serialized data of the event, which contains the Ty. data TEXT NOT NULL, + -- The model id of the serialized data. + model_id TEXT NOT NULL, executed_at DATETIME NOT NULL, created_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP, updated_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP