From 79d8110785961b8769a6011f52f6d3754d22c7af Mon Sep 17 00:00:00 2001 From: Nasr Date: Tue, 8 Oct 2024 13:45:09 -0400 Subject: [PATCH] refactor: subscription updates ordered --- .../grpc/src/server/subscriptions/entity.rs | 43 ++++++++++++------- .../grpc/src/server/subscriptions/event.rs | 36 +++++++++++----- .../src/server/subscriptions/event_message.rs | 41 ++++++++++++------ .../grpc/src/server/subscriptions/indexer.rs | 40 +++++++++++------ 4 files changed, 108 insertions(+), 52 deletions(-) diff --git a/crates/torii/grpc/src/server/subscriptions/entity.rs b/crates/torii/grpc/src/server/subscriptions/entity.rs index 33b3b5f472..70b2ea63a6 100644 --- a/crates/torii/grpc/src/server/subscriptions/entity.rs +++ b/crates/torii/grpc/src/server/subscriptions/entity.rs @@ -9,7 +9,7 @@ use futures::Stream; use futures_util::StreamExt; use rand::Rng; use starknet::core::types::Felt; -use tokio::sync::mpsc::{channel, Receiver, Sender}; +use tokio::sync::mpsc::{channel, Receiver, Sender, unbounded_channel, UnboundedReceiver, UnboundedSender}; use tokio::sync::RwLock; use torii_core::error::{Error, ParseError}; use torii_core::simple_broker::SimpleBroker; @@ -74,23 +74,37 @@ impl EntityManager { } } -#[must_use = "Service does nothing unless polled"] -#[allow(missing_debug_implementations)] pub struct Service { - subs_manager: Arc, simple_broker: Pin + Send>>, + entity_sender: UnboundedSender, } impl Service { pub fn new(subs_manager: Arc) -> Self { - Self { - subs_manager, + let (entity_sender, entity_receiver) = unbounded_channel(); + let service = Self { simple_broker: Box::pin(SimpleBroker::::subscribe()), - } + entity_sender, + }; + + tokio::spawn(Self::publish_updates(subs_manager, entity_receiver)); + + service } async fn publish_updates( subs: Arc, + mut entity_receiver: UnboundedReceiver, + ) { + while let Some(entity) = entity_receiver.recv().await { + if let Err(e) = Self::process_entity_update(&subs, &entity).await { + error!(target = LOG_TARGET, error = %e, "Processing entity update."); + } + } + } + + async fn process_entity_update( + subs: &Arc, entity: &OptimisticEntity, ) -> Result<(), Error> { let mut closed_stream = Vec::new(); @@ -217,16 +231,13 @@ impl Service { impl Future for Service { type Output = (); - fn poll(self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> std::task::Poll { - let pin = self.get_mut(); + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = self.get_mut(); - while let Poll::Ready(Some(entity)) = pin.simple_broker.poll_next_unpin(cx) { - let subs = Arc::clone(&pin.subs_manager); - tokio::spawn(async move { - if let Err(e) = Service::publish_updates(subs, &entity).await { - error!(target = LOG_TARGET, error = %e, "Publishing entity update."); - } - }); + while let Poll::Ready(Some(entity)) = this.simple_broker.poll_next_unpin(cx) { + if let Err(e) = this.entity_sender.send(entity) { + error!(target = LOG_TARGET, error = %e, "Sending entity update to processor."); + } } Poll::Pending diff --git a/crates/torii/grpc/src/server/subscriptions/event.rs b/crates/torii/grpc/src/server/subscriptions/event.rs index a00ddbbbdd..885029b6fa 100644 --- a/crates/torii/grpc/src/server/subscriptions/event.rs +++ b/crates/torii/grpc/src/server/subscriptions/event.rs @@ -9,7 +9,7 @@ use futures::Stream; use futures_util::StreamExt; use rand::Rng; use starknet::core::types::Felt; -use tokio::sync::mpsc::{channel, Receiver, Sender}; +use tokio::sync::mpsc::{channel, Receiver, Sender, unbounded_channel, UnboundedReceiver, UnboundedSender}; use tokio::sync::RwLock; use torii_core::error::{Error, ParseError}; use torii_core::simple_broker::SimpleBroker; @@ -62,16 +62,35 @@ impl EventManager { #[must_use = "Service does nothing unless polled"] #[allow(missing_debug_implementations)] pub struct Service { - subs_manager: Arc, simple_broker: Pin + Send>>, + event_sender: UnboundedSender, } impl Service { pub fn new(subs_manager: Arc) -> Self { - Self { subs_manager, simple_broker: Box::pin(SimpleBroker::::subscribe()) } + let (event_sender, event_receiver) = unbounded_channel(); + let service = Self { + simple_broker: Box::pin(SimpleBroker::::subscribe()), + event_sender, + }; + + tokio::spawn(Self::publish_updates(subs_manager, event_receiver)); + + service + } + + async fn publish_updates( + subs: Arc, + mut event_receiver: UnboundedReceiver, + ) { + while let Some(event) = event_receiver.recv().await { + if let Err(e) = Self::process_event(&subs, &event).await { + error!(target = LOG_TARGET, error = %e, "Processing event update."); + } + } } - async fn publish_updates(subs: Arc, event: &Event) -> Result<(), Error> { + async fn process_event(subs: &Arc, event: &Event) -> Result<(), Error> { let mut closed_stream = Vec::new(); let keys = event .keys @@ -151,12 +170,9 @@ impl Future for Service { let pin = self.get_mut(); while let Poll::Ready(Some(event)) = pin.simple_broker.poll_next_unpin(cx) { - let subs = Arc::clone(&pin.subs_manager); - tokio::spawn(async move { - if let Err(e) = Service::publish_updates(subs, &event).await { - error!(target = LOG_TARGET, error = %e, "Publishing events update."); - } - }); + if let Err(e) = pin.event_sender.send(event) { + error!(target = LOG_TARGET, error = %e, "Sending event to processor."); + } } Poll::Pending diff --git a/crates/torii/grpc/src/server/subscriptions/event_message.rs b/crates/torii/grpc/src/server/subscriptions/event_message.rs index ab2611d1ac..bb1fea3438 100644 --- a/crates/torii/grpc/src/server/subscriptions/event_message.rs +++ b/crates/torii/grpc/src/server/subscriptions/event_message.rs @@ -9,7 +9,7 @@ use futures::Stream; use futures_util::StreamExt; use rand::Rng; use starknet::core::types::Felt; -use tokio::sync::mpsc::{channel, Receiver}; +use tokio::sync::mpsc::{channel, Receiver, unbounded_channel, UnboundedReceiver, UnboundedSender}; use tokio::sync::RwLock; use torii_core::error::{Error, ParseError}; use torii_core::simple_broker::SimpleBroker; @@ -71,20 +71,36 @@ impl EventMessageManager { #[must_use = "Service does nothing unless polled"] #[allow(missing_debug_implementations)] pub struct Service { - subs_manager: Arc, simple_broker: Pin + Send>>, + event_sender: UnboundedSender, } impl Service { pub fn new(subs_manager: Arc) -> Self { - Self { - subs_manager, + let (event_sender, event_receiver) = unbounded_channel(); + let service = Self { simple_broker: Box::pin(SimpleBroker::::subscribe()), - } + event_sender, + }; + + tokio::spawn(Self::publish_updates(subs_manager, event_receiver)); + + service } async fn publish_updates( subs: Arc, + mut event_receiver: UnboundedReceiver, + ) { + while let Some(event) = event_receiver.recv().await { + if let Err(e) = Self::process_event_update(&subs, &event).await { + error!(target = LOG_TARGET, error = %e, "Processing event update."); + } + } + } + + async fn process_event_update( + subs: &Arc, entity: &OptimisticEventMessage, ) -> Result<(), Error> { let mut closed_stream = Vec::new(); @@ -195,16 +211,13 @@ impl Service { impl Future for Service { type Output = (); - fn poll(self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> std::task::Poll { - let pin = self.get_mut(); + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = self.get_mut(); - while let Poll::Ready(Some(entity)) = pin.simple_broker.poll_next_unpin(cx) { - let subs = Arc::clone(&pin.subs_manager); - tokio::spawn(async move { - if let Err(e) = Service::publish_updates(subs, &entity).await { - error!(target = LOG_TARGET, error = %e, "Publishing entity update."); - } - }); + while let Poll::Ready(Some(event)) = this.simple_broker.poll_next_unpin(cx) { + if let Err(e) = this.event_sender.send(event) { + error!(target = LOG_TARGET, error = %e, "Sending event update to processor."); + } } Poll::Pending diff --git a/crates/torii/grpc/src/server/subscriptions/indexer.rs b/crates/torii/grpc/src/server/subscriptions/indexer.rs index 27315b6766..b49ac225f2 100644 --- a/crates/torii/grpc/src/server/subscriptions/indexer.rs +++ b/crates/torii/grpc/src/server/subscriptions/indexer.rs @@ -9,7 +9,7 @@ use futures::{Stream, StreamExt}; use rand::Rng; use sqlx::{Pool, Sqlite}; use starknet::core::types::Felt; -use tokio::sync::mpsc::{channel, Receiver, Sender}; +use tokio::sync::mpsc::{channel, Receiver, Sender, unbounded_channel, UnboundedReceiver, UnboundedSender}; use tokio::sync::RwLock; use torii_core::error::{Error, ParseError}; use torii_core::simple_broker::SimpleBroker; @@ -81,17 +81,36 @@ impl IndexerManager { #[must_use = "Service does nothing unless polled"] #[allow(missing_debug_implementations)] pub struct Service { - subs_manager: Arc, simple_broker: Pin + Send>>, + update_sender: UnboundedSender, } impl Service { pub fn new(subs_manager: Arc) -> Self { - Self { subs_manager, simple_broker: Box::pin(SimpleBroker::::subscribe()) } + let (update_sender, update_receiver) = unbounded_channel(); + let service = Self { + simple_broker: Box::pin(SimpleBroker::::subscribe()), + update_sender, + }; + + tokio::spawn(Self::publish_updates(subs_manager, update_receiver)); + + service } async fn publish_updates( subs: Arc, + mut update_receiver: UnboundedReceiver, + ) { + while let Some(update) = update_receiver.recv().await { + if let Err(e) = Self::process_update(&subs, &update).await { + error!(target = LOG_TARGET, error = %e, "Processing indexer update."); + } + } + } + + async fn process_update( + subs: &Arc, update: &ContractUpdated, ) -> Result<(), Error> { let mut closed_stream = Vec::new(); @@ -127,16 +146,13 @@ impl Service { impl Future for Service { type Output = (); - fn poll(self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> std::task::Poll { - let pin = self.get_mut(); + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = self.get_mut(); - while let Poll::Ready(Some(event)) = pin.simple_broker.poll_next_unpin(cx) { - let subs = Arc::clone(&pin.subs_manager); - tokio::spawn(async move { - if let Err(e) = Service::publish_updates(subs, &event).await { - error!(target = LOG_TARGET, error = %e, "Publishing indexer update."); - } - }); + while let Poll::Ready(Some(update)) = this.simple_broker.poll_next_unpin(cx) { + if let Err(e) = this.update_sender.send(update) { + error!(target = LOG_TARGET, error = %e, "Sending indexer update to processor."); + } } Poll::Pending