Skip to content
This repository has been archived by the owner on Sep 13, 2022. It is now read-only.

feat(metric): add accumulated network message size count #449

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions common/apm/src/metrics/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,12 @@ lazy_static! {
&["direction", "target", "type", "module", "action"]
)
.expect("network message total");
pub static ref NETWORK_MESSAGE_SIZE_COUNT_VEC: IntCounterVec = register_int_counter_vec!(
"muta_network_message_size",
"Accumulated compressed network message size",
&["direction", "url"]
)
.expect("network message size");
pub static ref NETWORK_RPC_RESULT_COUNT_VEC: IntCounterVec = register_int_counter_vec!(
"muta_network_rpc_result_total",
"Total number of network rpc result",
Expand Down
18 changes: 11 additions & 7 deletions core/network/src/outbound/gossip.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use crate::endpoint::Endpoint;
use crate::error::NetworkError;
use crate::message::{Headers, NetworkMessage};
use crate::protocols::{Recipient, Transmitter, TransmitterMessage};
use crate::traits::Compression;
use crate::traits::{Compression, NetworkContext};
use crate::PeerIdExt;

#[derive(Clone)]
Expand Down Expand Up @@ -46,7 +46,7 @@ impl NetworkGossip {

async fn send_to_sessions(
&self,
_ctx: Context,
ctx: Context,
target_session: TargetSession,
data: Bytes,
priority: Priority,
Expand All @@ -55,14 +55,15 @@ impl NetworkGossip {
recipient: Recipient::Session(target_session),
priority,
data,
ctx,
};

self.transmitter.behaviour.send(msg).await
}

async fn send_to_peers<'a, P: AsRef<[Bytes]> + 'a>(
&self,
_ctx: Context,
ctx: Context,
peer_ids: P,
data: Bytes,
priority: Priority,
Expand All @@ -78,6 +79,7 @@ impl NetworkGossip {
recipient: Recipient::PeerId(peer_ids),
priority,
data,
ctx,
};

self.transmitter.behaviour.send(msg).await
Expand All @@ -88,7 +90,7 @@ impl NetworkGossip {
impl Gossip for NetworkGossip {
async fn broadcast<M>(
&self,
cx: Context,
mut cx: Context,
endpoint: &str,
msg: M,
priority: Priority,
Expand All @@ -97,15 +99,16 @@ impl Gossip for NetworkGossip {
M: MessageCodec,
{
let msg = self.package_message(cx.clone(), endpoint, msg).await?;
self.send_to_sessions(cx, TargetSession::All, msg, priority)
let ctx = cx.set_url(endpoint.to_owned());
self.send_to_sessions(ctx, TargetSession::All, msg, priority)
.await?;
common_apm::metrics::network::on_network_message_sent_all_target(endpoint);
Ok(())
}

async fn multicast<'a, M, P>(
&self,
cx: Context,
mut cx: Context,
endpoint: &str,
peer_ids: P,
msg: M,
Expand All @@ -118,7 +121,8 @@ impl Gossip for NetworkGossip {
let msg = self.package_message(cx.clone(), endpoint, msg).await?;
let multicast_count = peer_ids.as_ref().len();

self.send_to_peers(cx, peer_ids, msg, priority).await?;
let ctx = cx.set_url(endpoint.to_owned());
self.send_to_peers(ctx, peer_ids, msg, priority).await?;
common_apm::metrics::network::on_network_message_sent_multi_target(
endpoint,
multicast_count as i64,
Expand Down
17 changes: 10 additions & 7 deletions core/network/src/outbound/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ impl NetworkRpc {

async fn send(
&self,
_: Context,
ctx: Context,
session_id: SessionId,
data: Bytes,
priority: Priority,
Expand All @@ -43,6 +43,7 @@ impl NetworkRpc {
recipient: Recipient::Session(TargetSession::Single(session_id)),
priority,
data: compressed_data,
ctx,
};

self.transmitter.behaviour.send(msg).await
Expand All @@ -53,7 +54,7 @@ impl NetworkRpc {
impl Rpc for NetworkRpc {
async fn call<M, R>(
&self,
cx: Context,
mut cx: Context,
endpoint: &str,
mut msg: M,
priority: Priority,
Expand Down Expand Up @@ -99,9 +100,10 @@ impl Rpc for NetworkRpc {
log::info!("no trace id found for rpc {}", endpoint.full_url());
}
common_apm::metrics::network::on_network_message_sent(endpoint.full_url());
let net_msg = NetworkMessage::new(endpoint, data, headers).encode()?;

self.send(cx, sid, net_msg, priority).await?;
let ctx = cx.set_url(endpoint.full_url().to_owned());
let net_msg = NetworkMessage::new(endpoint, data, headers).encode()?;
self.send(ctx, sid, net_msg, priority).await?;

let timeout = Delay::new(self.timeout.rpc);
let ret = match future::select(done_rx, timeout).await {
Expand Down Expand Up @@ -134,7 +136,7 @@ impl Rpc for NetworkRpc {

async fn response<M>(
&self,
cx: Context,
mut cx: Context,
endpoint: &str,
ret: ProtocolResult<M>,
priority: Priority,
Expand Down Expand Up @@ -163,9 +165,10 @@ impl Rpc for NetworkRpc {
log::info!("no trace id found for rpc {}", endpoint.full_url());
}
common_apm::metrics::network::on_network_message_sent(endpoint.full_url());
let net_msg = NetworkMessage::new(endpoint, encoded_resp, headers).encode()?;

self.send(cx, sid, net_msg, priority).await?;
let ctx = cx.set_url(endpoint.full_url().to_owned());
let net_msg = NetworkMessage::new(endpoint, encoded_resp, headers).encode()?;
self.send(ctx, sid, net_msg, priority).await?;

Ok(())
}
Expand Down
26 changes: 22 additions & 4 deletions core/network/src/protocols/transmitter/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use crate::error::{ErrorKind, NetworkError};
use crate::event::PeerManagerEvent;
use crate::peer_manager::SharedSessions;
use crate::protocols::core::TRANSMITTER_PROTOCOL_ID;
use crate::traits::SharedSessionBook;
use crate::traits::{NetworkContext, SharedSessionBook};

// TODO: Refactor connection service, decouple protocol and service
// initialization.
Expand Down Expand Up @@ -150,6 +150,8 @@ impl Future for BackgroundSending {
}
}

type MessageContext = protocol::traits::Context;

struct SendingContext<'a> {
conn_ctrl: &'a ConnectionServiceControl,
peers_serv: &'a UnboundedSender<PeerManagerEvent>,
Expand All @@ -162,8 +164,8 @@ impl<'a> SendingContext<'a> {
let TransmitterMessage { priority, data, .. } = msg;

match msg.recipient {
Recipient::Session(target) => self.send_to_sessions(target, data, priority),
Recipient::PeerId(peer_ids) => self.send_to_peers(peer_ids, data, priority),
Recipient::Session(target) => self.send_to_sessions(target, data, priority, msg.ctx),
Recipient::PeerId(peer_ids) => self.send_to_peers(peer_ids, data, priority, msg.ctx),
}
}

Expand All @@ -172,6 +174,7 @@ impl<'a> SendingContext<'a> {
target: TargetSession,
mut data: Bytes,
priority: Priority,
msg_ctx: MessageContext,
) -> Result<(), NetworkError> {
let (target, opt_blocked) = match self.filter_blocked(target) {
(None, None) => unreachable!(),
Expand All @@ -184,6 +187,19 @@ impl<'a> SendingContext<'a> {
(Some(tar), opt_blocked) => (tar, opt_blocked),
};

let url = msg_ctx.url().unwrap_or_else(|_| "");
let data_size = match &target {
TargetSession::Single(_) => data.len(),
TargetSession::Multi(sessions) => data.len().saturating_mul(sessions.len()),
_ => {
log::warn!("filter blocked return target other than single and multi");
data.len()
}
};
common_apm::metrics::network::NETWORK_MESSAGE_SIZE_COUNT_VEC
.with_label_values(&["send", url])
.inc_by(data_size as i64);

let seq = self.data_seq.fetch_add(1, Ordering::SeqCst);
log::debug!("seq {} data size {}", seq, data.len());

Expand Down Expand Up @@ -285,9 +301,11 @@ impl<'a> SendingContext<'a> {
peer_ids: Vec<PeerId>,
data: Bytes,
priority: Priority,
msg_ctx: MessageContext,
) -> Result<(), NetworkError> {
let (connected, unconnected) = self.sessions.peers(peer_ids);
let send_ret = self.send_to_sessions(TargetSession::Multi(connected), data, priority);
let send_ret =
self.send_to_sessions(TargetSession::Multi(connected), data, priority, msg_ctx);
if unconnected.is_empty() {
return send_ret;
}
Expand Down
3 changes: 2 additions & 1 deletion core/network/src/protocols/transmitter/message.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use bytes::{Buf, BufMut};
use protocol::traits::Priority;
use protocol::traits::{Context, Priority};
use protocol::{Bytes, BytesMut};
use tentacle::secio::PeerId;
use tentacle::service::TargetSession;
Expand All @@ -14,6 +14,7 @@ pub struct TransmitterMessage {
pub recipient: Recipient,
pub priority: Priority,
pub data: Bytes,
pub ctx: Context, // For metric
}

pub struct ReceivedMessage {
Expand Down
5 changes: 5 additions & 0 deletions core/network/src/reactor/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@ where
Arc::clone(&self.rpc_map),
self.trust_tx.clone(),
);
let raw_data_size = recv_msg.data.len();

async move {
let network_message = {
Expand All @@ -151,6 +152,10 @@ where
common_apm::metrics::network::on_network_message_received(&network_message.url);

let endpoint = network_message.url.parse::<Endpoint>()?;
common_apm::metrics::network::NETWORK_MESSAGE_SIZE_COUNT_VEC
.with_label_values(&["received", endpoint.full_url()])
.inc_by(raw_data_size as i64);

let reactor = {
let opt_reactor = reactor_map.read().get(&endpoint).cloned();
opt_reactor
Expand Down
13 changes: 13 additions & 0 deletions core/network/src/traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ pub trait NetworkContext: Sized {
fn set_remote_connected_addr(&mut self, addr: ConnectedAddr) -> Self;
fn rpc_id(&self) -> Result<u64, NetworkError>;
fn set_rpc_id(&mut self, rid: u64) -> Self;
fn url(&self) -> Result<&str, ()>;
fn set_url(&mut self, url: String) -> Self;
}

pub trait ListenExchangeManager {
Expand Down Expand Up @@ -102,4 +104,15 @@ impl NetworkContext for Context {
fn set_rpc_id(&mut self, rid: u64) -> Self {
self.with_value::<CtxRpcId>("rpc_id", CtxRpcId(rid))
}

fn url(&self) -> Result<&str, ()> {
self.get::<String>("url")
.map(String::as_str)
.ok_or_else(|| ())
}

#[must_use]
fn set_url(&mut self, url: String) -> Self {
self.with_value::<String>("url", url)
}
}