diff --git a/comms/core/src/protocol/rpc/client/mod.rs b/comms/core/src/protocol/rpc/client/mod.rs index 257905bf64..e30d3a70b0 100644 --- a/comms/core/src/protocol/rpc/client/mod.rs +++ b/comms/core/src/protocol/rpc/client/mod.rs @@ -613,7 +613,6 @@ where TSubstream: AsyncRead + AsyncWrite + Unpin + Send + StreamId debug!(target: LOG_TARGET, "Sending request: {}", req); - let mut timer = Some(Instant::now()); if reply.is_closed() { event!(Level::WARN, "Client request was cancelled before request was sent"); warn!( @@ -637,12 +636,14 @@ where TSubstream: AsyncRead + AsyncWrite + Unpin + Send + StreamId let latency = metrics::request_response_latency(&self.node_id, &self.protocol_id); let mut metrics_timer = Some(latency.start_timer()); + let timer = Instant::now(); if let Err(err) = self.send_request(req).await { warn!(target: LOG_TARGET, "{}", err); metrics::client_errors(&self.node_id, &self.protocol_id).inc(); let _result = response_tx.send(Err(err.into())).await; return Ok(()); } + let partial_latency = timer.elapsed(); loop { if self.shutdown_signal.is_triggered() { @@ -679,9 +680,9 @@ where TSubstream: AsyncRead + AsyncWrite + Unpin + Send + StreamId // let resp = match self.read_response(request_id).await { let resp = match resp_result { - Ok(resp) => { - if let Some(t) = timer.take() { - let _ = self.last_request_latency_tx.send(Some(t.elapsed())); + Ok((resp, time_to_first_msg)) => { + if let Some(t) = time_to_first_msg { + let _ = self.last_request_latency_tx.send(Some(partial_latency + t)); } event!(Level::TRACE, "Message received"); trace!( @@ -804,7 +805,10 @@ where TSubstream: AsyncRead + AsyncWrite + Unpin + Send + StreamId Ok(()) } - async fn read_response(&mut self, request_id: u16) -> Result { + async fn read_response( + &mut self, + request_id: u16, + ) -> Result<(proto::rpc::RpcResponse, Option), RpcError> { let stream_id = self.stream_id(); let protocol_name = self.protocol_name().to_string(); @@ -822,7 +826,8 @@ where TSubstream: AsyncRead + AsyncWrite + Unpin + Send + StreamId ); metrics::inbound_response_bytes(&self.node_id, &self.protocol_id) .observe(reader.bytes_read() as f64); - break resp; + let time_to_first_msg = reader.time_to_first_msg(); + break (resp, time_to_first_msg); }, Err(RpcError::ResponseIdDidNotMatchRequest { actual, expected }) if actual.wrapping_add(1) == request_id => @@ -888,6 +893,7 @@ struct RpcResponseReader<'a, TSubstream> { config: RpcClientConfig, request_id: u16, bytes_read: usize, + time_to_first_msg: Option, } impl<'a, TSubstream> RpcResponseReader<'a, TSubstream> @@ -899,6 +905,7 @@ where TSubstream: AsyncRead + AsyncWrite + Unpin config, request_id, bytes_read: 0, + time_to_first_msg: None, } } @@ -906,8 +913,14 @@ where TSubstream: AsyncRead + AsyncWrite + Unpin self.bytes_read } + pub fn time_to_first_msg(&self) -> Option { + self.time_to_first_msg + } + pub async fn read_response(&mut self) -> Result { + let timer = Instant::now(); let mut resp = self.next().await?; + self.time_to_first_msg = Some(timer.elapsed()); self.check_response(&resp)?; let mut chunk_count = 1; let mut last_chunk_flags = RpcMessageFlags::from_bits_truncate(u8::try_from(resp.flags).unwrap());