Skip to content

Commit

Permalink
Add get-network-stats command
Browse files Browse the repository at this point in the history
  • Loading branch information
sdbondi committed Nov 14, 2021
1 parent 9deaedd commit 20f4c11
Show file tree
Hide file tree
Showing 10 changed files with 116 additions and 40 deletions.
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

53 changes: 53 additions & 0 deletions applications/tari_base_node/src/command_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1199,6 +1199,59 @@ impl CommandHandler {
}
});
}

#[cfg(not(feature = "metrics"))]
/// Fallback for builds compiled without the `metrics` feature: prints a notice explaining how to
/// enable metrics instead of printing any statistics.
pub fn get_network_stats(&self) {
    println!(
        "{}",
        concat!(
            "Metrics are not enabled in this binary. ",
            "Recompile Tari base node with `--features metrics` to enable them."
        )
    );
}

#[cfg(feature = "metrics")]
/// Prints a table of comms metrics to stdout.
///
/// Spawns a task on the handler's executor that gathers every metric family from the default
/// `tari_metrics` registry, keeps the families whose name starts with `tari_comms`, and prints one
/// `name | type | value` row per metric.
///
/// Counters, gauges and untyped metrics are shown as their current value. Summaries and histograms
/// are shown as the mean of their samples (`sample_sum / sample_count`), or `0` when no samples have
/// been recorded yet.
pub fn get_network_stats(&self) {
    use tari_metrics::proto::MetricType;

    self.executor.spawn(async move {
        // Mean of the observed samples. A summary/histogram that has not observed anything yet has a
        // sum of 0.0 and a count of 0; dividing those yields NaN, so report 0.0 instead.
        let mean = |sum: f64, count: u64| if count == 0 { 0.0 } else { sum / count as f64 };

        let metric_families = tari_metrics::get_default_registry().gather();
        let metric_family_iter = metric_families
            .into_iter()
            .filter(|family| family.get_name().starts_with("tari_comms"));

        // TODO: Make this useful
        let mut table = Table::new();
        table.set_titles(vec!["name", "type", "value"]);
        for family in metric_family_iter {
            let field_type = family.get_field_type();
            let name = family.get_name();
            // The type label is the same for every metric in a family, so resolve it once per family.
            let type_name = match field_type {
                MetricType::COUNTER => "COUNTER",
                MetricType::GAUGE => "GAUGE",
                MetricType::SUMMARY => "SUMMARY",
                MetricType::UNTYPED => "UNTYPED",
                MetricType::HISTOGRAM => "HISTOGRAM",
            };

            for metric in family.get_metric() {
                let value = match field_type {
                    MetricType::COUNTER => metric.get_counter().get_value(),
                    MetricType::GAUGE => metric.get_gauge().get_value(),
                    MetricType::SUMMARY => {
                        let summary = metric.get_summary();
                        mean(summary.get_sample_sum(), summary.get_sample_count())
                    },
                    MetricType::UNTYPED => metric.get_untyped().get_value(),
                    MetricType::HISTOGRAM => {
                        let histogram = metric.get_histogram();
                        mean(histogram.get_sample_sum(), histogram.get_sample_count())
                    },
                };

                table.add_row(row![name, type_name, value]);
            }
        }
        table.print_stdout();
    });
}
}

async fn fetch_banned_peers(pm: &PeerManager) -> Result<Vec<Peer>, PeerManagerError> {
Expand Down
7 changes: 7 additions & 0 deletions applications/tari_base_node/src/parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ pub enum BaseNodeCommand {
GetMempoolState,
Whoami,
GetStateInfo,
GetNetworkStats,
Quit,
Exit,
}
Expand Down Expand Up @@ -265,6 +266,9 @@ impl Parser {
Whoami => {
self.command_handler.whoami();
},
GetNetworkStats => {
self.command_handler.get_network_stats();
},
Exit | Quit => {
println!("Shutting down...");
info!(
Expand Down Expand Up @@ -412,6 +416,9 @@ impl Parser {
address"
);
},
GetNetworkStats => {
println!("Displays network stats");
},
Exit | Quit => {
println!("Exits the base node");
},
Expand Down
24 changes: 12 additions & 12 deletions comms/src/connection_manager/listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ use crate::{
protocol::ProtocolId,
runtime,
transports::Transport,
types::CommsPublicKey,
utils::multiaddr::multiaddr_to_socketaddr,
PeerManager,
};
Expand Down Expand Up @@ -335,21 +334,22 @@ where
}

async fn remote_public_key_from_socket(socket: TTransport::Output, noise_config: NoiseConfig) -> String {
let public_key: Option<CommsPublicKey> = match time::timeout(
let noise_socket = time::timeout(
Duration::from_secs(30),
noise_config.upgrade_socket(socket, ConnectionDirection::Inbound),
)
.await
.map_err(|_| ConnectionManagerError::NoiseProtocolTimeout)
{
Ok(Ok(noise_socket)) => {
match noise_socket
.get_remote_public_key()
.ok_or(ConnectionManagerError::InvalidStaticPublicKey)
{
Ok(pk) => Some(pk),
_ => None,
.await;

let public_key = match noise_socket {
Ok(Ok(mut noise_socket)) => {
let pk = noise_socket.get_remote_public_key();
if let Err(err) = noise_socket.shutdown().await {
debug!(
target: LOG_TARGET,
"IO error when closing socket after invalid wire format: {}", err
);
}
pk
},
_ => None,
};
Expand Down
4 changes: 4 additions & 0 deletions comms/src/connectivity/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -510,6 +510,10 @@ impl ConnectivityManagerActor {
_ => {},
}
},
#[cfg(feature = "metrics")]
NewInboundSubstream(node_id, protocol, _) => {
super::metrics::substream_request_count(node_id, protocol).inc();
},
_ => {},
}

Expand Down
21 changes: 16 additions & 5 deletions comms/src/connectivity/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,24 +20,35 @@
// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

use crate::connection_manager::ConnectionDirection;
use crate::{connection_manager::ConnectionDirection, peer_manager::NodeId, protocol::ProtocolId};
use once_cell::sync::Lazy;
use tari_metrics::{IntGauge, IntGaugeVec};

pub fn connections(direction: ConnectionDirection) -> IntGauge {
static GAUGE: Lazy<IntGaugeVec> = Lazy::new(|| {
static METER: Lazy<IntGaugeVec> = Lazy::new(|| {
tari_metrics::register_int_gauge_vec("comms::connections", "Number of active connections by direction", &[
"direction",
])
.unwrap()
});

GAUGE.with_label_values(&[direction.as_str()])
METER.with_label_values(&[direction.as_str()])
}

/// Returns the `comms::substream_request_count` gauge for the given peer and protocol.
///
/// The underlying gauge vector is registered on first use and is labelled by `peer` (the node id
/// rendered with `to_string`) and `protocol` (the protocol id bytes decoded as lossy UTF-8).
pub fn substream_request_count(peer: &NodeId, protocol: &ProtocolId) -> IntGauge {
    static SUBSTREAM_REQUESTS: Lazy<IntGaugeVec> = Lazy::new(|| {
        tari_metrics::register_int_gauge_vec(
            "comms::substream_request_count",
            "Number of substream requests",
            &["peer", "protocol"],
        )
        .unwrap()
    });

    // Materialise the label values first so the lookup below reads as a plain two-label query.
    let peer_label = peer.to_string();
    let protocol_label = String::from_utf8_lossy(protocol);
    SUBSTREAM_REQUESTS.with_label_values(&[peer_label.as_str(), protocol_label.as_ref()])
}

pub fn uptime() -> IntGauge {
static GAUGE: Lazy<IntGauge> =
static METER: Lazy<IntGauge> =
Lazy::new(|| tari_metrics::register_int_gauge("comms::uptime", "Comms uptime").unwrap());

GAUGE.clone()
METER.clone()
}
20 changes: 10 additions & 10 deletions comms/src/protocol/rpc/client/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use once_cell::sync::Lazy;
use tari_metrics::{Histogram, HistogramVec, IntGauge, IntGaugeVec};

pub fn sessions_counter(node_id: &NodeId, protocol: &ProtocolId) -> IntGauge {
static GAUGE: Lazy<IntGaugeVec> = Lazy::new(|| {
static METER: Lazy<IntGaugeVec> = Lazy::new(|| {
tari_metrics::register_int_gauge_vec(
"comms::rpc::client::num_sessions",
"The number of active clients per node per protocol",
Expand All @@ -34,11 +34,11 @@ pub fn sessions_counter(node_id: &NodeId, protocol: &ProtocolId) -> IntGauge {
.unwrap()
});

GAUGE.with_label_values(&[node_id.to_string().as_str(), String::from_utf8_lossy(protocol).as_ref()])
METER.with_label_values(&[node_id.to_string().as_str(), String::from_utf8_lossy(protocol).as_ref()])
}

pub fn handshake_errors(node_id: &NodeId, protocol: &ProtocolId) -> IntGauge {
static GAUGE: Lazy<IntGaugeVec> = Lazy::new(|| {
static METER: Lazy<IntGaugeVec> = Lazy::new(|| {
tari_metrics::register_int_gauge_vec(
"comms::rpc::client::handshake_errors",
"The number of handshake errors per node per protocol",
Expand All @@ -47,11 +47,11 @@ pub fn handshake_errors(node_id: &NodeId, protocol: &ProtocolId) -> IntGauge {
.unwrap()
});

GAUGE.with_label_values(&[node_id.to_string().as_str(), String::from_utf8_lossy(protocol).as_ref()])
METER.with_label_values(&[node_id.to_string().as_str(), String::from_utf8_lossy(protocol).as_ref()])
}

pub fn request_response_latency(node_id: &NodeId, protocol: &ProtocolId) -> Histogram {
static GAUGE: Lazy<HistogramVec> = Lazy::new(|| {
static METER: Lazy<HistogramVec> = Lazy::new(|| {
tari_metrics::register_histogram_vec(
"comms::rpc::client::request_response_latency",
"A histogram of request to first response latency",
Expand All @@ -60,11 +60,11 @@ pub fn request_response_latency(node_id: &NodeId, protocol: &ProtocolId) -> Hist
.unwrap()
});

GAUGE.with_label_values(&[node_id.to_string().as_str(), String::from_utf8_lossy(protocol).as_ref()])
METER.with_label_values(&[node_id.to_string().as_str(), String::from_utf8_lossy(protocol).as_ref()])
}

pub fn outbound_request_bytes(node_id: &NodeId, protocol: &ProtocolId) -> Histogram {
static GAUGE: Lazy<HistogramVec> = Lazy::new(|| {
static METER: Lazy<HistogramVec> = Lazy::new(|| {
tari_metrics::register_histogram_vec(
"comms::rpc::client::outbound_request_bytes",
"Avg. request bytes per node per protocol",
Expand All @@ -73,11 +73,11 @@ pub fn outbound_request_bytes(node_id: &NodeId, protocol: &ProtocolId) -> Histog
.unwrap()
});

GAUGE.with_label_values(&[node_id.to_string().as_str(), String::from_utf8_lossy(protocol).as_ref()])
METER.with_label_values(&[node_id.to_string().as_str(), String::from_utf8_lossy(protocol).as_ref()])
}

pub fn inbound_response_bytes(node_id: &NodeId, protocol: &ProtocolId) -> Histogram {
static GAUGE: Lazy<HistogramVec> = Lazy::new(|| {
static METER: Lazy<HistogramVec> = Lazy::new(|| {
tari_metrics::register_histogram_vec(
"comms::rpc::client::inbound_response_bytes",
"Avg. response bytes per peer per protocol",
Expand All @@ -86,5 +86,5 @@ pub fn inbound_response_bytes(node_id: &NodeId, protocol: &ProtocolId) -> Histog
.unwrap()
});

GAUGE.with_label_values(&[node_id.to_string().as_str(), String::from_utf8_lossy(protocol).as_ref()])
METER.with_label_values(&[node_id.to_string().as_str(), String::from_utf8_lossy(protocol).as_ref()])
}
20 changes: 10 additions & 10 deletions comms/src/protocol/rpc/server/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use once_cell::sync::Lazy;
use tari_metrics::{Histogram, HistogramVec, IntCounter, IntCounterVec, IntGauge, IntGaugeVec};

pub fn sessions_counter(node_id: &NodeId, protocol: &ProtocolId) -> IntGauge {
static GAUGE: Lazy<IntGaugeVec> = Lazy::new(|| {
static METER: Lazy<IntGaugeVec> = Lazy::new(|| {
tari_metrics::register_int_gauge_vec(
"comms::rpc::server::num_sessions",
"The number of active server sessions per node per protocol",
Expand All @@ -34,11 +34,11 @@ pub fn sessions_counter(node_id: &NodeId, protocol: &ProtocolId) -> IntGauge {
.unwrap()
});

GAUGE.with_label_values(&[node_id.to_string().as_str(), String::from_utf8_lossy(protocol).as_ref()])
METER.with_label_values(&[node_id.to_string().as_str(), String::from_utf8_lossy(protocol).as_ref()])
}

pub fn handshake_error_counter(node_id: &NodeId, protocol: &ProtocolId) -> IntGauge {
static GAUGE: Lazy<IntGaugeVec> = Lazy::new(|| {
static METER: Lazy<IntGaugeVec> = Lazy::new(|| {
tari_metrics::register_int_gauge_vec(
"comms::rpc::server::handshake_errors",
"The number of handshake errors per node per protocol",
Expand All @@ -47,11 +47,11 @@ pub fn handshake_error_counter(node_id: &NodeId, protocol: &ProtocolId) -> IntGa
.unwrap()
});

GAUGE.with_label_values(&[node_id.to_string().as_str(), String::from_utf8_lossy(protocol).as_ref()])
METER.with_label_values(&[node_id.to_string().as_str(), String::from_utf8_lossy(protocol).as_ref()])
}

pub fn error_counter(node_id: &NodeId, protocol: &ProtocolId) -> IntCounter {
static GAUGE: Lazy<IntCounterVec> = Lazy::new(|| {
static METER: Lazy<IntCounterVec> = Lazy::new(|| {
tari_metrics::register_int_counter_vec(
"comms::rpc::server::error_count",
"The number of RPC errors per node per protocol",
Expand All @@ -60,11 +60,11 @@ pub fn error_counter(node_id: &NodeId, protocol: &ProtocolId) -> IntCounter {
.unwrap()
});

GAUGE.with_label_values(&[node_id.to_string().as_str(), String::from_utf8_lossy(protocol).as_ref()])
METER.with_label_values(&[node_id.to_string().as_str(), String::from_utf8_lossy(protocol).as_ref()])
}

pub fn inbound_requests_bytes(node_id: &NodeId, protocol: &ProtocolId) -> Histogram {
static GAUGE: Lazy<HistogramVec> = Lazy::new(|| {
static METER: Lazy<HistogramVec> = Lazy::new(|| {
tari_metrics::register_histogram_vec(
"comms::rpc::server::inbound_request_bytes",
"Avg. request bytes per node per protocol",
Expand All @@ -73,11 +73,11 @@ pub fn inbound_requests_bytes(node_id: &NodeId, protocol: &ProtocolId) -> Histog
.unwrap()
});

GAUGE.with_label_values(&[node_id.to_string().as_str(), String::from_utf8_lossy(protocol).as_ref()])
METER.with_label_values(&[node_id.to_string().as_str(), String::from_utf8_lossy(protocol).as_ref()])
}

pub fn outbound_response_bytes(node_id: &NodeId, protocol: &ProtocolId) -> Histogram {
static GAUGE: Lazy<HistogramVec> = Lazy::new(|| {
static METER: Lazy<HistogramVec> = Lazy::new(|| {
tari_metrics::register_histogram_vec(
"comms::rpc::server::outbound_response_bytes",
"Avg. response bytes per peer per protocol",
Expand All @@ -86,5 +86,5 @@ pub fn outbound_response_bytes(node_id: &NodeId, protocol: &ProtocolId) -> Histo
.unwrap()
});

GAUGE.with_label_values(&[node_id.to_string().as_str(), String::from_utf8_lossy(protocol).as_ref()])
METER.with_label_values(&[node_id.to_string().as_str(), String::from_utf8_lossy(protocol).as_ref()])
}
2 changes: 1 addition & 1 deletion infrastructure/metrics/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,4 @@ license = "BSD-3-Clause"

[dependencies]
once_cell = "1.8.0"
prometheus = "0.12.0"
prometheus = "0.13.0"
1 change: 1 addition & 0 deletions infrastructure/metrics/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use std::sync::{Arc, RwLock};

pub use prometheus::{
core::Collector,
proto,
Counter,
CounterVec,
Encoder,
Expand Down

0 comments on commit 20f4c11

Please sign in to comment.