Skip to content

Commit

Permalink
refactor: full torii clinet refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
Larkooo committed Oct 17, 2024
1 parent f3c9d7a commit 494307e
Show file tree
Hide file tree
Showing 5 changed files with 249 additions and 202 deletions.
49 changes: 43 additions & 6 deletions crates/torii/client/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,15 @@ use std::sync::Arc;
use dojo_types::WorldMetadata;
use dojo_world::contracts::WorldContractReader;
use futures::lock::Mutex;
use futures::stream::MapOk;
use futures::{Stream, StreamExt, TryStreamExt};
use parking_lot::{RwLock, RwLockReadGuard};
use starknet::core::types::Felt;
use starknet::providers::jsonrpc::HttpTransport;
use starknet::providers::JsonRpcClient;
use tokio::sync::RwLock as AsyncRwLock;
use torii_grpc::client::{EntityUpdateStreaming, EventUpdateStreaming, IndexerUpdateStreaming};
use torii_grpc::proto::world::{RetrieveEntitiesResponse, RetrieveEventsResponse};
use torii_grpc::client::{EventUpdateStreaming, IndexerUpdateStreaming};
use torii_grpc::proto::world::{RetrieveEntitiesResponse, RetrieveEventsResponse, SubscribeEntityResponse};
use torii_grpc::types::schema::Entity;
use torii_grpc::types::{EntityKeysClause, Event, EventQuery, KeysClause, Query};
use torii_relay::client::EventLoop;
Expand Down Expand Up @@ -95,15 +97,17 @@ impl Client {
let mut grpc_client = self.inner.write().await;
let RetrieveEntitiesResponse { entities, total_count: _ } =
grpc_client.retrieve_entities(query).await?;
Ok(entities.into_iter().map(TryInto::try_into).collect::<Result<Vec<Entity>, _>>()?)
let models = &self.metadata().models;
Ok(entities.into_iter().map(|e| e.map(models)).collect::<Result<Vec<Entity>, _>>()?)
}

/// Similary to entities, this function retrieves event messages matching the query parameter.
pub async fn event_messages(&self, query: Query) -> Result<Vec<Entity>, Error> {
let mut grpc_client = self.inner.write().await;
let RetrieveEntitiesResponse { entities, total_count: _ } =
grpc_client.retrieve_event_messages(query).await?;
Ok(entities.into_iter().map(TryInto::try_into).collect::<Result<Vec<Entity>, _>>()?)
let models = &self.metadata().models;
Ok(entities.into_iter().map(|e| e.map(models)).collect::<Result<Vec<Entity>, _>>()?)
}

/// Retrieve raw starknet events matching the keys provided.
Expand All @@ -121,7 +125,14 @@ impl Client {
) -> Result<EntityUpdateStreaming, Error> {
let mut grpc_client = self.inner.write().await;
let stream = grpc_client.subscribe_entities(clauses).await?;
Ok(stream)
let models = self.metadata().models.clone();

Ok(EntityUpdateStreaming(stream.map_ok(Box::new(move |res| {
res.entity.map_or(
(res.subscription_id, Entity { hashed_keys: Felt::ZERO, models: vec![] }),
|entity| (res.subscription_id, entity.map(&models).expect("must able to serialize"))
)
}))))
}

/// Update the entities subscription
Expand All @@ -142,7 +153,14 @@ impl Client {
) -> Result<EntityUpdateStreaming, Error> {
let mut grpc_client = self.inner.write().await;
let stream = grpc_client.subscribe_event_messages(clauses).await?;
Ok(stream)
let models = self.metadata().models.clone();

Ok(EntityUpdateStreaming(stream.map_ok(Box::new(move |res| {
res.entity.map_or(
(res.subscription_id, Entity { hashed_keys: Felt::ZERO, models: vec![] }),
|entity| (res.subscription_id, entity.map(&models).expect("must able to serialize"))
)
}))))
}

/// Update the event messages subscription
Expand Down Expand Up @@ -179,3 +197,22 @@ impl Client {
Ok(stream)
}
}

type SubscriptionId = u64;
type EntityMappedStream = MapOk<
tonic::Streaming<SubscribeEntityResponse>,
Box<dyn Fn(SubscribeEntityResponse) -> (SubscriptionId, Entity) + Send>,
>;

#[derive(Debug)]
pub struct EntityUpdateStreaming(EntityMappedStream);

impl Stream for EntityUpdateStreaming {
type Item = <EntityMappedStream as Stream>::Item;
fn poll_next(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
self.0.poll_next_unpin(cx)
}
}
25 changes: 4 additions & 21 deletions crates/torii/grpc/proto/schema.proto
Original file line number Diff line number Diff line change
@@ -1,15 +1,9 @@
syntax = "proto3";
package types;

message EnumOption {
string name = 1;
Ty ty = 2;
}

message Enum {
string name = 1;
uint32 option = 2;
repeated EnumOption options = 3;
uint32 option = 1;
Ty ty = 2;
}

message Primitive {
Expand All @@ -33,11 +27,6 @@ message Primitive {
}
}

message Struct {
string name = 1;
repeated Member children = 2;
}

message Array {
repeated Ty children = 1;
}
Expand All @@ -46,15 +35,9 @@ message Ty {
oneof ty_type {
Primitive primitive = 2;
Enum enum = 3;
Struct struct = 4;
Array struct = 4;
Array tuple = 5;
Array array = 6;
string bytearray = 7;
}
}

message Member {
string name = 1;
Ty ty = 2;
bool key = 3;
}
}
7 changes: 6 additions & 1 deletion crates/torii/grpc/proto/types.proto
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,16 @@ message ModelMetadata {
string contract_address = 8;
}

message Model {
string name = 1;
repeated Ty children = 2;
}

message Entity {
// The entity's hashed keys
bytes hashed_keys = 1;
// Models of the entity
repeated Struct models = 2;
repeated Model models = 2;
}

message Event {
Expand Down
43 changes: 8 additions & 35 deletions crates/torii/grpc/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,17 @@ use starknet::core::types::{Felt, FromStrError, StateDiff, StateUpdate};
use tonic::codec::CompressionEncoding;
#[cfg(not(target_arch = "wasm32"))]
use tonic::transport::Endpoint;
use tonic::Streaming;

use crate::proto::world::{
world_client, RetrieveEntitiesRequest, RetrieveEntitiesResponse, RetrieveEventsRequest,
RetrieveEventsResponse, SubscribeEntitiesRequest, SubscribeEntityResponse,
SubscribeEventsRequest, SubscribeEventsResponse, SubscribeIndexerRequest,
SubscribeIndexerResponse, SubscribeModelsRequest, SubscribeModelsResponse,
UpdateEntitiesSubscriptionRequest, WorldMetadataRequest,
UpdateEntitiesSubscriptionRequest, WorldMetadataRequest
};
use crate::types::schema::{Entity, SchemaError};

use crate::types::schema::SchemaError;
use crate::types::{
EntityKeysClause, Event, EventQuery, IndexerUpdate, KeysClause, ModelKeysClause, Query,
};
Expand Down Expand Up @@ -135,7 +137,7 @@ impl WorldClient {
pub async fn subscribe_entities(
&mut self,
clauses: Vec<EntityKeysClause>,
) -> Result<EntityUpdateStreaming, Error> {
) -> Result<Streaming<SubscribeEntityResponse>, Error> {
let clauses = clauses.into_iter().map(|c| c.into()).collect();
let stream = self
.inner
Expand All @@ -144,12 +146,7 @@ impl WorldClient {
.map_err(Error::Grpc)
.map(|res| res.into_inner())?;

Ok(EntityUpdateStreaming(stream.map_ok(Box::new(|res| {
res.entity.map_or(
(res.subscription_id, Entity { hashed_keys: Felt::ZERO, models: vec![] }),
|entity| (res.subscription_id, entity.try_into().expect("must able to serialize")),
)
}))))
Ok(stream)
}

/// Update an entities subscription.
Expand All @@ -174,7 +171,7 @@ impl WorldClient {
pub async fn subscribe_event_messages(
&mut self,
clauses: Vec<EntityKeysClause>,
) -> Result<EntityUpdateStreaming, Error> {
) -> Result<Streaming<SubscribeEntityResponse>, Error> {
let clauses = clauses.into_iter().map(|c| c.into()).collect();
let stream = self
.inner
Expand All @@ -183,12 +180,7 @@ impl WorldClient {
.map_err(Error::Grpc)
.map(|res| res.into_inner())?;

Ok(EntityUpdateStreaming(stream.map_ok(Box::new(|res| {
res.entity.map_or(
(res.subscription_id, Entity { hashed_keys: Felt::ZERO, models: vec![] }),
|entity| (res.subscription_id, entity.try_into().expect("must able to serialize")),
)
}))))
Ok(stream)
}

/// Update an event messages subscription.
Expand Down Expand Up @@ -269,25 +261,6 @@ impl Stream for ModelDiffsStreaming {
}
}

type SubscriptionId = u64;
type EntityMappedStream = MapOk<
tonic::Streaming<SubscribeEntityResponse>,
Box<dyn Fn(SubscribeEntityResponse) -> (SubscriptionId, Entity) + Send>,
>;

#[derive(Debug)]
pub struct EntityUpdateStreaming(EntityMappedStream);

impl Stream for EntityUpdateStreaming {
type Item = <EntityMappedStream as Stream>::Item;
fn poll_next(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
self.0.poll_next_unpin(cx)
}
}

type EventMappedStream = MapOk<
tonic::Streaming<SubscribeEventsResponse>,
Box<dyn Fn(SubscribeEventsResponse) -> Event + Send>,
Expand Down
Loading

0 comments on commit 494307e

Please sign in to comment.