From eda8f756a5de72601d6dc2bc1ac0abdae065467c Mon Sep 17 00:00:00 2001 From: zeroqn Date: Thu, 10 Sep 2020 14:04:58 +0800 Subject: [PATCH] feat(metric): add accumulated network message size count (#449) * feat(metric): add network outbound accumulated message size * feat(metric): add network received accumulated message size * fix(network): compilation errors * fix(network): message size should be multipled by target's length * fix: compilation error * fix(metric): NETWORK_MESSAGE_SIZE_COUNT_VEC arguments number --- common/apm/src/metrics/network.rs | 6 +++++ core/network/src/outbound/gossip.rs | 18 ++++++++----- core/network/src/outbound/rpc.rs | 17 +++++++----- .../src/protocols/transmitter/behaviour.rs | 26 ++++++++++++++++--- .../src/protocols/transmitter/message.rs | 3 ++- core/network/src/reactor/router.rs | 5 ++++ core/network/src/traits.rs | 13 ++++++++++ 7 files changed, 69 insertions(+), 19 deletions(-) diff --git a/common/apm/src/metrics/network.rs b/common/apm/src/metrics/network.rs index 40d038632..ad33597d9 100644 --- a/common/apm/src/metrics/network.rs +++ b/common/apm/src/metrics/network.rs @@ -47,6 +47,12 @@ lazy_static! { &["direction", "target", "type", "module", "action"] ) .expect("network message total"); + pub static ref NETWORK_MESSAGE_SIZE_COUNT_VEC: IntCounterVec = register_int_counter_vec!( + "muta_network_message_size", + "Accumulated compressed network message size", + &["direction", "url"] + ) + .expect("network message size"); pub static ref NETWORK_RPC_RESULT_COUNT_VEC: IntCounterVec = register_int_counter_vec!( "muta_network_rpc_result_total", "Total number of network rpc result", diff --git a/core/network/src/outbound/gossip.rs b/core/network/src/outbound/gossip.rs index 7e83b4efe..b20ca6e04 100644 --- a/core/network/src/outbound/gossip.rs +++ b/core/network/src/outbound/gossip.rs @@ -8,7 +8,7 @@ use crate::endpoint::Endpoint; use crate::error::NetworkError; use crate::message::{Headers, NetworkMessage}; use crate::protocols::{Recipient, Transmitter, TransmitterMessage}; -use crate::traits::Compression; +use crate::traits::{Compression, NetworkContext}; use crate::PeerIdExt; #[derive(Clone)] @@ -46,7 +46,7 @@ impl NetworkGossip { async fn send_to_sessions( &self, - _ctx: Context, + ctx: Context, target_session: TargetSession, data: Bytes, priority: Priority, @@ -55,6 +55,7 @@ impl NetworkGossip { recipient: Recipient::Session(target_session), priority, data, + ctx, }; self.transmitter.behaviour.send(msg).await @@ -62,7 +63,7 @@ impl NetworkGossip { async fn send_to_peers<'a, P: AsRef<[Bytes]> + 'a>( &self, - _ctx: Context, + ctx: Context, peer_ids: P, data: Bytes, priority: Priority, @@ -78,6 +79,7 @@ impl NetworkGossip { recipient: Recipient::PeerId(peer_ids), priority, data, + ctx, }; self.transmitter.behaviour.send(msg).await @@ -88,7 +90,7 @@ impl NetworkGossip { impl Gossip for NetworkGossip { async fn broadcast( &self, - cx: Context, + mut cx: Context, endpoint: &str, msg: M, priority: Priority, @@ -97,7 +99,8 @@ impl Gossip for NetworkGossip { M: MessageCodec, { let msg = self.package_message(cx.clone(), endpoint, msg).await?; - self.send_to_sessions(cx, TargetSession::All, msg, priority) + let ctx = cx.set_url(endpoint.to_owned()); + self.send_to_sessions(ctx, TargetSession::All, msg, priority) .await?; common_apm::metrics::network::on_network_message_sent_all_target(endpoint); Ok(()) @@ -105,7 +108,7 @@ impl Gossip for NetworkGossip { async fn multicast<'a, M, P>( &self, - cx: Context, + mut cx: Context, endpoint: &str, peer_ids: P, msg: M, @@ -118,7 +121,8 @@ impl Gossip for NetworkGossip { let msg = self.package_message(cx.clone(), endpoint, msg).await?; let multicast_count = peer_ids.as_ref().len(); - self.send_to_peers(cx, peer_ids, msg, priority).await?; + let ctx = cx.set_url(endpoint.to_owned()); + self.send_to_peers(ctx, peer_ids, msg, priority).await?; common_apm::metrics::network::on_network_message_sent_multi_target( endpoint, multicast_count as i64, diff --git a/core/network/src/outbound/rpc.rs b/core/network/src/outbound/rpc.rs index 17099cb8a..4befc8d44 100644 --- a/core/network/src/outbound/rpc.rs +++ b/core/network/src/outbound/rpc.rs @@ -32,7 +32,7 @@ impl NetworkRpc { async fn send( &self, - _: Context, + ctx: Context, session_id: SessionId, data: Bytes, priority: Priority, @@ -43,6 +43,7 @@ impl NetworkRpc { recipient: Recipient::Session(TargetSession::Single(session_id)), priority, data: compressed_data, + ctx, }; self.transmitter.behaviour.send(msg).await @@ -53,7 +54,7 @@ impl NetworkRpc { impl Rpc for NetworkRpc { async fn call( &self, - cx: Context, + mut cx: Context, endpoint: &str, mut msg: M, priority: Priority, @@ -99,9 +100,10 @@ impl Rpc for NetworkRpc { log::info!("no trace id found for rpc {}", endpoint.full_url()); } common_apm::metrics::network::on_network_message_sent(endpoint.full_url()); - let net_msg = NetworkMessage::new(endpoint, data, headers).encode()?; - self.send(cx, sid, net_msg, priority).await?; + let ctx = cx.set_url(endpoint.full_url().to_owned()); + let net_msg = NetworkMessage::new(endpoint, data, headers).encode()?; + self.send(ctx, sid, net_msg, priority).await?; let timeout = Delay::new(self.timeout.rpc); let ret = match future::select(done_rx, timeout).await { @@ -134,7 +136,7 @@ impl Rpc for NetworkRpc { async fn response( &self, - cx: Context, + mut cx: Context, endpoint: &str, ret: ProtocolResult, priority: Priority, @@ -163,9 +165,10 @@ impl Rpc for NetworkRpc { log::info!("no trace id found for rpc {}", endpoint.full_url()); } common_apm::metrics::network::on_network_message_sent(endpoint.full_url()); - let net_msg = NetworkMessage::new(endpoint, encoded_resp, headers).encode()?; - self.send(cx, sid, net_msg, priority).await?; + let ctx = cx.set_url(endpoint.full_url().to_owned()); + let net_msg = NetworkMessage::new(endpoint, encoded_resp, headers).encode()?; + self.send(ctx, sid, net_msg, priority).await?; Ok(()) } diff --git a/core/network/src/protocols/transmitter/behaviour.rs b/core/network/src/protocols/transmitter/behaviour.rs index 0d3a2d7e7..a1500e218 100644 --- a/core/network/src/protocols/transmitter/behaviour.rs +++ b/core/network/src/protocols/transmitter/behaviour.rs @@ -23,7 +23,7 @@ use crate::error::{ErrorKind, NetworkError}; use crate::event::PeerManagerEvent; use crate::peer_manager::SharedSessions; use crate::protocols::core::TRANSMITTER_PROTOCOL_ID; -use crate::traits::SharedSessionBook; +use crate::traits::{NetworkContext, SharedSessionBook}; // TODO: Refactor connection service, decouple protocol and service // initialization. @@ -150,6 +150,8 @@ impl Future for BackgroundSending { } } +type MessageContext = protocol::traits::Context; + struct SendingContext<'a> { conn_ctrl: &'a ConnectionServiceControl, peers_serv: &'a UnboundedSender, @@ -162,8 +164,8 @@ impl<'a> SendingContext<'a> { let TransmitterMessage { priority, data, .. } = msg; match msg.recipient { - Recipient::Session(target) => self.send_to_sessions(target, data, priority), - Recipient::PeerId(peer_ids) => self.send_to_peers(peer_ids, data, priority), + Recipient::Session(target) => self.send_to_sessions(target, data, priority, msg.ctx), + Recipient::PeerId(peer_ids) => self.send_to_peers(peer_ids, data, priority, msg.ctx), } } @@ -172,6 +174,7 @@ impl<'a> SendingContext<'a> { target: TargetSession, mut data: Bytes, priority: Priority, + msg_ctx: MessageContext, ) -> Result<(), NetworkError> { let (target, opt_blocked) = match self.filter_blocked(target) { (None, None) => unreachable!(), @@ -184,6 +187,19 @@ impl<'a> SendingContext<'a> { (Some(tar), opt_blocked) => (tar, opt_blocked), }; + let url = msg_ctx.url().unwrap_or_else(|_| ""); + let data_size = match &target { + TargetSession::Single(_) => data.len(), + TargetSession::Multi(sessions) => data.len().saturating_mul(sessions.len()), + _ => { + log::warn!("filter blocked return target other than single and multi"); + data.len() + } + }; + common_apm::metrics::network::NETWORK_MESSAGE_SIZE_COUNT_VEC + .with_label_values(&["send", url]) + .inc_by(data_size as i64); + let seq = self.data_seq.fetch_add(1, Ordering::SeqCst); log::debug!("seq {} data size {}", seq, data.len()); @@ -285,9 +301,11 @@ impl<'a> SendingContext<'a> { peer_ids: Vec, data: Bytes, priority: Priority, + msg_ctx: MessageContext, ) -> Result<(), NetworkError> { let (connected, unconnected) = self.sessions.peers(peer_ids); - let send_ret = self.send_to_sessions(TargetSession::Multi(connected), data, priority); + let send_ret = + self.send_to_sessions(TargetSession::Multi(connected), data, priority, msg_ctx); if unconnected.is_empty() { return send_ret; } diff --git a/core/network/src/protocols/transmitter/message.rs b/core/network/src/protocols/transmitter/message.rs index 6896f4587..17666fec8 100644 --- a/core/network/src/protocols/transmitter/message.rs +++ b/core/network/src/protocols/transmitter/message.rs @@ -1,5 +1,5 @@ use bytes::{Buf, BufMut}; -use protocol::traits::Priority; +use protocol::traits::{Context, Priority}; use protocol::{Bytes, BytesMut}; use tentacle::secio::PeerId; use tentacle::service::TargetSession; @@ -14,6 +14,7 @@ pub struct TransmitterMessage { pub recipient: Recipient, pub priority: Priority, pub data: Bytes, + pub ctx: Context, // For metric } pub struct ReceivedMessage { diff --git a/core/network/src/reactor/router.rs b/core/network/src/reactor/router.rs index 02d81daef..b132f3abd 100644 --- a/core/network/src/reactor/router.rs +++ b/core/network/src/reactor/router.rs @@ -142,6 +142,7 @@ where Arc::clone(&self.rpc_map), self.trust_tx.clone(), ); + let raw_data_size = recv_msg.data.len(); async move { let network_message = { @@ -151,6 +152,10 @@ where common_apm::metrics::network::on_network_message_received(&network_message.url); let endpoint = network_message.url.parse::()?; + common_apm::metrics::network::NETWORK_MESSAGE_SIZE_COUNT_VEC + .with_label_values(&["received", endpoint.full_url()]) + .inc_by(raw_data_size as i64); + let reactor = { let opt_reactor = reactor_map.read().get(&endpoint).cloned(); opt_reactor diff --git a/core/network/src/traits.rs b/core/network/src/traits.rs index b69560885..ef47f9af3 100644 --- a/core/network/src/traits.rs +++ b/core/network/src/traits.rs @@ -31,6 +31,8 @@ pub trait NetworkContext: Sized { fn set_remote_connected_addr(&mut self, addr: ConnectedAddr) -> Self; fn rpc_id(&self) -> Result; fn set_rpc_id(&mut self, rid: u64) -> Self; + fn url(&self) -> Result<&str, ()>; + fn set_url(&mut self, url: String) -> Self; } pub trait ListenExchangeManager { @@ -102,4 +104,15 @@ impl NetworkContext for Context { fn set_rpc_id(&mut self, rid: u64) -> Self { self.with_value::("rpc_id", CtxRpcId(rid)) } + + fn url(&self) -> Result<&str, ()> { + self.get::("url") + .map(String::as_str) + .ok_or_else(|| ()) + } + + #[must_use] + fn set_url(&mut self, url: String) -> Self { + self.with_value::("url", url) + } }