From 29f1c3cde109feedf9c99c11c284f9336a2a6ee7 Mon Sep 17 00:00:00 2001 From: Maria Kuklina Date: Fri, 31 Mar 2023 12:29:50 +0200 Subject: [PATCH 1/3] remove doubling libp2p metrics gathering --- particle-node/src/node.rs | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) diff --git a/particle-node/src/node.rs b/particle-node/src/node.rs index f23d3e64f6..76ed55a73c 100644 --- a/particle-node/src/node.rs +++ b/particle-node/src/node.rs @@ -34,7 +34,7 @@ use libp2p::{ swarm::AddressScore, PeerId, Swarm, TransportError, }; -use libp2p_metrics::{Metrics, Recorder}; +use libp2p_metrics::Metrics; use libp2p_swarm::{ConnectionLimits, SwarmBuilder}; use particle_builtins::{Builtins, CustomService, NodeInfo}; use particle_execution::ParticleFunctionStatic; @@ -413,13 +413,11 @@ impl Node { .unwrap_or("node".to_owned()); task::Builder::new().name(&task_name.clone()).spawn(async move { - let (mut metrics_fut, libp2p_metrics) = if let Some(mut registry) = registry { - let libp2p_metrics = Metrics::new(&mut registry); + let mut metrics_fut= if let Some(registry) = registry { log::info!("metrics_listen_addr {}", metrics_listen_addr); - let fut = start_metrics_endpoint(registry, metrics_listen_addr).boxed(); - (fut, Some(libp2p_metrics)) + start_metrics_endpoint(registry, metrics_listen_addr).boxed() } else { - (futures::future::pending().boxed(), None) + futures::future::pending().boxed() }; let services_metrics_backend = services_metrics_backend.start(); @@ -435,7 +433,6 @@ impl Node { let exit_inlet = exit_inlet.as_mut().expect("Could not get exit inlet"); tokio::select! { Some(e) = swarm.next() => { - if let Some(m) = libp2p_metrics.as_ref() { m.record(&e) } if let SwarmEvent::Behaviour(FluenceNetworkBehaviourEvent::Identify(i)) = e { swarm.behaviour_mut().inject_identify_event(i, true); } From f9493a531656f3469f49fb644c07148d468fad84 Mon Sep 17 00:00:00 2001 From: Maria Kuklina Date: Fri, 31 Mar 2023 12:56:18 +0200 Subject: [PATCH 2/3] make libp2p metrics shared --- crates/kademlia/src/behaviour.rs | 7 ++++--- crates/server-config/src/network_config.rs | 5 +++-- particle-node/src/node.rs | 12 +++++++++--- 3 files changed, 16 insertions(+), 8 deletions(-) diff --git a/crates/kademlia/src/behaviour.rs b/crates/kademlia/src/behaviour.rs index 492e8242e5..f0f2329a38 100644 --- a/crates/kademlia/src/behaviour.rs +++ b/crates/kademlia/src/behaviour.rs @@ -16,6 +16,7 @@ use std::error::Error; use std::ops::Mul; +use std::sync::Arc; use std::task::{Context, Poll}; use std::{ cmp::min, @@ -126,11 +127,11 @@ pub struct Kademlia { waker: Option, // Timer to track timed out requests, and return errors ASAP timer: Delay, - metrics: Option, + metrics: Arc>, } impl Kademlia { - pub fn new(config: KademliaConfig, metrics: Option) -> (Self, KademliaApi) { + pub fn new(config: KademliaConfig, metrics: Arc>) -> (Self, KademliaApi) { let timer = Delay::new(config.query_timeout); let store = MemoryStore::new(config.peer_id); @@ -550,7 +551,7 @@ impl Kademlia { } fn inject_kad_event(&mut self, event: KademliaEvent) { - if let Some(metrics) = &self.metrics { + if let Some(metrics) = self.metrics.as_ref() { metrics.record(&event); } diff --git a/crates/server-config/src/network_config.rs b/crates/server-config/src/network_config.rs index 9066faee8e..9e60415838 100644 --- a/crates/server-config/src/network_config.rs +++ b/crates/server-config/src/network_config.rs @@ -17,6 +17,7 @@ use libp2p::swarm::ConnectionLimits; use libp2p::{core::Multiaddr, identity::Keypair, PeerId}; use libp2p_metrics::Metrics; +use std::sync::Arc; use config_utils::to_peer_id; use particle_protocol::ProtocolConfig; @@ -30,7 +31,7 @@ pub struct NetworkConfig { pub node_version: &'static str, pub bootstrap_nodes: Vec, pub bootstrap: BootstrapConfig, - pub libp2p_metrics: Option, + pub libp2p_metrics: Arc>, pub protocol_config: ProtocolConfig, pub kademlia_config: KademliaConfig, pub particle_queue_buffer: usize, @@ -43,7 +44,7 @@ pub struct NetworkConfig { impl NetworkConfig { pub fn new( - libp2p_metrics: Option, + libp2p_metrics: Arc>, connectivity_metrics: Option, connection_pool_metrics: Option, key_pair: Keypair, diff --git a/particle-node/src/node.rs b/particle-node/src/node.rs index 76ed55a73c..8f25b6867a 100644 --- a/particle-node/src/node.rs +++ b/particle-node/src/node.rs @@ -34,7 +34,7 @@ use libp2p::{ swarm::AddressScore, PeerId, Swarm, TransportError, }; -use libp2p_metrics::Metrics; +use libp2p_metrics::{Metrics, Recorder}; use libp2p_swarm::{ConnectionLimits, SwarmBuilder}; use particle_builtins::{Builtins, CustomService, NodeInfo}; use particle_execution::ParticleFunctionStatic; @@ -77,6 +77,7 @@ pub struct Node { sorcerer: Sorcerer, registry: Option, + libp2p_metrics: Arc>, services_metrics_backend: ServicesMetricsBackend, metrics_listen_addr: SocketAddr, @@ -127,7 +128,7 @@ impl Node { } else { None }; - let libp2p_metrics = metrics_registry.as_mut().map(Metrics::new); + let libp2p_metrics = Arc::new(metrics_registry.as_mut().map(Metrics::new)); let connectivity_metrics = metrics_registry.as_mut().map(ConnectivityMetrics::new); let connection_pool_metrics = metrics_registry.as_mut().map(ConnectionPoolMetrics::new); let plumber_metrics = metrics_registry.as_mut().map(ParticleExecutorMetrics::new); @@ -148,7 +149,7 @@ impl Node { .with_max_established(config.node_config.transport_config.max_established); let network_config = NetworkConfig::new( - libp2p_metrics, + libp2p_metrics.clone(), connectivity_metrics, connection_pool_metrics, key_pair, @@ -283,6 +284,7 @@ impl Node { spell_events_receiver, sorcerer, metrics_registry, + libp2p_metrics, services_metrics_backend, config.metrics_listen_addr(), builtins_peer_id, @@ -356,6 +358,7 @@ impl Node { spell_events_receiver: mpsc::UnboundedReceiver, sorcerer: Sorcerer, registry: Option, + libp2p_metrics: Arc>, services_metrics_backend: ServicesMetricsBackend, metrics_listen_addr: SocketAddr, builtins_management_peer_id: PeerId, @@ -377,6 +380,7 @@ impl Node { sorcerer, registry, + libp2p_metrics, services_metrics_backend, metrics_listen_addr, @@ -411,6 +415,7 @@ impl Node { let task_name = peer_id .map(|x| format!("node-{x}")) .unwrap_or("node".to_owned()); + let libp2p_metrics = self.libp2p_metrics; task::Builder::new().name(&task_name.clone()).spawn(async move { let mut metrics_fut= if let Some(registry) = registry { @@ -433,6 +438,7 @@ impl Node { let exit_inlet = exit_inlet.as_mut().expect("Could not get exit inlet"); tokio::select! { Some(e) = swarm.next() => { + if let Some(m) = libp2p_metrics.as_ref() { m.record(&e) } if let SwarmEvent::Behaviour(FluenceNetworkBehaviourEvent::Identify(i)) = e { swarm.behaviour_mut().inject_identify_event(i, true); } From 32a55a2fd9f91ea9ef093a0953697e656a1f9ad4 Mon Sep 17 00:00:00 2001 From: Maria Kuklina Date: Fri, 31 Mar 2023 14:17:58 +0200 Subject: [PATCH 3/3] refactor type --- crates/kademlia/src/behaviour.rs | 4 ++-- crates/server-config/src/network_config.rs | 4 ++-- particle-node/src/node.rs | 6 +++--- 3 files changed, 7 insertions(+), 7 deletions(-) diff --git a/crates/kademlia/src/behaviour.rs b/crates/kademlia/src/behaviour.rs index f0f2329a38..c1840f2e73 100644 --- a/crates/kademlia/src/behaviour.rs +++ b/crates/kademlia/src/behaviour.rs @@ -127,11 +127,11 @@ pub struct Kademlia { waker: Option, // Timer to track timed out requests, and return errors ASAP timer: Delay, - metrics: Arc>, + metrics: Option>, } impl Kademlia { - pub fn new(config: KademliaConfig, metrics: Arc>) -> (Self, KademliaApi) { + pub fn new(config: KademliaConfig, metrics: Option>) -> (Self, KademliaApi) { let timer = Delay::new(config.query_timeout); let store = MemoryStore::new(config.peer_id); diff --git a/crates/server-config/src/network_config.rs b/crates/server-config/src/network_config.rs index 9e60415838..9cf1c6d10d 100644 --- a/crates/server-config/src/network_config.rs +++ b/crates/server-config/src/network_config.rs @@ -31,7 +31,7 @@ pub struct NetworkConfig { pub node_version: &'static str, pub bootstrap_nodes: Vec, pub bootstrap: BootstrapConfig, - pub libp2p_metrics: Arc>, + pub libp2p_metrics: Option>, pub protocol_config: ProtocolConfig, pub kademlia_config: KademliaConfig, pub particle_queue_buffer: usize, @@ -44,7 +44,7 @@ pub struct NetworkConfig { impl NetworkConfig { pub fn new( - libp2p_metrics: Arc>, + libp2p_metrics: Option>, connectivity_metrics: Option, connection_pool_metrics: Option, key_pair: Keypair, diff --git a/particle-node/src/node.rs b/particle-node/src/node.rs index 8f25b6867a..a6d803c441 100644 --- a/particle-node/src/node.rs +++ b/particle-node/src/node.rs @@ -77,7 +77,7 @@ pub struct Node { sorcerer: Sorcerer, registry: Option, - libp2p_metrics: Arc>, + libp2p_metrics: Option>, services_metrics_backend: ServicesMetricsBackend, metrics_listen_addr: SocketAddr, @@ -128,7 +128,7 @@ impl Node { } else { None }; - let libp2p_metrics = Arc::new(metrics_registry.as_mut().map(Metrics::new)); + let libp2p_metrics = metrics_registry.as_mut().map(|r| Arc::new(Metrics::new(r))); let connectivity_metrics = metrics_registry.as_mut().map(ConnectivityMetrics::new); let connection_pool_metrics = metrics_registry.as_mut().map(ConnectionPoolMetrics::new); let plumber_metrics = metrics_registry.as_mut().map(ParticleExecutorMetrics::new); @@ -358,7 +358,7 @@ impl Node { spell_events_receiver: mpsc::UnboundedReceiver, sorcerer: Sorcerer, registry: Option, - libp2p_metrics: Arc>, + libp2p_metrics: Option>, services_metrics_backend: ServicesMetricsBackend, metrics_listen_addr: SocketAddr, builtins_management_peer_id: PeerId,