Skip to content

Commit

Permalink
Merge remote-tracking branch 'dojo/feat/dojo-1-rc0' into feat/dojo-1-rc0
Browse files Browse the repository at this point in the history
  • Loading branch information
glihm committed Oct 30, 2024
2 parents 0fab9fe + 059b69e commit 5d7f632
Show file tree
Hide file tree
Showing 8 changed files with 187 additions and 44 deletions.
5 changes: 4 additions & 1 deletion crates/torii/core/src/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -477,12 +477,13 @@ impl<'c> Executor<'c> {

sqlx::query(
"INSERT INTO event_messages_historical (id, keys, event_id, data, \
executed_at) VALUES (?, ?, ?, ?, ?) RETURNING *",
model_id, executed_at) VALUES (?, ?, ?, ?, ?, ?) RETURNING *",
)
.bind(em_query.entity_id.clone())
.bind(em_query.keys_str.clone())
.bind(em_query.event_id.clone())
.bind(data)
.bind(em_query.model_id.clone())
.bind(em_query.block_timestamp.clone())
.fetch_one(&mut **tx)
.await?;
Expand All @@ -501,6 +502,7 @@ impl<'c> Executor<'c> {

let mut event_message = EventMessageUpdated::from_row(&event_messages_row)?;
event_message.updated_model = Some(em_query.ty);
event_message.historical = em_query.is_historical;

let optimistic_event_message = OptimisticEventMessage {
id: event_message.id.clone(),
Expand All @@ -510,6 +512,7 @@ impl<'c> Executor<'c> {
created_at: event_message.created_at,
updated_at: event_message.updated_at,
updated_model: event_message.updated_model.clone(),
historical: event_message.historical,
};
SimpleBroker::publish(optimistic_event_message);

Expand Down
5 changes: 5 additions & 0 deletions crates/torii/core/src/sql/utils.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::cmp::Ordering;
use std::ops::{Add, AddAssign, Sub, SubAssign};
use std::str::FromStr;

use starknet::core::types::U256;
use starknet_crypto::Felt;
Expand Down Expand Up @@ -28,6 +29,10 @@ pub fn sql_string_to_u256(sql_string: &str) -> U256 {
U256::from(crypto_bigint::U256::from_be_hex(sql_string))
}

pub fn sql_string_to_felts(sql_string: &str) -> Vec<Felt> {
sql_string.split(FELT_DELIMITER).map(|felt| Felt::from_str(felt).unwrap()).collect()
}

// type used to do calculation on inmemory balances
#[derive(Debug, Clone, Copy)]
pub struct I256 {
Expand Down
4 changes: 4 additions & 0 deletions crates/torii/core/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,8 @@ pub struct EventMessage {
// this should never be None
#[sqlx(skip)]
pub updated_model: Option<Ty>,
#[sqlx(skip)]
pub historical: bool,
}

#[derive(FromRow, Deserialize, Debug, Clone)]
Expand All @@ -94,6 +96,8 @@ pub struct OptimisticEventMessage {
// this should never be None
#[sqlx(skip)]
pub updated_model: Option<Ty>,
#[sqlx(skip)]
pub historical: bool,
}

#[derive(FromRow, Deserialize, Debug, Clone)]
Expand Down
24 changes: 21 additions & 3 deletions crates/torii/grpc/proto/world.proto
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,13 @@ service World {
rpc RetrieveEntitiesStreaming (RetrieveEntitiesRequest) returns (stream RetrieveEntitiesStreamingResponse);

// Subscribe to entity updates.
rpc SubscribeEventMessages (SubscribeEntitiesRequest) returns (stream SubscribeEntityResponse);
rpc SubscribeEventMessages (SubscribeEventMessagesRequest) returns (stream SubscribeEntityResponse);

// Update entity subscription
rpc UpdateEventMessagesSubscription (UpdateEntitiesSubscriptionRequest) returns (google.protobuf.Empty);
rpc UpdateEventMessagesSubscription (UpdateEventMessagesSubscriptionRequest) returns (google.protobuf.Empty);

// Retrieve entities
rpc RetrieveEventMessages (RetrieveEntitiesRequest) returns (RetrieveEntitiesResponse);
rpc RetrieveEventMessages (RetrieveEventMessagesRequest) returns (RetrieveEntitiesResponse);

// Retrieve events
rpc RetrieveEvents (RetrieveEventsRequest) returns (RetrieveEventsResponse);
Expand Down Expand Up @@ -81,11 +81,22 @@ message SubscribeEntitiesRequest {
repeated types.EntityKeysClause clauses = 1;
}

message SubscribeEventMessagesRequest {
repeated types.EntityKeysClause clauses = 1;
bool historical = 2;
}

message UpdateEntitiesSubscriptionRequest {
uint64 subscription_id = 1;
repeated types.EntityKeysClause clauses = 2;
}

message UpdateEventMessagesSubscriptionRequest {
uint64 subscription_id = 1;
repeated types.EntityKeysClause clauses = 2;
bool historical = 3;
}

message SubscribeEntityResponse {
types.Entity entity = 1;
uint64 subscription_id = 2;
Expand All @@ -96,6 +107,13 @@ message RetrieveEntitiesRequest {
types.Query query = 1;
}

message RetrieveEventMessagesRequest {
// The event messages to retrieve
types.Query query = 1;
// Should we retrieve historical event messages?
bool historical = 2;
}

message RetrieveEntitiesResponse {
repeated types.Entity entities = 1;
uint32 total_count = 2;
Expand Down
21 changes: 13 additions & 8 deletions crates/torii/grpc/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,12 @@ use tonic::codec::CompressionEncoding;
use tonic::transport::Endpoint;

use crate::proto::world::{
world_client, RetrieveEntitiesRequest, RetrieveEntitiesResponse, RetrieveEventsRequest,
RetrieveEventsResponse, SubscribeEntitiesRequest, SubscribeEntityResponse,
SubscribeEventsRequest, SubscribeEventsResponse, SubscribeIndexerRequest,
SubscribeIndexerResponse, SubscribeModelsRequest, SubscribeModelsResponse,
UpdateEntitiesSubscriptionRequest, WorldMetadataRequest,
world_client, RetrieveEntitiesRequest, RetrieveEntitiesResponse, RetrieveEventMessagesRequest,
RetrieveEventsRequest, RetrieveEventsResponse, SubscribeEntitiesRequest,
SubscribeEntityResponse, SubscribeEventMessagesRequest, SubscribeEventsRequest,
SubscribeEventsResponse, SubscribeIndexerRequest, SubscribeIndexerResponse,
SubscribeModelsRequest, SubscribeModelsResponse, UpdateEntitiesSubscriptionRequest,
UpdateEventMessagesSubscriptionRequest, WorldMetadataRequest,
};
use crate::types::schema::{Entity, SchemaError};
use crate::types::{EntityKeysClause, Event, EventQuery, IndexerUpdate, ModelKeysClause, Query};
Expand Down Expand Up @@ -96,8 +97,9 @@ impl WorldClient {
pub async fn retrieve_event_messages(
&mut self,
query: Query,
historical: bool,
) -> Result<RetrieveEntitiesResponse, Error> {
let request = RetrieveEntitiesRequest { query: Some(query.into()) };
let request = RetrieveEventMessagesRequest { query: Some(query.into()), historical };
self.inner
.retrieve_event_messages(request)
.await
Expand Down Expand Up @@ -172,11 +174,12 @@ impl WorldClient {
pub async fn subscribe_event_messages(
&mut self,
clauses: Vec<EntityKeysClause>,
historical: bool,
) -> Result<EntityUpdateStreaming, Error> {
let clauses = clauses.into_iter().map(|c| c.into()).collect();
let stream = self
.inner
.subscribe_event_messages(SubscribeEntitiesRequest { clauses })
.subscribe_event_messages(SubscribeEventMessagesRequest { clauses, historical })
.await
.map_err(Error::Grpc)
.map(|res| res.into_inner())?;
Expand All @@ -194,12 +197,14 @@ impl WorldClient {
&mut self,
subscription_id: u64,
clauses: Vec<EntityKeysClause>,
historical: bool,
) -> Result<(), Error> {
let clauses = clauses.into_iter().map(|c| c.into()).collect();
self.inner
.update_event_messages_subscription(UpdateEntitiesSubscriptionRequest {
.update_event_messages_subscription(UpdateEventMessagesSubscriptionRequest {
subscription_id,
clauses,
historical,
})
.await
.map_err(Error::Grpc)
Expand Down
Loading

0 comments on commit 5d7f632

Please sign in to comment.