Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(metrics): add additional spell metrics [fixes NET-437] #1569

Merged
merged 4 commits into from
Apr 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions crates/peer-metrics/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ pub use services_metrics::{
ServiceCallStats, ServiceMemoryStat, ServiceType, ServicesMetrics, ServicesMetricsBackend,
ServicesMetricsBuiltin, ServicesMetricsExternal,
};
pub use spell_metrics::SpellMetrics;
pub use vm_pool::VmPoolMetrics;

mod connection_pool;
Expand All @@ -22,6 +23,7 @@ mod info;
mod network_protocol;
mod particle_executor;
mod services_metrics;
mod spell_metrics;
mod vm_pool;

// TODO:
Expand Down
80 changes: 80 additions & 0 deletions crates/peer-metrics/src/spell_metrics.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
use crate::register;
use prometheus_client::metrics::counter::Counter;
use prometheus_client::metrics::gauge::Gauge;
use prometheus_client::metrics::histogram::Histogram;
use prometheus_client::registry::Registry;

#[derive(Clone)]
pub struct SpellMetrics {
// How much spell _particles_ were created by the node
spell_particles_created: Counter,
// How much spells are scheduled to run _now_
spell_scheduled_now: Gauge,
// Distribution of spell's scheduled periods
spell_periods: Histogram,
}

impl SpellMetrics {
pub fn new(registry: &mut Registry) -> Self {
let sub_registry = registry.sub_registry_with_prefix("spell");

let spell_particles_created = register(
sub_registry,
Counter::default(),
"particles_created",
"Number of spell particles created",
);

let spell_scheduled_now = register(
sub_registry,
Gauge::default(),
"scheduled_now",
"Number of spell particles scheduled to run now",
);

let spell_periods = register(
sub_registry,
Histogram::new(Self::periods_buckets()),
"periods",
"Spell particle periods",
);

Self {
spell_particles_created,
spell_scheduled_now,
spell_periods,
}
}

fn periods_buckets() -> std::vec::IntoIter<f64> {
// 0.0 sec, 1 sec, 30 sec, 1 min, 5 min, 10 min, 1 hour, 12 hours, 1 day, 1 week, 1 month
// 0 means that the spell is oneshot or reacts only on events
vec![
0.0,
1.0,
30.0,
60.0,
60.0 * 5.0,
60.0 * 10.0,
60.0 * 60.0,
60.0 * 60.0 * 12.0,
60.0 * 60.0 * 24.0,
60.0 * 60.0 * 24.0 * 7.0,
60.0 * 60.0 * 24.0 * 30.0,
]
.into_iter()
}

pub fn observe_started_spell(&self, period: u32) {
self.spell_scheduled_now.inc();
self.spell_periods.observe(period as f64)
}

pub fn observe_finished_spell(&self) {
self.spell_scheduled_now.dec();
}

pub fn observe_spell_cast(&self) {
self.spell_particles_created.inc();
}
}
1 change: 1 addition & 0 deletions crates/spell-event-bus/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ futures = { workspace = true }
thiserror = { workspace = true }
log = { workspace = true }
fluence-spell-dtos = { workspace = true }
peer-metrics = { workspace = true }

[dev-dependencies]
libp2p = { workspace = true }
Expand Down
19 changes: 13 additions & 6 deletions crates/spell-event-bus/src/bus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use crate::config::{SpellTriggerConfigs, TriggerConfig};
use futures::stream::BoxStream;
use futures::StreamExt;
use futures::{future, FutureExt};
use peer_metrics::SpellMetrics;
use std::cmp::Ordering;
use std::collections::{BinaryHeap, HashMap};
use std::pin::Pin;
Expand Down Expand Up @@ -162,10 +163,13 @@ pub struct SpellEventBus {
recv_cmd_channel: mpsc::UnboundedReceiver<Command>,
/// Notify when trigger happened
send_events: mpsc::UnboundedSender<TriggerEvent>,
/// Spell metrics
spell_metrics: Option<SpellMetrics>,
}

impl SpellEventBus {
pub fn new(
spell_metrics: Option<SpellMetrics>,
sources: Vec<BoxStream<'static, PeerEvent>>,
) -> (
Self,
Expand All @@ -181,6 +185,7 @@ impl SpellEventBus {
sources,
recv_cmd_channel,
send_events,
spell_metrics,
};
(this, api, recv_events)
}
Expand Down Expand Up @@ -258,6 +263,8 @@ impl SpellEventBus {
if let Some(rescheduled) = Scheduled::at(scheduled_spell.data, Instant::now()) {
log::trace!("Reschedule: {:?}", rescheduled);
state.scheduled.push(rescheduled);
} else if let Some(m) = &self.spell_metrics {
m.observe_finished_spell();
}
}
},
Expand Down Expand Up @@ -381,7 +388,7 @@ mod tests {

#[tokio::test]
async fn test_subscribe_one() {
let (bus, api, event_receiver) = SpellEventBus::new(vec![]);
let (bus, api, event_receiver) = SpellEventBus::new(None, vec![]);
let bus = bus.start();
let event_stream = UnboundedReceiverStream::new(event_receiver);

Expand All @@ -405,7 +412,7 @@ mod tests {

#[tokio::test]
async fn test_subscribe_many() {
let (bus, api, event_receiver) = SpellEventBus::new(vec![]);
let (bus, api, event_receiver) = SpellEventBus::new(None, vec![]);
let bus = bus.start();
let event_stream = UnboundedReceiverStream::new(event_receiver);

Expand Down Expand Up @@ -438,7 +445,7 @@ mod tests {

#[tokio::test]
async fn test_subscribe_oneshot() {
let (bus, api, event_receiver) = SpellEventBus::new(vec![]);
let (bus, api, event_receiver) = SpellEventBus::new(None, vec![]);
let bus = bus.start();
let event_stream = UnboundedReceiverStream::new(event_receiver);
let spell1_id = "spell1".to_string();
Expand Down Expand Up @@ -473,7 +480,7 @@ mod tests {
async fn test_subscribe_connect() {
let (send, recv) = mpsc::unbounded_channel();
let recv = UnboundedReceiverStream::new(recv).boxed();
let (bus, api, event_receiver) = SpellEventBus::new(vec![recv]);
let (bus, api, event_receiver) = SpellEventBus::new(None, vec![recv]);
let mut event_stream = UnboundedReceiverStream::new(event_receiver);
let bus = bus.start();

Expand Down Expand Up @@ -503,7 +510,7 @@ mod tests {
async fn test_unsubscribe() {
let (send, recv) = mpsc::unbounded_channel();
let recv = UnboundedReceiverStream::new(recv).boxed();
let (bus, api, mut event_receiver) = SpellEventBus::new(vec![recv]);
let (bus, api, mut event_receiver) = SpellEventBus::new(None, vec![recv]);
let bus = bus.start();

let spell1_id = "spell1".to_string();
Expand Down Expand Up @@ -532,7 +539,7 @@ mod tests {
async fn test_subscribe_many_spells_with_diff_event_types() {
let (recv, hdl) = emulate_connect(Duration::from_millis(10));
let recv = UnboundedReceiverStream::new(recv).boxed();
let (bus, api, event_receiver) = SpellEventBus::new(vec![recv]);
let (bus, api, event_receiver) = SpellEventBus::new(None, vec![recv]);
let event_stream = UnboundedReceiverStream::new(event_receiver);
let bus = bus.start();

Expand Down
6 changes: 4 additions & 2 deletions particle-node/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ use particle_execution::ParticleFunctionStatic;
use particle_protocol::Particle;
use peer_metrics::{
ConnectionPoolMetrics, ConnectivityMetrics, ParticleExecutorMetrics, ServicesMetrics,
ServicesMetricsBackend, VmPoolMetrics,
ServicesMetricsBackend, SpellMetrics, VmPoolMetrics,
};
use prometheus_client::registry::Registry;
use script_storage::{ScriptStorageApi, ScriptStorageBackend, ScriptStorageConfig};
Expand Down Expand Up @@ -136,6 +136,7 @@ impl<RT: AquaRuntime> Node<RT> {
let connection_pool_metrics = metrics_registry.as_mut().map(ConnectionPoolMetrics::new);
let plumber_metrics = metrics_registry.as_mut().map(ParticleExecutorMetrics::new);
let vm_pool_metrics = metrics_registry.as_mut().map(VmPoolMetrics::new);
let spell_metrics = metrics_registry.as_mut().map(SpellMetrics::new);

#[allow(deprecated)]
let connection_limits = ConnectionLimits::default()
Expand Down Expand Up @@ -251,7 +252,7 @@ impl<RT: AquaRuntime> Node<RT> {
let sources = vec![recv_connection_pool_events.map(PeerEvent::from).boxed()];

let (spell_event_bus, spell_event_bus_api, spell_events_receiver) =
SpellEventBus::new(sources);
SpellEventBus::new(spell_metrics.clone(), sources);

let (sorcerer, mut custom_service_functions, spell_version) = Sorcerer::new(
builtins.services.clone(),
Expand All @@ -260,6 +261,7 @@ impl<RT: AquaRuntime> Node<RT> {
config.clone(),
spell_event_bus_api,
key_manager.clone(),
spell_metrics,
);

let node_info = NodeInfo {
Expand Down
1 change: 1 addition & 0 deletions sorcerer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ connection-pool = { workspace = true }
kademlia = { workspace = true }
fluence-libp2p = { workspace = true }
key-manager = { workspace = true }
peer-metrics = { workspace = true }

libp2p = { workspace = true }
fluence-keypair = { workspace = true }
Expand Down
3 changes: 3 additions & 0 deletions sorcerer/src/script_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,9 @@ impl Sorcerer {
let particle = self.make_spell_particle(event.spell_id.clone(), worker_id)?;

self.store_trigger(event.clone(), worker_id)?;
if let Some(m) = &self.spell_metrics {
m.observe_spell_cast();
}
self.aquamarine.clone().execute(particle, None).await?;
};

Expand Down
10 changes: 9 additions & 1 deletion sorcerer/src/sorcerer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ use particle_builtins::{wrap, wrap_unit, CustomService};
use particle_execution::ServiceFunction;
use particle_modules::ModuleRepository;
use particle_services::ParticleAppServices;
use peer_metrics::SpellMetrics;
use serde_json::Value;
use server_config::ResolvedConfig;
use spell_event_bus::api::{from_user_config, SpellEventBusApi, TriggerEvent};
Expand All @@ -49,6 +50,7 @@ pub struct Sorcerer {
pub spell_event_bus_api: SpellEventBusApi,
pub spell_script_particle_ttl: Duration,
pub key_manager: KeyManager,
pub spell_metrics: Option<SpellMetrics>,
}

impl Sorcerer {
Expand All @@ -59,6 +61,7 @@ impl Sorcerer {
config: ResolvedConfig,
spell_event_bus_api: SpellEventBusApi,
key_manager: KeyManager,
spell_metrics: Option<SpellMetrics>,
) -> (Self, HashMap<String, CustomService>, String) {
let (spell_storage, spell_version) =
SpellStorage::create(&config.dir_config.spell_base_dir, &services, &modules)
Expand All @@ -71,6 +74,7 @@ impl Sorcerer {
spell_event_bus_api,
spell_script_particle_ttl: config.max_spell_particle_ttl,
key_manager,
spell_metrics,
};

let mut builtin_functions = sorcerer.make_spell_builtins();
Expand Down Expand Up @@ -104,11 +108,15 @@ impl Sorcerer {
spell_id,
"get_trigger_config",
)?;
let period = result.config.clock.period_sec;
let config = from_user_config(result.config)?;
if let Some(config) = config.and_then(|c| c.into_rescheduled()) {
self.spell_event_bus_api
.subscribe(spell_id.clone(), config.clone())
.subscribe(spell_id.clone(), config)
.await?;
if let Some(m) = &self.spell_metrics {
m.observe_started_spell(period);
}
} else {
log::warn!("Spell {spell_id} is not rescheduled since its config is either not found or not reschedulable");
}
Expand Down