diff --git a/Cargo.lock b/Cargo.lock index 4fef80497b..104e8e2349 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5931,6 +5931,7 @@ dependencies = [ "particle-modules", "particle-protocol", "particle-services", + "peer-metrics", "serde", "serde_json", "server-config", @@ -5963,6 +5964,7 @@ dependencies = [ "particle-execution", "particle-protocol", "particle-services", + "peer-metrics", "serde", "serde_json", "thiserror", diff --git a/crates/peer-metrics/src/lib.rs b/crates/peer-metrics/src/lib.rs index b0077df403..bec8d31937 100644 --- a/crates/peer-metrics/src/lib.rs +++ b/crates/peer-metrics/src/lib.rs @@ -13,6 +13,7 @@ pub use services_metrics::{ ServiceCallStats, ServiceMemoryStat, ServiceType, ServicesMetrics, ServicesMetricsBackend, ServicesMetricsBuiltin, ServicesMetricsExternal, }; +pub use spell_metrics::SpellMetrics; pub use vm_pool::VmPoolMetrics; mod connection_pool; @@ -22,6 +23,7 @@ mod info; mod network_protocol; mod particle_executor; mod services_metrics; +mod spell_metrics; mod vm_pool; // TODO: diff --git a/crates/peer-metrics/src/spell_metrics.rs b/crates/peer-metrics/src/spell_metrics.rs new file mode 100644 index 0000000000..48442cc376 --- /dev/null +++ b/crates/peer-metrics/src/spell_metrics.rs @@ -0,0 +1,80 @@ +use crate::register; +use prometheus_client::metrics::counter::Counter; +use prometheus_client::metrics::gauge::Gauge; +use prometheus_client::metrics::histogram::Histogram; +use prometheus_client::registry::Registry; + +#[derive(Clone)] +pub struct SpellMetrics { + // How much spell _particles_ were created by the node + spell_particles_created: Counter, + // How much spells are scheduled to run _now_ + spell_scheduled_now: Gauge, + // Distribution of spell's scheduled periods + spell_periods: Histogram, +} + +impl SpellMetrics { + pub fn new(registry: &mut Registry) -> Self { + let sub_registry = registry.sub_registry_with_prefix("spell"); + + let spell_particles_created = register( + sub_registry, + Counter::default(), + "particles_created", + "Number of spell particles created", + ); + + let spell_scheduled_now = register( + sub_registry, + Gauge::default(), + "scheduled_now", + "Number of spell particles scheduled to run now", + ); + + let spell_periods = register( + sub_registry, + Histogram::new(Self::periods_buckets()), + "periods", + "Spell particle periods", + ); + + Self { + spell_particles_created, + spell_scheduled_now, + spell_periods, + } + } + + fn periods_buckets() -> std::vec::IntoIter { + // 0.0 sec, 1 sec, 30 sec, 1 min, 5 min, 10 min, 1 hour, 12 hours, 1 day, 1 week, 1 month + // 0 means that the spell is oneshot or reacts only on events + vec![ + 0.0, + 1.0, + 30.0, + 60.0, + 60.0 * 5.0, + 60.0 * 10.0, + 60.0 * 60.0, + 60.0 * 60.0 * 12.0, + 60.0 * 60.0 * 24.0, + 60.0 * 60.0 * 24.0 * 7.0, + 60.0 * 60.0 * 24.0 * 30.0, + ] + .into_iter() + } + + pub fn observe_started_spell(&self, period: u32) { + self.spell_scheduled_now.inc(); + self.spell_periods.observe(period as f64) + } + + pub fn observe_finished_spell(&self) { + self.spell_scheduled_now.dec(); + } + + pub fn observe_spell_cast(&self) { + self.spell_particles_created.inc(); + } +} diff --git a/crates/spell-event-bus/Cargo.toml b/crates/spell-event-bus/Cargo.toml index c82bc18819..9d7c10504a 100644 --- a/crates/spell-event-bus/Cargo.toml +++ b/crates/spell-event-bus/Cargo.toml @@ -26,6 +26,7 @@ futures = { workspace = true } thiserror = { workspace = true } log = { workspace = true } fluence-spell-dtos = { workspace = true } +peer-metrics = { workspace = true } [dev-dependencies] libp2p = { workspace = true } diff --git a/crates/spell-event-bus/src/bus.rs b/crates/spell-event-bus/src/bus.rs index 04b416865d..8c198c414c 100644 --- a/crates/spell-event-bus/src/bus.rs +++ b/crates/spell-event-bus/src/bus.rs @@ -3,6 +3,7 @@ use crate::config::{SpellTriggerConfigs, TriggerConfig}; use futures::stream::BoxStream; use futures::StreamExt; use futures::{future, FutureExt}; +use peer_metrics::SpellMetrics; use std::cmp::Ordering; use std::collections::{BinaryHeap, HashMap}; use std::pin::Pin; @@ -162,10 +163,13 @@ pub struct SpellEventBus { recv_cmd_channel: mpsc::UnboundedReceiver, /// Notify when trigger happened send_events: mpsc::UnboundedSender, + /// Spell metrics + spell_metrics: Option, } impl SpellEventBus { pub fn new( + spell_metrics: Option, sources: Vec>, ) -> ( Self, @@ -181,6 +185,7 @@ impl SpellEventBus { sources, recv_cmd_channel, send_events, + spell_metrics, }; (this, api, recv_events) } @@ -258,6 +263,8 @@ impl SpellEventBus { if let Some(rescheduled) = Scheduled::at(scheduled_spell.data, Instant::now()) { log::trace!("Reschedule: {:?}", rescheduled); state.scheduled.push(rescheduled); + } else if let Some(m) = &self.spell_metrics { + m.observe_finished_spell(); } } }, @@ -381,7 +388,7 @@ mod tests { #[tokio::test] async fn test_subscribe_one() { - let (bus, api, event_receiver) = SpellEventBus::new(vec![]); + let (bus, api, event_receiver) = SpellEventBus::new(None, vec![]); let bus = bus.start(); let event_stream = UnboundedReceiverStream::new(event_receiver); @@ -405,7 +412,7 @@ mod tests { #[tokio::test] async fn test_subscribe_many() { - let (bus, api, event_receiver) = SpellEventBus::new(vec![]); + let (bus, api, event_receiver) = SpellEventBus::new(None, vec![]); let bus = bus.start(); let event_stream = UnboundedReceiverStream::new(event_receiver); @@ -438,7 +445,7 @@ mod tests { #[tokio::test] async fn test_subscribe_oneshot() { - let (bus, api, event_receiver) = SpellEventBus::new(vec![]); + let (bus, api, event_receiver) = SpellEventBus::new(None, vec![]); let bus = bus.start(); let event_stream = UnboundedReceiverStream::new(event_receiver); let spell1_id = "spell1".to_string(); @@ -473,7 +480,7 @@ mod tests { async fn test_subscribe_connect() { let (send, recv) = mpsc::unbounded_channel(); let recv = UnboundedReceiverStream::new(recv).boxed(); - let (bus, api, event_receiver) = SpellEventBus::new(vec![recv]); + let (bus, api, event_receiver) = SpellEventBus::new(None, vec![recv]); let mut event_stream = UnboundedReceiverStream::new(event_receiver); let bus = bus.start(); @@ -503,7 +510,7 @@ mod tests { async fn test_unsubscribe() { let (send, recv) = mpsc::unbounded_channel(); let recv = UnboundedReceiverStream::new(recv).boxed(); - let (bus, api, mut event_receiver) = SpellEventBus::new(vec![recv]); + let (bus, api, mut event_receiver) = SpellEventBus::new(None, vec![recv]); let bus = bus.start(); let spell1_id = "spell1".to_string(); @@ -532,7 +539,7 @@ mod tests { async fn test_subscribe_many_spells_with_diff_event_types() { let (recv, hdl) = emulate_connect(Duration::from_millis(10)); let recv = UnboundedReceiverStream::new(recv).boxed(); - let (bus, api, event_receiver) = SpellEventBus::new(vec![recv]); + let (bus, api, event_receiver) = SpellEventBus::new(None, vec![recv]); let event_stream = UnboundedReceiverStream::new(event_receiver); let bus = bus.start(); diff --git a/particle-node/src/node.rs b/particle-node/src/node.rs index 78652b338e..226ef69bf8 100644 --- a/particle-node/src/node.rs +++ b/particle-node/src/node.rs @@ -43,7 +43,7 @@ use particle_execution::ParticleFunctionStatic; use particle_protocol::Particle; use peer_metrics::{ ConnectionPoolMetrics, ConnectivityMetrics, ParticleExecutorMetrics, ServicesMetrics, - ServicesMetricsBackend, VmPoolMetrics, + ServicesMetricsBackend, SpellMetrics, VmPoolMetrics, }; use prometheus_client::registry::Registry; use script_storage::{ScriptStorageApi, ScriptStorageBackend, ScriptStorageConfig}; @@ -136,6 +136,7 @@ impl Node { let connection_pool_metrics = metrics_registry.as_mut().map(ConnectionPoolMetrics::new); let plumber_metrics = metrics_registry.as_mut().map(ParticleExecutorMetrics::new); let vm_pool_metrics = metrics_registry.as_mut().map(VmPoolMetrics::new); + let spell_metrics = metrics_registry.as_mut().map(SpellMetrics::new); #[allow(deprecated)] let connection_limits = ConnectionLimits::default() @@ -251,7 +252,7 @@ impl Node { let sources = vec![recv_connection_pool_events.map(PeerEvent::from).boxed()]; let (spell_event_bus, spell_event_bus_api, spell_events_receiver) = - SpellEventBus::new(sources); + SpellEventBus::new(spell_metrics.clone(), sources); let (sorcerer, mut custom_service_functions, spell_version) = Sorcerer::new( builtins.services.clone(), @@ -260,6 +261,7 @@ impl Node { config.clone(), spell_event_bus_api, key_manager.clone(), + spell_metrics, ); let node_info = NodeInfo { diff --git a/sorcerer/Cargo.toml b/sorcerer/Cargo.toml index fb040c63d5..ff0fb678b3 100644 --- a/sorcerer/Cargo.toml +++ b/sorcerer/Cargo.toml @@ -22,6 +22,7 @@ connection-pool = { workspace = true } kademlia = { workspace = true } fluence-libp2p = { workspace = true } key-manager = { workspace = true } +peer-metrics = { workspace = true } libp2p = { workspace = true } fluence-keypair = { workspace = true } diff --git a/sorcerer/src/script_executor.rs b/sorcerer/src/script_executor.rs index 93ed891cd4..81e9941500 100644 --- a/sorcerer/src/script_executor.rs +++ b/sorcerer/src/script_executor.rs @@ -152,6 +152,9 @@ impl Sorcerer { let particle = self.make_spell_particle(event.spell_id.clone(), worker_id)?; self.store_trigger(event.clone(), worker_id)?; + if let Some(m) = &self.spell_metrics { + m.observe_spell_cast(); + } self.aquamarine.clone().execute(particle, None).await?; }; diff --git a/sorcerer/src/sorcerer.rs b/sorcerer/src/sorcerer.rs index 01105264d7..ea177eb515 100644 --- a/sorcerer/src/sorcerer.rs +++ b/sorcerer/src/sorcerer.rs @@ -36,6 +36,7 @@ use particle_builtins::{wrap, wrap_unit, CustomService}; use particle_execution::ServiceFunction; use particle_modules::ModuleRepository; use particle_services::ParticleAppServices; +use peer_metrics::SpellMetrics; use serde_json::Value; use server_config::ResolvedConfig; use spell_event_bus::api::{from_user_config, SpellEventBusApi, TriggerEvent}; @@ -49,6 +50,7 @@ pub struct Sorcerer { pub spell_event_bus_api: SpellEventBusApi, pub spell_script_particle_ttl: Duration, pub key_manager: KeyManager, + pub spell_metrics: Option, } impl Sorcerer { @@ -59,6 +61,7 @@ impl Sorcerer { config: ResolvedConfig, spell_event_bus_api: SpellEventBusApi, key_manager: KeyManager, + spell_metrics: Option, ) -> (Self, HashMap, String) { let (spell_storage, spell_version) = SpellStorage::create(&config.dir_config.spell_base_dir, &services, &modules) @@ -71,6 +74,7 @@ impl Sorcerer { spell_event_bus_api, spell_script_particle_ttl: config.max_spell_particle_ttl, key_manager, + spell_metrics, }; let mut builtin_functions = sorcerer.make_spell_builtins(); @@ -104,11 +108,15 @@ impl Sorcerer { spell_id, "get_trigger_config", )?; + let period = result.config.clock.period_sec; let config = from_user_config(result.config)?; if let Some(config) = config.and_then(|c| c.into_rescheduled()) { self.spell_event_bus_api - .subscribe(spell_id.clone(), config.clone()) + .subscribe(spell_id.clone(), config) .await?; + if let Some(m) = &self.spell_metrics { + m.observe_started_spell(period); + } } else { log::warn!("Spell {spell_id} is not rescheduled since its config is either not found or not reschedulable"); }