Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(comms/rpc): measures client-side latency to first message received #4817

Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 19 additions & 6 deletions comms/core/src/protocol/rpc/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -613,7 +613,6 @@ where TSubstream: AsyncRead + AsyncWrite + Unpin + Send + StreamId

debug!(target: LOG_TARGET, "Sending request: {}", req);

let mut timer = Some(Instant::now());
if reply.is_closed() {
event!(Level::WARN, "Client request was cancelled before request was sent");
warn!(
Expand All @@ -637,12 +636,14 @@ where TSubstream: AsyncRead + AsyncWrite + Unpin + Send + StreamId

let latency = metrics::request_response_latency(&self.node_id, &self.protocol_id);
let mut metrics_timer = Some(latency.start_timer());
let timer = Instant::now();
if let Err(err) = self.send_request(req).await {
warn!(target: LOG_TARGET, "{}", err);
metrics::client_errors(&self.node_id, &self.protocol_id).inc();
let _result = response_tx.send(Err(err.into())).await;
return Ok(());
}
let partial_latency = timer.elapsed();

loop {
if self.shutdown_signal.is_triggered() {
Expand Down Expand Up @@ -679,9 +680,9 @@ where TSubstream: AsyncRead + AsyncWrite + Unpin + Send + StreamId

// let resp = match self.read_response(request_id).await {
let resp = match resp_result {
Ok(resp) => {
if let Some(t) = timer.take() {
let _ = self.last_request_latency_tx.send(Some(t.elapsed()));
Ok((resp, time_to_first_msg)) => {
if let Some(t) = time_to_first_msg {
let _ = self.last_request_latency_tx.send(Some(partial_latency + t));
}
event!(Level::TRACE, "Message received");
trace!(
Expand Down Expand Up @@ -804,7 +805,10 @@ where TSubstream: AsyncRead + AsyncWrite + Unpin + Send + StreamId
Ok(())
}

async fn read_response(&mut self, request_id: u16) -> Result<proto::rpc::RpcResponse, RpcError> {
async fn read_response(
&mut self,
request_id: u16,
) -> Result<(proto::rpc::RpcResponse, Option<Duration>), RpcError> {
let stream_id = self.stream_id();
let protocol_name = self.protocol_name().to_string();

Expand All @@ -822,7 +826,8 @@ where TSubstream: AsyncRead + AsyncWrite + Unpin + Send + StreamId
);
metrics::inbound_response_bytes(&self.node_id, &self.protocol_id)
.observe(reader.bytes_read() as f64);
break resp;
let time_to_first_msg = reader.time_to_first_msg();
break (resp, time_to_first_msg);
},
Err(RpcError::ResponseIdDidNotMatchRequest { actual, expected })
if actual.wrapping_add(1) == request_id =>
Expand Down Expand Up @@ -888,6 +893,7 @@ struct RpcResponseReader<'a, TSubstream> {
config: RpcClientConfig,
request_id: u16,
bytes_read: usize,
time_to_first_msg: Option<Duration>,
}

impl<'a, TSubstream> RpcResponseReader<'a, TSubstream>
Expand All @@ -899,15 +905,22 @@ where TSubstream: AsyncRead + AsyncWrite + Unpin
config,
request_id,
bytes_read: 0,
time_to_first_msg: None,
}
}

/// Returns the current value of this reader's `bytes_read` counter.
///
/// The counter starts at 0 when the reader is constructed. The code that
/// increments it is not visible here — presumably it accumulates the size of
/// each frame read from the substream (TODO confirm).
pub fn bytes_read(&self) -> usize {
self.bytes_read
}

/// Returns how long the most recent `read_response` call waited for its first
/// message.
///
/// The value is `None` until `read_response` has successfully received a first
/// message: the field is initialised to `None` and is only set after the first
/// `next().await` returns `Ok`, so an error on that first read leaves it as it was.
pub fn time_to_first_msg(&self) -> Option<Duration> {
self.time_to_first_msg
}

pub async fn read_response(&mut self) -> Result<proto::rpc::RpcResponse, RpcError> {
let timer = Instant::now();
let mut resp = self.next().await?;
self.time_to_first_msg = Some(timer.elapsed());
self.check_response(&resp)?;
let mut chunk_count = 1;
let mut last_chunk_flags = RpcMessageFlags::from_bits_truncate(u8::try_from(resp.flags).unwrap());
Expand Down