diff --git a/Cargo.lock b/Cargo.lock index 5ff78eaf878..e35227d2e5a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3258,9 +3258,9 @@ dependencies = [ [[package]] name = "prometheus" -version = "0.12.0" +version = "0.13.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5986aa8d62380092d2f50f8b1cdba9cb9b6731ffd4b25b51fd126b6c3e05b99c" +checksum = "b7f64969ffd5dd8f39bd57a68ac53c163a095ed9d0fb707146da1b27025a3504" dependencies = [ "cfg-if 1.0.0", "fnv", diff --git a/applications/tari_base_node/src/command_handler.rs b/applications/tari_base_node/src/command_handler.rs index 8db816de579..7da309f2525 100644 --- a/applications/tari_base_node/src/command_handler.rs +++ b/applications/tari_base_node/src/command_handler.rs @@ -1199,6 +1199,59 @@ impl CommandHandler { } }); } + + #[cfg(not(feature = "metrics"))] + pub fn get_network_stats(&self) { + println!( + "Metrics are not enabled in this binary. Recompile Tari base node with `--features metrics` to enable \ + them." + ); + } + + #[cfg(feature = "metrics")] + pub fn get_network_stats(&self) { + use tari_metrics::proto::MetricType; + self.executor.spawn(async move { + let metric_families = tari_metrics::get_default_registry().gather(); + let metric_family_iter = metric_families + .into_iter() + .filter(|family| family.get_name().starts_with("tari_comms")); + + // TODO: Make this useful + let mut table = Table::new(); + table.set_titles(vec!["name", "type", "value"]); + for family in metric_family_iter { + let field_type = family.get_field_type(); + let name = family.get_name(); + for metric in family.get_metric() { + let value = match field_type { + MetricType::COUNTER => metric.get_counter().get_value(), + MetricType::GAUGE => metric.get_gauge().get_value(), + MetricType::SUMMARY => { + let summary = metric.get_summary(); + summary.get_sample_sum() / summary.get_sample_count() as f64 + }, + MetricType::UNTYPED => metric.get_untyped().get_value(), + MetricType::HISTOGRAM => { + let histogram = metric.get_histogram(); + histogram.get_sample_sum() / histogram.get_sample_count() as f64 + }, + }; + + let field_type = match field_type { + MetricType::COUNTER => "COUNTER", + MetricType::GAUGE => "GAUGE", + MetricType::SUMMARY => "SUMMARY", + MetricType::UNTYPED => "UNTYPED", + MetricType::HISTOGRAM => "HISTOGRAM", + }; + + table.add_row(row![name, field_type, value]); + } + } + table.print_stdout(); + }); + } } async fn fetch_banned_peers(pm: &PeerManager) -> Result, PeerManagerError> { diff --git a/applications/tari_base_node/src/parser.rs b/applications/tari_base_node/src/parser.rs index 1f08265d5ba..7ef6717dd29 100644 --- a/applications/tari_base_node/src/parser.rs +++ b/applications/tari_base_node/src/parser.rs @@ -84,6 +84,7 @@ pub enum BaseNodeCommand { GetMempoolState, Whoami, GetStateInfo, + GetNetworkStats, Quit, Exit, } @@ -265,6 +266,9 @@ impl Parser { Whoami => { self.command_handler.whoami(); }, + GetNetworkStats => { + self.command_handler.get_network_stats(); + }, Exit | Quit => { println!("Shutting down..."); info!( @@ -412,6 +416,9 @@ impl Parser { address" ); }, + GetNetworkStats => { + println!("Displays network stats"); + }, Exit | Quit => { println!("Exits the base node"); }, diff --git a/comms/src/connection_manager/listener.rs b/comms/src/connection_manager/listener.rs index 21c3771610e..c79e1927954 100644 --- a/comms/src/connection_manager/listener.rs +++ b/comms/src/connection_manager/listener.rs @@ -41,7 +41,6 @@ use crate::{ protocol::ProtocolId, runtime, transports::Transport, - types::CommsPublicKey, utils::multiaddr::multiaddr_to_socketaddr, PeerManager, }; @@ -335,21 +334,22 @@ where } async fn remote_public_key_from_socket(socket: TTransport::Output, noise_config: NoiseConfig) -> String { - let public_key: Option = match time::timeout( + let noise_socket = time::timeout( Duration::from_secs(30), noise_config.upgrade_socket(socket, ConnectionDirection::Inbound), ) - .await - .map_err(|_| ConnectionManagerError::NoiseProtocolTimeout) - { - Ok(Ok(noise_socket)) => { - match noise_socket - .get_remote_public_key() - .ok_or(ConnectionManagerError::InvalidStaticPublicKey) - { - Ok(pk) => Some(pk), - _ => None, + .await; + + let public_key = match noise_socket { + Ok(Ok(mut noise_socket)) => { + let pk = noise_socket.get_remote_public_key(); + if let Err(err) = noise_socket.shutdown().await { + debug!( + target: LOG_TARGET, + "IO error when closing socket after invalid wire format: {}", err + ); } + pk }, _ => None, }; diff --git a/comms/src/connectivity/manager.rs b/comms/src/connectivity/manager.rs index 3a25af36017..d8043050e7f 100644 --- a/comms/src/connectivity/manager.rs +++ b/comms/src/connectivity/manager.rs @@ -510,6 +510,10 @@ impl ConnectivityManagerActor { _ => {}, } }, + #[cfg(feature = "metrics")] + NewInboundSubstream(node_id, protocol, _) => { + super::metrics::substream_request_count(node_id, protocol).inc(); + }, _ => {}, } diff --git a/comms/src/connectivity/metrics.rs b/comms/src/connectivity/metrics.rs index 1c5e528da0e..56bd8f781d1 100644 --- a/comms/src/connectivity/metrics.rs +++ b/comms/src/connectivity/metrics.rs @@ -20,24 +20,35 @@ // WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE // USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -use crate::connection_manager::ConnectionDirection; +use crate::{connection_manager::ConnectionDirection, peer_manager::NodeId, protocol::ProtocolId}; use once_cell::sync::Lazy; use tari_metrics::{IntGauge, IntGaugeVec}; pub fn connections(direction: ConnectionDirection) -> IntGauge { - static GAUGE: Lazy = Lazy::new(|| { + static METER: Lazy = Lazy::new(|| { tari_metrics::register_int_gauge_vec("comms::connections", "Number of active connections by direction", &[ "direction", ]) .unwrap() }); - GAUGE.with_label_values(&[direction.as_str()]) + METER.with_label_values(&[direction.as_str()]) +} + +pub fn substream_request_count(peer: &NodeId, protocol: &ProtocolId) -> IntGauge { + static METER: Lazy = Lazy::new(|| { + tari_metrics::register_int_gauge_vec("comms::substream_request_count", "Number of substream requests", &[ + "peer", "protocol", + ]) + .unwrap() + }); + + METER.with_label_values(&[peer.to_string().as_str(), String::from_utf8_lossy(protocol).as_ref()]) } pub fn uptime() -> IntGauge { - static GAUGE: Lazy = + static METER: Lazy = Lazy::new(|| tari_metrics::register_int_gauge("comms::uptime", "Comms uptime").unwrap()); - GAUGE.clone() + METER.clone() } diff --git a/comms/src/protocol/rpc/client/metrics.rs b/comms/src/protocol/rpc/client/metrics.rs index 6f3ae5a0acf..ee77a6b91f8 100644 --- a/comms/src/protocol/rpc/client/metrics.rs +++ b/comms/src/protocol/rpc/client/metrics.rs @@ -25,7 +25,7 @@ use once_cell::sync::Lazy; use tari_metrics::{Histogram, HistogramVec, IntGauge, IntGaugeVec}; pub fn sessions_counter(node_id: &NodeId, protocol: &ProtocolId) -> IntGauge { - static GAUGE: Lazy = Lazy::new(|| { + static METER: Lazy = Lazy::new(|| { tari_metrics::register_int_gauge_vec( "comms::rpc::client::num_sessions", "The number of active clients per node per protocol", @@ -34,11 +34,11 @@ pub fn sessions_counter(node_id: &NodeId, protocol: &ProtocolId) -> IntGauge { .unwrap() }); - GAUGE.with_label_values(&[node_id.to_string().as_str(), String::from_utf8_lossy(protocol).as_ref()]) + METER.with_label_values(&[node_id.to_string().as_str(), String::from_utf8_lossy(protocol).as_ref()]) } pub fn handshake_errors(node_id: &NodeId, protocol: &ProtocolId) -> IntGauge { - static GAUGE: Lazy = Lazy::new(|| { + static METER: Lazy = Lazy::new(|| { tari_metrics::register_int_gauge_vec( "comms::rpc::client::handshake_errors", "The number of handshake errors per node per protocol", @@ -47,11 +47,11 @@ pub fn handshake_errors(node_id: &NodeId, protocol: &ProtocolId) -> IntGauge { .unwrap() }); - GAUGE.with_label_values(&[node_id.to_string().as_str(), String::from_utf8_lossy(protocol).as_ref()]) + METER.with_label_values(&[node_id.to_string().as_str(), String::from_utf8_lossy(protocol).as_ref()]) } pub fn request_response_latency(node_id: &NodeId, protocol: &ProtocolId) -> Histogram { - static GAUGE: Lazy = Lazy::new(|| { + static METER: Lazy = Lazy::new(|| { tari_metrics::register_histogram_vec( "comms::rpc::client::request_response_latency", "A histogram of request to first response latency", @@ -60,11 +60,11 @@ pub fn request_response_latency(node_id: &NodeId, protocol: &ProtocolId) -> Hist .unwrap() }); - GAUGE.with_label_values(&[node_id.to_string().as_str(), String::from_utf8_lossy(protocol).as_ref()]) + METER.with_label_values(&[node_id.to_string().as_str(), String::from_utf8_lossy(protocol).as_ref()]) } pub fn outbound_request_bytes(node_id: &NodeId, protocol: &ProtocolId) -> Histogram { - static GAUGE: Lazy = Lazy::new(|| { + static METER: Lazy = Lazy::new(|| { tari_metrics::register_histogram_vec( "comms::rpc::client::outbound_request_bytes", "Avg. request bytes per node per protocol", @@ -73,11 +73,11 @@ pub fn outbound_request_bytes(node_id: &NodeId, protocol: &ProtocolId) -> Histog .unwrap() }); - GAUGE.with_label_values(&[node_id.to_string().as_str(), String::from_utf8_lossy(protocol).as_ref()]) + METER.with_label_values(&[node_id.to_string().as_str(), String::from_utf8_lossy(protocol).as_ref()]) } pub fn inbound_response_bytes(node_id: &NodeId, protocol: &ProtocolId) -> Histogram { - static GAUGE: Lazy = Lazy::new(|| { + static METER: Lazy = Lazy::new(|| { tari_metrics::register_histogram_vec( "comms::rpc::client::inbound_response_bytes", "Avg. response bytes per peer per protocol", @@ -86,5 +86,5 @@ pub fn inbound_response_bytes(node_id: &NodeId, protocol: &ProtocolId) -> Histog .unwrap() }); - GAUGE.with_label_values(&[node_id.to_string().as_str(), String::from_utf8_lossy(protocol).as_ref()]) + METER.with_label_values(&[node_id.to_string().as_str(), String::from_utf8_lossy(protocol).as_ref()]) } diff --git a/comms/src/protocol/rpc/server/metrics.rs b/comms/src/protocol/rpc/server/metrics.rs index 79e13d6038a..cb0c1d7d2a9 100644 --- a/comms/src/protocol/rpc/server/metrics.rs +++ b/comms/src/protocol/rpc/server/metrics.rs @@ -25,7 +25,7 @@ use once_cell::sync::Lazy; use tari_metrics::{Histogram, HistogramVec, IntCounter, IntCounterVec, IntGauge, IntGaugeVec}; pub fn sessions_counter(node_id: &NodeId, protocol: &ProtocolId) -> IntGauge { - static GAUGE: Lazy = Lazy::new(|| { + static METER: Lazy = Lazy::new(|| { tari_metrics::register_int_gauge_vec( "comms::rpc::server::num_sessions", "The number of active server sessions per node per protocol", @@ -34,11 +34,11 @@ pub fn sessions_counter(node_id: &NodeId, protocol: &ProtocolId) -> IntGauge { .unwrap() }); - GAUGE.with_label_values(&[node_id.to_string().as_str(), String::from_utf8_lossy(protocol).as_ref()]) + METER.with_label_values(&[node_id.to_string().as_str(), String::from_utf8_lossy(protocol).as_ref()]) } pub fn handshake_error_counter(node_id: &NodeId, protocol: &ProtocolId) -> IntGauge { - static GAUGE: Lazy = Lazy::new(|| { + static METER: Lazy = Lazy::new(|| { tari_metrics::register_int_gauge_vec( "comms::rpc::server::handshake_errors", "The number of handshake errors per node per protocol", @@ -47,11 +47,11 @@ pub fn handshake_error_counter(node_id: &NodeId, protocol: &ProtocolId) -> IntGa .unwrap() }); - GAUGE.with_label_values(&[node_id.to_string().as_str(), String::from_utf8_lossy(protocol).as_ref()]) + METER.with_label_values(&[node_id.to_string().as_str(), String::from_utf8_lossy(protocol).as_ref()]) } pub fn error_counter(node_id: &NodeId, protocol: &ProtocolId) -> IntCounter { - static GAUGE: Lazy = Lazy::new(|| { + static METER: Lazy = Lazy::new(|| { tari_metrics::register_int_counter_vec( "comms::rpc::server::error_count", "The number of RPC errors per node per protocol", @@ -60,11 +60,11 @@ pub fn error_counter(node_id: &NodeId, protocol: &ProtocolId) -> IntCounter { .unwrap() }); - GAUGE.with_label_values(&[node_id.to_string().as_str(), String::from_utf8_lossy(protocol).as_ref()]) + METER.with_label_values(&[node_id.to_string().as_str(), String::from_utf8_lossy(protocol).as_ref()]) } pub fn inbound_requests_bytes(node_id: &NodeId, protocol: &ProtocolId) -> Histogram { - static GAUGE: Lazy = Lazy::new(|| { + static METER: Lazy = Lazy::new(|| { tari_metrics::register_histogram_vec( "comms::rpc::server::inbound_request_bytes", "Avg. request bytes per node per protocol", @@ -73,11 +73,11 @@ pub fn inbound_requests_bytes(node_id: &NodeId, protocol: &ProtocolId) -> Histog .unwrap() }); - GAUGE.with_label_values(&[node_id.to_string().as_str(), String::from_utf8_lossy(protocol).as_ref()]) + METER.with_label_values(&[node_id.to_string().as_str(), String::from_utf8_lossy(protocol).as_ref()]) } pub fn outbound_response_bytes(node_id: &NodeId, protocol: &ProtocolId) -> Histogram { - static GAUGE: Lazy = Lazy::new(|| { + static METER: Lazy = Lazy::new(|| { tari_metrics::register_histogram_vec( "comms::rpc::server::outbound_response_bytes", "Avg. response bytes per peer per protocol", @@ -86,5 +86,5 @@ pub fn outbound_response_bytes(node_id: &NodeId, protocol: &ProtocolId) -> Histo .unwrap() }); - GAUGE.with_label_values(&[node_id.to_string().as_str(), String::from_utf8_lossy(protocol).as_ref()]) + METER.with_label_values(&[node_id.to_string().as_str(), String::from_utf8_lossy(protocol).as_ref()]) } diff --git a/infrastructure/metrics/Cargo.toml b/infrastructure/metrics/Cargo.toml index acf393237f6..c4111b1ef63 100644 --- a/infrastructure/metrics/Cargo.toml +++ b/infrastructure/metrics/Cargo.toml @@ -11,4 +11,4 @@ license = "BSD-3-Clause" [dependencies] once_cell = "1.8.0" -prometheus = "0.12.0" +prometheus = "0.13.0" diff --git a/infrastructure/metrics/src/lib.rs b/infrastructure/metrics/src/lib.rs index 1eac5e91426..c9d82abff3e 100644 --- a/infrastructure/metrics/src/lib.rs +++ b/infrastructure/metrics/src/lib.rs @@ -4,6 +4,7 @@ use std::sync::{Arc, RwLock}; pub use prometheus::{ core::Collector, + proto, Counter, CounterVec, Encoder,