From 16743e2fa9f8bbda8d0014aece3a1e5714731d96 Mon Sep 17 00:00:00 2001 From: Maria Kuklina Date: Thu, 27 Apr 2023 20:47:43 +0200 Subject: [PATCH 1/4] start resubscribed spells scheduling after full node initialization --- crates/spell-event-bus/src/api.rs | 33 +++++++++++---------- crates/spell-event-bus/src/bus.rs | 48 +++++++++++++++++++++++++------ particle-node/src/node.rs | 13 +++++++-- 3 files changed, 67 insertions(+), 27 deletions(-) diff --git a/crates/spell-event-bus/src/api.rs b/crates/spell-event-bus/src/api.rs index 05eff97b90..7924976b31 100644 --- a/crates/spell-event-bus/src/api.rs +++ b/crates/spell-event-bus/src/api.rs @@ -104,7 +104,6 @@ impl From for TriggerInfo { #[derive(Debug)] pub(crate) struct Command { - pub(crate) spell_id: SpellId, pub(crate) action: Action, pub(crate) reply: oneshot::Sender<()>, } @@ -112,23 +111,22 @@ pub(crate) struct Command { #[derive(Debug, Clone)] pub enum Action { /// Subscribe a spell to a list of triggers - Subscribe(SpellTriggerConfigs), + Subscribe(SpellId, SpellTriggerConfigs), /// Remove all subscriptions of a spell - Unsubscribe, + Unsubscribe(SpellId), + /// Actually run the scheduling + Run, } #[derive(Error, Debug)] pub enum EventBusError { - #[error( - "can't send a command `{action:?}` for spell `{spell_id}` to spell-event-bus: {reason}" - )] + #[error("can't send a command `{action:?}` to spell-event-bus: {reason}")] SendError { - spell_id: SpellId, action: Action, reason: Pin>, }, - #[error("can't receive a message from the bus on behalf of spell {0}: sending end is probably dropped")] - ReplyError(SpellId), + #[error("can't receive a message from the bus on behalf of a command {0:?}: sending end is probably dropped")] + ReplyError(Action), } #[derive(Clone)] @@ -143,23 +141,20 @@ impl std::fmt::Debug for SpellEventBusApi { } impl SpellEventBusApi { - async fn send(&self, spell_id: SpellId, action: Action) -> Result<(), EventBusError> { + async fn send(&self, action: Action) -> Result<(), EventBusError> { let (send, recv) = oneshot::channel(); let command = Command { - spell_id: spell_id.clone(), action: action.clone(), reply: send, }; self.send_cmd_channel .send(command) .map_err(|e| EventBusError::SendError { - spell_id: spell_id.clone(), - action, + action: action.clone(), reason: Box::pin(e), })?; - recv.await - .map_err(|_| EventBusError::ReplyError(spell_id))?; + recv.await.map_err(|_| EventBusError::ReplyError(action))?; Ok(()) } @@ -171,11 +166,15 @@ impl SpellEventBusApi { spell_id: SpellId, config: SpellTriggerConfigs, ) -> Result<(), EventBusError> { - self.send(spell_id, Action::Subscribe(config)).await + self.send(Action::Subscribe(spell_id, config)).await } /// Unsubscribe a spell from all events. pub async fn unsubscribe(&self, spell_id: SpellId) -> Result<(), EventBusError> { - self.send(spell_id, Action::Unsubscribe).await + self.send(Action::Unsubscribe(spell_id)).await + } + + pub async fn run_scheduling(&self) -> Result<(), EventBusError> { + self.send(Action::Run).await } } diff --git a/crates/spell-event-bus/src/bus.rs b/crates/spell-event-bus/src/bus.rs index 8c198c414c..894c858148 100644 --- a/crates/spell-event-bus/src/bus.rs +++ b/crates/spell-event-bus/src/bus.rs @@ -150,8 +150,10 @@ impl SubscribersState { #[derive(Debug, Error)] enum BusInternalError { // oneshot::Sender doesn't provide the reasons why it failed to send a message - #[error("failed to send a result of a command execution ({1:?}) for a spell {0}: receiving end probably dropped")] - Reply(SpellId, Action), + #[error( + "failed to send a result of a command execution ({0:?}): receiving end probably dropped" + )] + Reply(Action), #[error("failed to send notification about a peer event {1:?} to spell {0}: {2}")] SendEvent(SpellId, TriggerInfo, Pin>), } @@ -192,7 +194,7 @@ impl SpellEventBus { pub fn start(self) -> task::JoinHandle<()> { task::Builder::new() - .name("Bus") + .name("spell-bus") .spawn(self.run()) .expect("Could not spawn task") } @@ -208,6 +210,33 @@ impl SpellEventBus { let mut sources_channel = futures::stream::select_all(sources); let mut state = SubscribersState::new(); + loop { + let result: Result<(), BusInternalError> = try { + select! { + Some(command) = self.recv_cmd_channel.recv() => { + let Command { action, reply } = command; + match &action { + Action::Subscribe(spell_id, config) => { + state.subscribe(spell_id.clone(), config).unwrap_or(()); + }, + Action::Unsubscribe(spell_id) => { + state.unsubscribe(&spell_id); + }, + Action::Run => { + break; + } + }; + reply.send(()).map_err(|_| { + BusInternalError::Reply(action) + })?; + } + } + }; + if let Err(e) = result { + log::warn!("Error in spell event bus loop: {}", e); + } + } + loop { let now = Instant::now(); @@ -234,17 +263,20 @@ impl SpellEventBus { let result: Result<(), BusInternalError> = try { select! { Some(command) = self.recv_cmd_channel.recv() => { - let Command { spell_id, action, reply } = command; + let Command { action, reply } = command; match &action { - Action::Subscribe(config) => { + Action::Subscribe(spell_id, config) => { state.subscribe(spell_id.clone(), config).unwrap_or(()); }, - Action::Unsubscribe => { + Action::Unsubscribe(spell_id) => { state.unsubscribe(&spell_id); }, + Action::Run => { + log::error!("Can't run spell bus twice"); + } }; reply.send(()).map_err(|_| { - BusInternalError::Reply(spell_id, action) + BusInternalError::Reply(action) })?; }, Some(event) = sources_channel.next() => { @@ -264,7 +296,7 @@ impl SpellEventBus { log::trace!("Reschedule: {:?}", rescheduled); state.scheduled.push(rescheduled); } else if let Some(m) = &self.spell_metrics { - m.observe_finished_spell(); + m.observe_finished_spell(); } } }, diff --git a/particle-node/src/node.rs b/particle-node/src/node.rs index 226ef69bf8..54251cdb45 100644 --- a/particle-node/src/node.rs +++ b/particle-node/src/node.rs @@ -49,7 +49,7 @@ use prometheus_client::registry::Registry; use script_storage::{ScriptStorageApi, ScriptStorageBackend, ScriptStorageConfig}; use server_config::{NetworkConfig, ResolvedConfig, ServicesConfig}; use sorcerer::Sorcerer; -use spell_event_bus::api::{PeerEvent, TriggerEvent}; +use spell_event_bus::api::{PeerEvent, SpellEventBusApi, TriggerEvent}; use spell_event_bus::bus::SpellEventBus; use tokio::sync::{mpsc, oneshot}; use tokio::task; @@ -75,6 +75,7 @@ pub struct Node { aquavm_pool: AquamarineBackend>>, script_storage: ScriptStorageBackend, builtins_deployer: BuiltinsDeployer, + spell_event_bus_api: SpellEventBusApi, spell_event_bus: SpellEventBus, spell_events_receiver: mpsc::UnboundedReceiver, sorcerer: Sorcerer, @@ -259,7 +260,7 @@ impl Node { builtins.modules.clone(), aquamarine_api.clone(), config.clone(), - spell_event_bus_api, + spell_event_bus_api.clone(), key_manager.clone(), spell_metrics, ); @@ -308,6 +309,7 @@ impl Node { aquavm_pool, script_storage_backend, builtins_deployer, + spell_event_bus_api, spell_event_bus, spell_events_receiver, sorcerer, @@ -374,6 +376,7 @@ impl Node { aquavm_pool: AquamarineBackend>>, script_storage: ScriptStorageBackend, builtins_deployer: BuiltinsDeployer, + spell_event_bus_api: SpellEventBusApi, spell_event_bus: SpellEventBus, spell_events_receiver: mpsc::UnboundedReceiver, sorcerer: Sorcerer, @@ -395,6 +398,7 @@ impl Node { aquavm_pool, script_storage, builtins_deployer, + spell_event_bus_api, spell_event_bus, spell_events_receiver, sorcerer, @@ -489,6 +493,11 @@ impl Node { .await .wrap_err("builtins deploy failed")?; + let result = self.spell_event_bus_api.run_scheduling().await; + if let Err(e) = result { + log::error!("running spell event bus failed: {}", e); + } + Ok(exit_outlet) } From 0bf87d66268c092c48c2dd56e27f624524670424 Mon Sep 17 00:00:00 2001 From: Maria Kuklina Date: Fri, 28 Apr 2023 15:54:06 +0200 Subject: [PATCH 2/4] refactor --- crates/spell-event-bus/src/api.rs | 8 +++--- crates/spell-event-bus/src/bus.rs | 42 ++++++++----------------------- particle-node/src/node.rs | 2 +- 3 files changed, 16 insertions(+), 36 deletions(-) diff --git a/crates/spell-event-bus/src/api.rs b/crates/spell-event-bus/src/api.rs index 7924976b31..ba5f652cd6 100644 --- a/crates/spell-event-bus/src/api.rs +++ b/crates/spell-event-bus/src/api.rs @@ -114,8 +114,8 @@ pub enum Action { Subscribe(SpellId, SpellTriggerConfigs), /// Remove all subscriptions of a spell Unsubscribe(SpellId), - /// Actually run the scheduling - Run, + /// Actually start the scheduling + Start, } #[derive(Error, Debug)] @@ -174,7 +174,7 @@ impl SpellEventBusApi { self.send(Action::Unsubscribe(spell_id)).await } - pub async fn run_scheduling(&self) -> Result<(), EventBusError> { - self.send(Action::Run).await + pub async fn start_scheduling(&self) -> Result<(), EventBusError> { + self.send(Action::Start).await } } diff --git a/crates/spell-event-bus/src/bus.rs b/crates/spell-event-bus/src/bus.rs index 894c858148..61e36a1a7d 100644 --- a/crates/spell-event-bus/src/bus.rs +++ b/crates/spell-event-bus/src/bus.rs @@ -210,33 +210,7 @@ impl SpellEventBus { let mut sources_channel = futures::stream::select_all(sources); let mut state = SubscribersState::new(); - loop { - let result: Result<(), BusInternalError> = try { - select! { - Some(command) = self.recv_cmd_channel.recv() => { - let Command { action, reply } = command; - match &action { - Action::Subscribe(spell_id, config) => { - state.subscribe(spell_id.clone(), config).unwrap_or(()); - }, - Action::Unsubscribe(spell_id) => { - state.unsubscribe(&spell_id); - }, - Action::Run => { - break; - } - }; - reply.send(()).map_err(|_| { - BusInternalError::Reply(action) - })?; - } - } - }; - if let Err(e) = result { - log::warn!("Error in spell event bus loop: {}", e); - } - } - + let mut is_started = false; loop { let now = Instant::now(); @@ -271,21 +245,21 @@ impl SpellEventBus { Action::Unsubscribe(spell_id) => { state.unsubscribe(&spell_id); }, - Action::Run => { - log::error!("Can't run spell bus twice"); + Action::Start => { + is_started = true; } }; reply.send(()).map_err(|_| { BusInternalError::Reply(action) })?; }, - Some(event) = sources_channel.next() => { + Some(event) = sources_channel.next(), if is_started => { for spell_id in state.subscribers(&event.get_type()) { let event = TriggerInfo::Peer(event.clone()); Self::trigger_spell(&send_events, spell_id, event)?; } }, - _ = timer_task => { + _ = timer_task, if is_started => { // The timer is triggered only if there are some spells to be awaken. if let Some(scheduled_spell) = state.scheduled.pop() { log::trace!("Execute: {:?}", scheduled_spell); @@ -422,6 +396,7 @@ mod tests { async fn test_subscribe_one() { let (bus, api, event_receiver) = SpellEventBus::new(None, vec![]); let bus = bus.start(); + api.run_scheduler().await; let event_stream = UnboundedReceiverStream::new(event_receiver); let spell1_id = "spell1".to_string(); @@ -446,6 +421,7 @@ mod tests { async fn test_subscribe_many() { let (bus, api, event_receiver) = SpellEventBus::new(None, vec![]); let bus = bus.start(); + api.run_scheduler().await; let event_stream = UnboundedReceiverStream::new(event_receiver); let mut spell_ids = hashmap![ @@ -479,6 +455,7 @@ mod tests { async fn test_subscribe_oneshot() { let (bus, api, event_receiver) = SpellEventBus::new(None, vec![]); let bus = bus.start(); + api.run_scheduler().await; let event_stream = UnboundedReceiverStream::new(event_receiver); let spell1_id = "spell1".to_string(); subscribe_timer( @@ -515,6 +492,7 @@ mod tests { let (bus, api, event_receiver) = SpellEventBus::new(None, vec![recv]); let mut event_stream = UnboundedReceiverStream::new(event_receiver); let bus = bus.start(); + api.run_scheduler().await; let spell1_id = "spell1".to_string(); subscribe_peer_event(&api, spell1_id.clone(), vec![PeerEventType::Connected]).await; @@ -544,6 +522,7 @@ mod tests { let recv = UnboundedReceiverStream::new(recv).boxed(); let (bus, api, mut event_receiver) = SpellEventBus::new(None, vec![recv]); let bus = bus.start(); + api.run_scheduler().await; let spell1_id = "spell1".to_string(); subscribe_peer_event(&api, spell1_id.clone(), vec![PeerEventType::Connected]).await; @@ -574,6 +553,7 @@ mod tests { let (bus, api, event_receiver) = SpellEventBus::new(None, vec![recv]); let event_stream = UnboundedReceiverStream::new(event_receiver); let bus = bus.start(); + api.run_scheduler().await; let spell1_id = "spell1".to_string(); subscribe_peer_event(&api, spell1_id.clone(), vec![PeerEventType::Connected]).await; diff --git a/particle-node/src/node.rs b/particle-node/src/node.rs index 54251cdb45..0cb7160f27 100644 --- a/particle-node/src/node.rs +++ b/particle-node/src/node.rs @@ -493,7 +493,7 @@ impl Node { .await .wrap_err("builtins deploy failed")?; - let result = self.spell_event_bus_api.run_scheduling().await; + let result = self.spell_event_bus_api.start_scheduling().await; if let Err(e) = result { log::error!("running spell event bus failed: {}", e); } From 728941329dc2b61d02342e6bc4fa3ded22e27e7f Mon Sep 17 00:00:00 2001 From: Maria Kuklina Date: Fri, 28 Apr 2023 16:36:15 +0200 Subject: [PATCH 3/4] fix tests --- crates/spell-event-bus/src/bus.rs | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/crates/spell-event-bus/src/bus.rs b/crates/spell-event-bus/src/bus.rs index 61e36a1a7d..94bf35223c 100644 --- a/crates/spell-event-bus/src/bus.rs +++ b/crates/spell-event-bus/src/bus.rs @@ -396,7 +396,7 @@ mod tests { async fn test_subscribe_one() { let (bus, api, event_receiver) = SpellEventBus::new(None, vec![]); let bus = bus.start(); - api.run_scheduler().await; + let _ = api.start_scheduling().await; let event_stream = UnboundedReceiverStream::new(event_receiver); let spell1_id = "spell1".to_string(); @@ -421,7 +421,7 @@ mod tests { async fn test_subscribe_many() { let (bus, api, event_receiver) = SpellEventBus::new(None, vec![]); let bus = bus.start(); - api.run_scheduler().await; + let _ = api.start_scheduling().await; let event_stream = UnboundedReceiverStream::new(event_receiver); let mut spell_ids = hashmap![ @@ -455,7 +455,7 @@ mod tests { async fn test_subscribe_oneshot() { let (bus, api, event_receiver) = SpellEventBus::new(None, vec![]); let bus = bus.start(); - api.run_scheduler().await; + let _ = api.start_scheduling().await; let event_stream = UnboundedReceiverStream::new(event_receiver); let spell1_id = "spell1".to_string(); subscribe_timer( @@ -492,7 +492,7 @@ mod tests { let (bus, api, event_receiver) = SpellEventBus::new(None, vec![recv]); let mut event_stream = UnboundedReceiverStream::new(event_receiver); let bus = bus.start(); - api.run_scheduler().await; + let _ = api.start_scheduling().await; let spell1_id = "spell1".to_string(); subscribe_peer_event(&api, spell1_id.clone(), vec![PeerEventType::Connected]).await; @@ -522,7 +522,7 @@ mod tests { let recv = UnboundedReceiverStream::new(recv).boxed(); let (bus, api, mut event_receiver) = SpellEventBus::new(None, vec![recv]); let bus = bus.start(); - api.run_scheduler().await; + let _ = api.start_scheduling().await; let spell1_id = "spell1".to_string(); subscribe_peer_event(&api, spell1_id.clone(), vec![PeerEventType::Connected]).await; @@ -553,7 +553,7 @@ mod tests { let (bus, api, event_receiver) = SpellEventBus::new(None, vec![recv]); let event_stream = UnboundedReceiverStream::new(event_receiver); let bus = bus.start(); - api.run_scheduler().await; + let _ = api.start_scheduling().await; let spell1_id = "spell1".to_string(); subscribe_peer_event(&api, spell1_id.clone(), vec![PeerEventType::Connected]).await; From 1e74d539c816cbfdfabd1d8972ad2d58aee5bee4 Mon Sep 17 00:00:00 2001 From: Maria Kuklina Date: Fri, 28 Apr 2023 16:57:26 +0200 Subject: [PATCH 4/4] fix clippy --- crates/spell-event-bus/src/bus.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/spell-event-bus/src/bus.rs b/crates/spell-event-bus/src/bus.rs index 94bf35223c..36f328531e 100644 --- a/crates/spell-event-bus/src/bus.rs +++ b/crates/spell-event-bus/src/bus.rs @@ -243,7 +243,7 @@ impl SpellEventBus { state.subscribe(spell_id.clone(), config).unwrap_or(()); }, Action::Unsubscribe(spell_id) => { - state.unsubscribe(&spell_id); + state.unsubscribe(spell_id); }, Action::Start => { is_started = true;