From 1d443a16e75607de569d7ba3ceb7e9081f1ee75f Mon Sep 17 00:00:00 2001 From: Maria Kuklina Date: Tue, 4 Apr 2023 09:51:57 +0200 Subject: [PATCH 1/2] collect metrics for spell particles separately --- connection-pool/src/behaviour.rs | 8 ++++--- crates/peer-metrics/src/connection_pool.rs | 22 ++++++++++++++---- crates/peer-metrics/src/connectivity.rs | 26 ++++++++++++++++++---- crates/peer-metrics/src/dispatcher.rs | 14 ++++++++++-- crates/peer-metrics/src/lib.rs | 23 ++++++++++++++++++- particle-node/src/connectivity.rs | 4 ++-- particle-node/src/dispatcher.rs | 2 +- 7 files changed, 82 insertions(+), 17 deletions(-) diff --git a/connection-pool/src/behaviour.rs b/connection-pool/src/behaviour.rs index d96b098bb3..730e0ab3c4 100644 --- a/connection-pool/src/behaviour.rs +++ b/connection-pool/src/behaviour.rs @@ -566,9 +566,11 @@ impl NetworkBehaviour for ConnectionPoolBehaviour { HandlerMessage::InParticle(particle) => { log::trace!(target: "network", "{}: received particle {} from {}; queue {}", self.peer_id, particle.id, from, self.queue.len()); self.meter(|m| { - m.particle_queue_size.set(self.queue.len() as i64 + 1); - m.received_particles.inc(); - m.particle_sizes.observe(particle.data.len() as f64); + m.incoming_particle( + &particle.id, + self.queue.len() as i64 + 1, + particle.data.len() as f64, + ) }); self.queue.push_back(particle); self.wake(); diff --git a/crates/peer-metrics/src/connection_pool.rs b/crates/peer-metrics/src/connection_pool.rs index bbeaf68be9..34b0bf9369 100644 --- a/crates/peer-metrics/src/connection_pool.rs +++ b/crates/peer-metrics/src/connection_pool.rs @@ -1,12 +1,14 @@ +use crate::{ParticleLabel, ParticleType}; use prometheus_client::metrics::counter::Counter; +use prometheus_client::metrics::family::Family; use prometheus_client::metrics::gauge::Gauge; use prometheus_client::metrics::histogram::{exponential_buckets, Histogram}; use prometheus_client::registry::Registry; #[derive(Clone)] pub struct ConnectionPoolMetrics { - pub received_particles: Counter, - pub particle_sizes: Histogram, + pub received_particles: Family, + pub particle_sizes: Family, pub connected_peers: Gauge, pub particle_queue_size: Gauge, } @@ -15,7 +17,7 @@ impl ConnectionPoolMetrics { pub fn new(registry: &mut Registry) -> Self { let sub_registry = registry.sub_registry_with_prefix("connection_pool"); - let received_particles = Counter::default(); + let received_particles = Family::default(); sub_registry.register( "received_particles", "Number of particles received from the network (not unique)", @@ -23,7 +25,8 @@ impl ConnectionPoolMetrics { ); // from 100 bytes to 100 MB - let particle_sizes = Histogram::new(exponential_buckets(100.0, 10.0, 7)); + let particle_sizes: Family<_, _> = + Family::new_with_constructor(|| Histogram::new(exponential_buckets(100.0, 10.0, 7))); sub_registry.register( "particle_sizes", "Distribution of particle data sizes", @@ -51,4 +54,15 @@ impl ConnectionPoolMetrics { particle_queue_size, } } + + pub fn incoming_particle(&self, particle_id: &str, queue_len: i64, particle_len: f64) { + self.particle_queue_size.set(queue_len); + let label = ParticleLabel { + particle_type: ParticleType::from_particle(particle_id), + }; + self.received_particles.get_or_create(&label).inc(); + self.particle_sizes + .get_or_create(&label) + .observe(particle_len); + } } diff --git a/crates/peer-metrics/src/connectivity.rs b/crates/peer-metrics/src/connectivity.rs index a93f929fca..602bd7e6bf 100644 --- a/crates/peer-metrics/src/connectivity.rs +++ b/crates/peer-metrics/src/connectivity.rs @@ -1,3 +1,4 @@ +use crate::{ParticleLabel, ParticleType}; use prometheus_client::encoding::{EncodeLabelSet, EncodeLabelValue}; use prometheus_client::metrics::counter::Counter; use prometheus_client::metrics::family::Family; @@ -15,11 +16,12 @@ pub enum Resolution { pub struct ResolutionLabel { action: Resolution, } + #[derive(Clone)] pub struct ConnectivityMetrics { contact_resolve: Family, - pub particle_send_success: Counter, - pub particle_send_failure: Counter, + pub particle_send_success: Family, + pub particle_send_failure: Family, pub bootstrap_disconnected: Counter, pub bootstrap_connected: Counter, } @@ -35,14 +37,14 @@ impl ConnectivityMetrics { contact_resolve.clone(), ); - let particle_send_success = Counter::default(); + let particle_send_success = Family::default(); sub_registry.register( "particle_send_success", "Number of sent particles", particle_send_success.clone(), ); - let particle_send_failure = Counter::default(); + let particle_send_failure = Family::default(); sub_registry.register( "particle_send_failure", "Number of errors on particle sending", @@ -77,4 +79,20 @@ impl ConnectivityMetrics { .get_or_create(&ResolutionLabel { action: resolution }) .inc(); } + + pub fn send_particle_ok(&self, particle: &str) { + self.particle_send_success + .get_or_create(&ParticleLabel { + particle_type: ParticleType::from_particle(particle), + }) + .inc(); + } + + pub fn send_particle_failed(&self, particle: &str) { + self.particle_send_failure + .get_or_create(&ParticleLabel { + particle_type: ParticleType::from_particle(particle), + }) + .inc(); + } } diff --git a/crates/peer-metrics/src/dispatcher.rs b/crates/peer-metrics/src/dispatcher.rs index 6bc0f0ada1..cb7036ac9f 100644 --- a/crates/peer-metrics/src/dispatcher.rs +++ b/crates/peer-metrics/src/dispatcher.rs @@ -1,9 +1,11 @@ +use crate::{ParticleLabel, ParticleType}; use prometheus_client::metrics::counter::Counter; +use prometheus_client::metrics::family::Family; use prometheus_client::registry::Registry; #[derive(Clone)] pub struct DispatcherMetrics { - pub expired_particles: Counter, + pub expired_particles: Family, } impl DispatcherMetrics { @@ -23,7 +25,7 @@ impl DispatcherMetrics { // Box::new(parallelism), // ); - let expired_particles = Counter::default(); + let expired_particles = Family::default(); sub_registry.register( "particles_expired", "Number of particles expired by TTL", @@ -32,4 +34,12 @@ impl DispatcherMetrics { DispatcherMetrics { expired_particles } } + + pub fn particle_expired(&self, particle_id: &str) { + self.expired_particles + .get_or_create(&ParticleLabel { + particle_type: ParticleType::from_particle(particle_id), + }) + .inc(); + } } diff --git a/crates/peer-metrics/src/lib.rs b/crates/peer-metrics/src/lib.rs index 307af1d726..5e260e0e80 100644 --- a/crates/peer-metrics/src/lib.rs +++ b/crates/peer-metrics/src/lib.rs @@ -1,6 +1,6 @@ use std::fmt::Debug; -use prometheus_client::encoding::EncodeMetric; +use prometheus_client::encoding::{EncodeLabelSet, EncodeLabelValue, EncodeMetric}; use prometheus_client::registry::Registry; pub use connection_pool::ConnectionPoolMetrics; @@ -29,6 +29,27 @@ mod vm_pool; // - count 'Error processing inbound ProtocolMessage: unexpected end of file' // - number of scheduled script executions +#[derive(EncodeLabelValue, Hash, Clone, Eq, PartialEq, Debug)] +pub enum ParticleType { + Spell, + Common, +} + +impl ParticleType { + fn from_particle(particle_id: &str) -> Self { + if particle_id.starts_with("spell_") { + ParticleType::Spell + } else { + ParticleType::Common + } + } +} + +#[derive(EncodeLabelSet, Hash, Clone, Eq, PartialEq, Debug)] +pub struct ParticleLabel { + particle_type: ParticleType, +} + /// from 100 microseconds to 120 seconds pub(self) fn execution_time_buckets() -> std::vec::IntoIter { vec![ diff --git a/particle-node/src/connectivity.rs b/particle-node/src/connectivity.rs index d049039f9d..f1879866c0 100644 --- a/particle-node/src/connectivity.rs +++ b/particle-node/src/connectivity.rs @@ -129,11 +129,11 @@ impl Connectivity { let sent = self.connection_pool.send(contact.clone(), particle).await; match &sent { SendStatus::Ok => { - metrics.map(|m| m.particle_send_success.inc()); + metrics.map(|m| m.send_particle_ok(&id)); log::info!("Sent particle {} to {}", id, contact); } err => { - metrics.map(|m| m.particle_send_failure.inc()); + metrics.map(|m| m.send_particle_failed(&id)); log::warn!( "Failed to send particle {} to {}, reason: {:?}", id, diff --git a/particle-node/src/dispatcher.rs b/particle-node/src/dispatcher.rs index 20fa40a022..59f9a8f4b5 100644 --- a/particle-node/src/dispatcher.rs +++ b/particle-node/src/dispatcher.rs @@ -94,7 +94,7 @@ impl Dispatcher { let metrics = metrics.clone(); if particle.is_expired() { - metrics.map(|m| m.expired_particles.inc()); + metrics.map(|m| m.particle_expired(&particle.id)); log::info!("Particle {} expired", particle.id); return async {}.boxed(); } From 7457fd109e46c0375375e53ed246ea5337afea6c Mon Sep 17 00:00:00 2001 From: Maria Kuklina Date: Tue, 4 Apr 2023 13:59:38 +0200 Subject: [PATCH 2/2] fix lint --- particle-node/src/connectivity.rs | 8 ++++++-- particle-node/src/dispatcher.rs | 4 +++- 2 files changed, 9 insertions(+), 3 deletions(-) diff --git a/particle-node/src/connectivity.rs b/particle-node/src/connectivity.rs index f1879866c0..f9128adc92 100644 --- a/particle-node/src/connectivity.rs +++ b/particle-node/src/connectivity.rs @@ -129,11 +129,15 @@ impl Connectivity { let sent = self.connection_pool.send(contact.clone(), particle).await; match &sent { SendStatus::Ok => { - metrics.map(|m| m.send_particle_ok(&id)); + if let Some(m) = metrics { + m.send_particle_ok(&id) + } log::info!("Sent particle {} to {}", id, contact); } err => { - metrics.map(|m| m.send_particle_failed(&id)); + if let Some(m) = metrics { + m.send_particle_failed(&id); + } log::warn!( "Failed to send particle {} to {}, reason: {:?}", id, diff --git a/particle-node/src/dispatcher.rs b/particle-node/src/dispatcher.rs index 59f9a8f4b5..e2ef14120c 100644 --- a/particle-node/src/dispatcher.rs +++ b/particle-node/src/dispatcher.rs @@ -94,7 +94,9 @@ impl Dispatcher { let metrics = metrics.clone(); if particle.is_expired() { - metrics.map(|m| m.particle_expired(&particle.id)); + if let Some(m) = metrics { + m.particle_expired(&particle.id); + } log::info!("Particle {} expired", particle.id); return async {}.boxed(); }