Skip to content
This repository has been archived by the owner on Sep 13, 2022. It is now read-only.

Commit

Permalink
feat(metric): add accumulated network message size count (#449)
Browse files Browse the repository at this point in the history
* feat(metric): add network outbound accumulated message size

* feat(metric): add network received accumulated message size

* fix(network): compilation errors

* fix(network): message size should be multipled by target's length

* fix: compilation error

* fix(metric): NETWORK_MESSAGE_SIZE_COUNT_VEC arguments number
  • Loading branch information
zeroqn authored Sep 10, 2020
1 parent 5e1e4b6 commit eda8f75
Show file tree
Hide file tree
Showing 7 changed files with 69 additions and 19 deletions.
6 changes: 6 additions & 0 deletions common/apm/src/metrics/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,12 @@ lazy_static! {
&["direction", "target", "type", "module", "action"]
)
.expect("network message total");
pub static ref NETWORK_MESSAGE_SIZE_COUNT_VEC: IntCounterVec = register_int_counter_vec!(
"muta_network_message_size",
"Accumulated compressed network message size",
&["direction", "url"]
)
.expect("network message size");
pub static ref NETWORK_RPC_RESULT_COUNT_VEC: IntCounterVec = register_int_counter_vec!(
"muta_network_rpc_result_total",
"Total number of network rpc result",
Expand Down
18 changes: 11 additions & 7 deletions core/network/src/outbound/gossip.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use crate::endpoint::Endpoint;
use crate::error::NetworkError;
use crate::message::{Headers, NetworkMessage};
use crate::protocols::{Recipient, Transmitter, TransmitterMessage};
use crate::traits::Compression;
use crate::traits::{Compression, NetworkContext};
use crate::PeerIdExt;

#[derive(Clone)]
Expand Down Expand Up @@ -46,7 +46,7 @@ impl NetworkGossip {

async fn send_to_sessions(
&self,
_ctx: Context,
ctx: Context,
target_session: TargetSession,
data: Bytes,
priority: Priority,
Expand All @@ -55,14 +55,15 @@ impl NetworkGossip {
recipient: Recipient::Session(target_session),
priority,
data,
ctx,
};

self.transmitter.behaviour.send(msg).await
}

async fn send_to_peers<'a, P: AsRef<[Bytes]> + 'a>(
&self,
_ctx: Context,
ctx: Context,
peer_ids: P,
data: Bytes,
priority: Priority,
Expand All @@ -78,6 +79,7 @@ impl NetworkGossip {
recipient: Recipient::PeerId(peer_ids),
priority,
data,
ctx,
};

self.transmitter.behaviour.send(msg).await
Expand All @@ -88,7 +90,7 @@ impl NetworkGossip {
impl Gossip for NetworkGossip {
async fn broadcast<M>(
&self,
cx: Context,
mut cx: Context,
endpoint: &str,
msg: M,
priority: Priority,
Expand All @@ -97,15 +99,16 @@ impl Gossip for NetworkGossip {
M: MessageCodec,
{
let msg = self.package_message(cx.clone(), endpoint, msg).await?;
self.send_to_sessions(cx, TargetSession::All, msg, priority)
let ctx = cx.set_url(endpoint.to_owned());
self.send_to_sessions(ctx, TargetSession::All, msg, priority)
.await?;
common_apm::metrics::network::on_network_message_sent_all_target(endpoint);
Ok(())
}

async fn multicast<'a, M, P>(
&self,
cx: Context,
mut cx: Context,
endpoint: &str,
peer_ids: P,
msg: M,
Expand All @@ -118,7 +121,8 @@ impl Gossip for NetworkGossip {
let msg = self.package_message(cx.clone(), endpoint, msg).await?;
let multicast_count = peer_ids.as_ref().len();

self.send_to_peers(cx, peer_ids, msg, priority).await?;
let ctx = cx.set_url(endpoint.to_owned());
self.send_to_peers(ctx, peer_ids, msg, priority).await?;
common_apm::metrics::network::on_network_message_sent_multi_target(
endpoint,
multicast_count as i64,
Expand Down
17 changes: 10 additions & 7 deletions core/network/src/outbound/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ impl NetworkRpc {

async fn send(
&self,
_: Context,
ctx: Context,
session_id: SessionId,
data: Bytes,
priority: Priority,
Expand All @@ -43,6 +43,7 @@ impl NetworkRpc {
recipient: Recipient::Session(TargetSession::Single(session_id)),
priority,
data: compressed_data,
ctx,
};

self.transmitter.behaviour.send(msg).await
Expand All @@ -53,7 +54,7 @@ impl NetworkRpc {
impl Rpc for NetworkRpc {
async fn call<M, R>(
&self,
cx: Context,
mut cx: Context,
endpoint: &str,
mut msg: M,
priority: Priority,
Expand Down Expand Up @@ -99,9 +100,10 @@ impl Rpc for NetworkRpc {
log::info!("no trace id found for rpc {}", endpoint.full_url());
}
common_apm::metrics::network::on_network_message_sent(endpoint.full_url());
let net_msg = NetworkMessage::new(endpoint, data, headers).encode()?;

self.send(cx, sid, net_msg, priority).await?;
let ctx = cx.set_url(endpoint.full_url().to_owned());
let net_msg = NetworkMessage::new(endpoint, data, headers).encode()?;
self.send(ctx, sid, net_msg, priority).await?;

let timeout = Delay::new(self.timeout.rpc);
let ret = match future::select(done_rx, timeout).await {
Expand Down Expand Up @@ -134,7 +136,7 @@ impl Rpc for NetworkRpc {

async fn response<M>(
&self,
cx: Context,
mut cx: Context,
endpoint: &str,
ret: ProtocolResult<M>,
priority: Priority,
Expand Down Expand Up @@ -163,9 +165,10 @@ impl Rpc for NetworkRpc {
log::info!("no trace id found for rpc {}", endpoint.full_url());
}
common_apm::metrics::network::on_network_message_sent(endpoint.full_url());
let net_msg = NetworkMessage::new(endpoint, encoded_resp, headers).encode()?;

self.send(cx, sid, net_msg, priority).await?;
let ctx = cx.set_url(endpoint.full_url().to_owned());
let net_msg = NetworkMessage::new(endpoint, encoded_resp, headers).encode()?;
self.send(ctx, sid, net_msg, priority).await?;

Ok(())
}
Expand Down
26 changes: 22 additions & 4 deletions core/network/src/protocols/transmitter/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use crate::error::{ErrorKind, NetworkError};
use crate::event::PeerManagerEvent;
use crate::peer_manager::SharedSessions;
use crate::protocols::core::TRANSMITTER_PROTOCOL_ID;
use crate::traits::SharedSessionBook;
use crate::traits::{NetworkContext, SharedSessionBook};

// TODO: Refactor connection service, decouple protocol and service
// initialization.
Expand Down Expand Up @@ -150,6 +150,8 @@ impl Future for BackgroundSending {
}
}

type MessageContext = protocol::traits::Context;

struct SendingContext<'a> {
conn_ctrl: &'a ConnectionServiceControl,
peers_serv: &'a UnboundedSender<PeerManagerEvent>,
Expand All @@ -162,8 +164,8 @@ impl<'a> SendingContext<'a> {
let TransmitterMessage { priority, data, .. } = msg;

match msg.recipient {
Recipient::Session(target) => self.send_to_sessions(target, data, priority),
Recipient::PeerId(peer_ids) => self.send_to_peers(peer_ids, data, priority),
Recipient::Session(target) => self.send_to_sessions(target, data, priority, msg.ctx),
Recipient::PeerId(peer_ids) => self.send_to_peers(peer_ids, data, priority, msg.ctx),
}
}

Expand All @@ -172,6 +174,7 @@ impl<'a> SendingContext<'a> {
target: TargetSession,
mut data: Bytes,
priority: Priority,
msg_ctx: MessageContext,
) -> Result<(), NetworkError> {
let (target, opt_blocked) = match self.filter_blocked(target) {
(None, None) => unreachable!(),
Expand All @@ -184,6 +187,19 @@ impl<'a> SendingContext<'a> {
(Some(tar), opt_blocked) => (tar, opt_blocked),
};

let url = msg_ctx.url().unwrap_or_else(|_| "");
let data_size = match &target {
TargetSession::Single(_) => data.len(),
TargetSession::Multi(sessions) => data.len().saturating_mul(sessions.len()),
_ => {
log::warn!("filter blocked return target other than single and multi");
data.len()
}
};
common_apm::metrics::network::NETWORK_MESSAGE_SIZE_COUNT_VEC
.with_label_values(&["send", url])
.inc_by(data_size as i64);

let seq = self.data_seq.fetch_add(1, Ordering::SeqCst);
log::debug!("seq {} data size {}", seq, data.len());

Expand Down Expand Up @@ -285,9 +301,11 @@ impl<'a> SendingContext<'a> {
peer_ids: Vec<PeerId>,
data: Bytes,
priority: Priority,
msg_ctx: MessageContext,
) -> Result<(), NetworkError> {
let (connected, unconnected) = self.sessions.peers(peer_ids);
let send_ret = self.send_to_sessions(TargetSession::Multi(connected), data, priority);
let send_ret =
self.send_to_sessions(TargetSession::Multi(connected), data, priority, msg_ctx);
if unconnected.is_empty() {
return send_ret;
}
Expand Down
3 changes: 2 additions & 1 deletion core/network/src/protocols/transmitter/message.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use bytes::{Buf, BufMut};
use protocol::traits::Priority;
use protocol::traits::{Context, Priority};
use protocol::{Bytes, BytesMut};
use tentacle::secio::PeerId;
use tentacle::service::TargetSession;
Expand All @@ -14,6 +14,7 @@ pub struct TransmitterMessage {
pub recipient: Recipient,
pub priority: Priority,
pub data: Bytes,
pub ctx: Context, // For metric
}

pub struct ReceivedMessage {
Expand Down
5 changes: 5 additions & 0 deletions core/network/src/reactor/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@ where
Arc::clone(&self.rpc_map),
self.trust_tx.clone(),
);
let raw_data_size = recv_msg.data.len();

async move {
let network_message = {
Expand All @@ -151,6 +152,10 @@ where
common_apm::metrics::network::on_network_message_received(&network_message.url);

let endpoint = network_message.url.parse::<Endpoint>()?;
common_apm::metrics::network::NETWORK_MESSAGE_SIZE_COUNT_VEC
.with_label_values(&["received", endpoint.full_url()])
.inc_by(raw_data_size as i64);

let reactor = {
let opt_reactor = reactor_map.read().get(&endpoint).cloned();
opt_reactor
Expand Down
13 changes: 13 additions & 0 deletions core/network/src/traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ pub trait NetworkContext: Sized {
fn set_remote_connected_addr(&mut self, addr: ConnectedAddr) -> Self;
fn rpc_id(&self) -> Result<u64, NetworkError>;
fn set_rpc_id(&mut self, rid: u64) -> Self;
fn url(&self) -> Result<&str, ()>;
fn set_url(&mut self, url: String) -> Self;
}

pub trait ListenExchangeManager {
Expand Down Expand Up @@ -102,4 +104,15 @@ impl NetworkContext for Context {
fn set_rpc_id(&mut self, rid: u64) -> Self {
self.with_value::<CtxRpcId>("rpc_id", CtxRpcId(rid))
}

fn url(&self) -> Result<&str, ()> {
self.get::<String>("url")
.map(String::as_str)
.ok_or_else(|| ())
}

#[must_use]
fn set_url(&mut self, url: String) -> Self {
self.with_value::<String>("url", url)
}
}

0 comments on commit eda8f75

Please sign in to comment.