Skip to content

Commit

Permalink
refactor(torii-grpc): event subscription with multiple clauses
Browse files Browse the repository at this point in the history
  • Loading branch information
Larkooo committed Oct 18, 2024
1 parent 3508fb9 commit ba0510a
Show file tree
Hide file tree
Showing 6 changed files with 146 additions and 157 deletions.
2 changes: 1 addition & 1 deletion crates/torii/grpc/proto/world.proto
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ message RetrieveEventsResponse {
}

message SubscribeEventsRequest {
types.KeysClause keys = 1;
repeated types.EntityKeysClause keys = 1;
}

message SubscribeEventsResponse {
Expand Down
6 changes: 4 additions & 2 deletions crates/torii/grpc/src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -834,9 +834,11 @@ impl DojoWorld {

async fn subscribe_events(
&self,
clause: proto::types::KeysClause,
clause: Vec<proto::types::EntityKeysClause>,
) -> Result<Receiver<Result<proto::world::SubscribeEventsResponse, tonic::Status>>, Error> {
self.event_manager.add_subscriber(clause.into()).await
self.event_manager
.add_subscriber(clause.into_iter().map(|keys| keys.into()).collect())
.await
}
}

Expand Down
65 changes: 3 additions & 62 deletions crates/torii/grpc/src/server/subscriptions/entity.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,10 @@ use torii_core::sql::FELT_DELIMITER;
use torii_core::types::OptimisticEntity;
use tracing::{error, trace};

use super::match_entity_keys;
use crate::proto;
use crate::proto::world::SubscribeEntityResponse;
use crate::types::{EntityKeysClause, PatternMatching};
use crate::types::EntityKeysClause;

pub(crate) const LOG_TARGET: &str = "torii::grpc::server::subscriptions::entity";

Expand Down Expand Up @@ -128,67 +129,7 @@ impl Service {

// If we have a clause of keys, then check that the key pattern of the entity
// matches the key pattern of the subscriber.
if !sub.clauses.is_empty()
&& !sub.clauses.iter().any(|clause| match clause {
EntityKeysClause::HashedKeys(hashed_keys) => {
hashed_keys.is_empty() || hashed_keys.contains(&hashed)
}
EntityKeysClause::Keys(clause) => {
// if we have a model clause, then we need to check that the entity
// has an updated model and that the model name matches the clause
if let Some(updated_model) = &entity.updated_model {
let name = updated_model.name();
let (namespace, name) = name.split_once('-').unwrap();

if !clause.models.is_empty()
&& !clause.models.iter().any(|clause_model| {
let (clause_namespace, clause_model) =
clause_model.split_once('-').unwrap();
// if both namespace and model are empty, we should match all.
// if namespace is specified and model is empty or * we should
// match all models in the
// namespace if namespace
// and model are specified, we should match the
// specific model
(clause_namespace.is_empty()
|| clause_namespace == namespace
|| clause_namespace == "*")
&& (clause_model.is_empty()
|| clause_model == name
|| clause_model == "*")
})
{
return false;
}
}

// if the key pattern doesnt match our subscribers key pattern, skip
// ["", "0x0"] would match with keys ["0x...", "0x0", ...]
if clause.pattern_matching == PatternMatching::FixedLen
&& keys.len() != clause.keys.len()
{
return false;
}

return keys.iter().enumerate().all(|(idx, key)| {
// this is going to be None if our key pattern overflows the subscriber
// key pattern in this case we should skip
let sub_key = clause.keys.get(idx);

match sub_key {
// the key in the subscriber must match the key of the entity
// athis index
Some(Some(sub_key)) => key == sub_key,
// otherwise, if we have no key we should automatically match.
// or.. we overflowed the subscriber key pattern
// but we're in VariableLen pattern matching
// so we should match all next keys
_ => true,
}
});
}
})
{
if !match_entity_keys(hashed, &keys, &entity.updated_model, &sub.clauses) {
continue;
}

Expand Down
35 changes: 5 additions & 30 deletions crates/torii/grpc/src/server/subscriptions/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,17 @@ use torii_core::sql::FELT_DELIMITER;
use torii_core::types::Event;
use tracing::{error, trace};

use super::match_keys;
use crate::proto;
use crate::proto::world::SubscribeEventsResponse;
use crate::types::{KeysClause, PatternMatching};
use crate::types::EntityKeysClause;

pub(crate) const LOG_TARGET: &str = "torii::grpc::server::subscriptions::event";

#[derive(Debug)]
pub struct EventSubscriber {
/// Event keys that the subscriber is interested in
keys: KeysClause,
keys: Vec<EntityKeysClause>,
/// The channel to send the response back to the subscriber.
sender: Sender<Result<proto::world::SubscribeEventsResponse, tonic::Status>>,
}
Expand All @@ -41,7 +42,7 @@ pub struct EventManager {
impl EventManager {
pub async fn add_subscriber(
&self,
keys: KeysClause,
keys: Vec<EntityKeysClause>,
) -> Result<Receiver<Result<proto::world::SubscribeEventsResponse, tonic::Status>>, Error> {
let id = rand::thread_rng().gen::<usize>();
let (sender, receiver) = channel(1);
Expand Down Expand Up @@ -108,33 +109,7 @@ impl Service {
.map_err(ParseError::from)?;

for (idx, sub) in subs.subscribers.read().await.iter() {
// if the key pattern doesnt match our subscribers key pattern, skip
// ["", "0x0"] would match with keys ["0x...", "0x0", ...]
if sub.keys.pattern_matching == PatternMatching::FixedLen
&& keys.len() != sub.keys.keys.len()
{
continue;
}

if !keys.iter().enumerate().all(|(idx, key)| {
// this is going to be None if our key pattern overflows the subscriber key pattern
// in this case we might want to list all events with the same
// key selector so we can match them all
let sub_key = sub.keys.keys.get(idx);

// if we have a key in the subscriber, it must match the key in the event
// unless its empty, which is a wildcard
match sub_key {
// the key in the subscriber must match the key of the entity
// athis index
Some(Some(sub_key)) => key == sub_key,
// otherwise, if we have no key we should automatically match.
// or.. we overflowed the subscriber key pattern
// but we're in VariableLen pattern matching
// so we should match all next keys
_ => true,
}
}) {
if !match_keys(&keys, &sub.keys) {
continue;
}

Expand Down
65 changes: 3 additions & 62 deletions crates/torii/grpc/src/server/subscriptions/event_message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,10 @@ use torii_core::types::OptimisticEventMessage;
use tracing::{error, trace};

use super::entity::EntitiesSubscriber;
use super::match_entity_keys;
use crate::proto;
use crate::proto::world::SubscribeEntityResponse;
use crate::types::{EntityKeysClause, PatternMatching};
use crate::types::EntityKeysClause;

pub(crate) const LOG_TARGET: &str = "torii::grpc::server::subscriptions::event_message";

Expand Down Expand Up @@ -120,67 +121,7 @@ impl Service {

// If we have a clause of keys, then check that the key pattern of the entity
// matches the key pattern of the subscriber.
if !sub.clauses.is_empty()
&& !sub.clauses.iter().any(|clause| match clause {
EntityKeysClause::HashedKeys(hashed_keys) => {
hashed_keys.is_empty() || hashed_keys.contains(&hashed)
}
EntityKeysClause::Keys(clause) => {
// if we have a model clause, then we need to check that the entity
// has an updated model and that the model name matches the clause
if let Some(updated_model) = &entity.updated_model {
let name = updated_model.name();
let (namespace, name) = name.split_once('-').unwrap();

if !clause.models.is_empty()
&& !clause.models.iter().any(|clause_model| {
let (clause_namespace, clause_model) =
clause_model.split_once('-').unwrap();
// if both namespace and model are empty, we should match all.
// if namespace is specified and model is empty or * we should
// match all models in the
// namespace if namespace
// and model are specified, we should match the
// specific model
(clause_namespace.is_empty()
|| clause_namespace == namespace
|| clause_namespace == "*")
&& (clause_model.is_empty()
|| clause_model == name
|| clause_model == "*")
})
{
return false;
}
}

// if the key pattern doesnt match our subscribers key pattern, skip
// ["", "0x0"] would match with keys ["0x...", "0x0", ...]
if clause.pattern_matching == PatternMatching::FixedLen
&& keys.len() != clause.keys.len()
{
return false;
}

return keys.iter().enumerate().all(|(idx, key)| {
// this is going to be None if our key pattern overflows the subscriber
// key pattern in this case we should skip
let sub_key = clause.keys.get(idx);

match sub_key {
// the key in the subscriber must match the key of the entity
// athis index
Some(Some(sub_key)) => key == sub_key,
// otherwise, if we have no key we should automatically match.
// or.. we overflowed the subscriber key pattern
// but we're in VariableLen pattern matching
// so we should match all next keys
_ => true,
}
});
}
})
{
if !match_entity_keys(hashed, &keys, &entity.updated_model, &sub.clauses) {
continue;
}

Expand Down
130 changes: 130 additions & 0 deletions crates/torii/grpc/src/server/subscriptions/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,136 @@
use dojo_types::schema::Ty;
use starknet_crypto::{poseidon_hash_many, Felt};

use crate::types::{EntityKeysClause, PatternMatching};

pub mod entity;
pub mod error;
pub mod event;
pub mod event_message;
pub mod indexer;
pub mod model_diff;

pub(crate) fn match_entity_keys(
id: Felt,
keys: &[Felt],
updated_model: &Option<Ty>,
clauses: &[EntityKeysClause],
) -> bool {
// Check if the subscriber is interested in this entity
// If we have a clause of hashed keys, then check that the id of the entity
// is in the list of hashed keys.

// If we have a clause of keys, then check that the key pattern of the entity
// matches the key pattern of the subscriber.
if !clauses.is_empty()
&& !clauses.iter().any(|clause| match clause {
EntityKeysClause::HashedKeys(hashed_keys) => {
hashed_keys.is_empty() || hashed_keys.contains(&id)
}
EntityKeysClause::Keys(clause) => {
// if we have a model clause, then we need to check that the entity
// has an updated model and that the model name matches the clause
if let Some(updated_model) = &updated_model {
let name = updated_model.name();
let (namespace, name) = name.split_once('-').unwrap();

if !clause.models.is_empty()
&& !clause.models.iter().any(|clause_model| {
let (clause_namespace, clause_model) =
clause_model.split_once('-').unwrap();
// if both namespace and model are empty, we should match all.
// if namespace is specified and model is empty or * we should
// match all models in the
// namespace if namespace
// and model are specified, we should match the
// specific model
(clause_namespace.is_empty()
|| clause_namespace == namespace
|| clause_namespace == "*")
&& (clause_model.is_empty()
|| clause_model == name
|| clause_model == "*")
})
{
return false;
}
}

// if the key pattern doesnt match our subscribers key pattern, skip
// ["", "0x0"] would match with keys ["0x...", "0x0", ...]
if clause.pattern_matching == PatternMatching::FixedLen
&& keys.len() != clause.keys.len()
{
return false;
}

return keys.iter().enumerate().all(|(idx, key)| {
// this is going to be None if our key pattern overflows the subscriber
// key pattern in this case we should skip
let sub_key = clause.keys.get(idx);

match sub_key {
// the key in the subscriber must match the key of the entity
// athis index
Some(Some(sub_key)) => key == sub_key,
// otherwise, if we have no key we should automatically match.
// or.. we overflowed the subscriber key pattern
// but we're in VariableLen pattern matching
// so we should match all next keys
_ => true,
}
});
}
})
{
return false;
}

true
}

pub(crate) fn match_keys(keys: &[Felt], clauses: &[EntityKeysClause]) -> bool {
// Check if the subscriber is interested in this entity
// If we have a clause of hashed keys, then check that the id of the entity
// is in the list of hashed keys.

// If we have a clause of keys, then check that the key pattern of the entity
// matches the key pattern of the subscriber.
if !clauses.is_empty()
&& !clauses.iter().any(|clause| match clause {
EntityKeysClause::HashedKeys(hashed_keys) => {
hashed_keys.is_empty() || hashed_keys.contains(&poseidon_hash_many(keys))
}
EntityKeysClause::Keys(clause) => {
// if the key pattern doesnt match our subscribers key pattern, skip
// ["", "0x0"] would match with keys ["0x...", "0x0", ...]
if clause.pattern_matching == PatternMatching::FixedLen
&& keys.len() != clause.keys.len()
{
return false;
}

return keys.iter().enumerate().all(|(idx, key)| {
// this is going to be None if our key pattern overflows the subscriber
// key pattern in this case we should skip
let sub_key = clause.keys.get(idx);

match sub_key {
// the key in the subscriber must match the key of the entity
// athis index
Some(Some(sub_key)) => key == sub_key,
// otherwise, if we have no key we should automatically match.
// or.. we overflowed the subscriber key pattern
// but we're in VariableLen pattern matching
// so we should match all next keys
_ => true,
}
});
}
})
{
return false;
}

true
}

0 comments on commit ba0510a

Please sign in to comment.