From 17e44aa44e6f88c4c90d30a7ad83728a90d71fcf Mon Sep 17 00:00:00 2001 From: Stanimal Date: Thu, 19 Aug 2021 14:02:48 +0400 Subject: [PATCH] fix: bug in wallet base node peer switching - Connectivity retries connecting indefinitely, however previously if this continuously fails and the user updates their peer, the new peer will not be read until the previous peer was connected to. This PR fixes this. - Add protocol information to comms RPC logs - Cleanup peer state, making the wallet connectivity service the source of truth for the base node peer - Adds an `Ack` flag to RPC protocol, this is not currently used but could be implemented in the client side in future if required without breaking the network (server supports it, client support may or may not be needed). --- applications/daily_tests/helpers.js | 2 +- base_layer/core/src/base_node/rpc/service.rs | 2 +- .../state_machine_service/states/listening.rs | 5 +- .../mock_base_node_service.rs | 6 +- .../wallet/src/base_node_service/monitor.rs | 32 +++++----- .../wallet/src/base_node_service/service.rs | 18 +----- .../wallet/src/connectivity_service/handle.rs | 21 ++++-- .../src/connectivity_service/service.rs | 28 ++++---- .../wallet/src/connectivity_service/test.rs | 13 ++-- comms/rpc_macros/src/generator.rs | 3 +- .../src/connection_manager/peer_connection.rs | 8 +-- comms/src/protocol/rpc/client.rs | 64 +++++++++++++++---- comms/src/protocol/rpc/message.rs | 7 ++ comms/src/protocol/rpc/server/mod.rs | 42 ++++++++++-- .../src/protocol/rpc/test/greeting_service.rs | 4 +- comms/tests/rpc_stress.rs | 26 +++----- 16 files changed, 174 insertions(+), 107 deletions(-) diff --git a/applications/daily_tests/helpers.js b/applications/daily_tests/helpers.js index 9cdd81ba3f..5ebe183f5f 100644 --- a/applications/daily_tests/helpers.js +++ b/applications/daily_tests/helpers.js @@ -39,7 +39,7 @@ const yargs = () => require("yargs")(hideBin(process.argv)); function sendWebhookNotification(channel, message, webhookUrlOverride = 
null) { const hook = webhookUrlOverride || getWebhookUrlFromEnv(); if (!hook) { - throw new Error("WEBHOOK_URL not specified"); + return; } const data = JSON.stringify({ channel, text: message }); const args = ` -i -X POST -H 'Content-Type: application/json' -d '${data}' ${hook}`; diff --git a/base_layer/core/src/base_node/rpc/service.rs b/base_layer/core/src/base_node/rpc/service.rs index d82be66a8f..c50600ea9c 100644 --- a/base_layer/core/src/base_node/rpc/service.rs +++ b/base_layer/core/src/base_node/rpc/service.rs @@ -312,7 +312,7 @@ impl BaseNodeWalletService for BaseNodeWalletRpc async fn get_tip_info(&self, _request: Request<()>) -> Result, RpcStatus> { let state_machine = self.state_machine(); let status_watch = state_machine.get_status_info_watch(); - let is_synced = match (*status_watch.borrow()).state_info { + let is_synced = match status_watch.borrow().state_info { StateInfo::Listening(li) => li.is_synced(), _ => false, }; diff --git a/base_layer/core/src/base_node/state_machine_service/states/listening.rs b/base_layer/core/src/base_node/state_machine_service/states/listening.rs index e484dd5f3c..0ea8157568 100644 --- a/base_layer/core/src/base_node/state_machine_service/states/listening.rs +++ b/base_layer/core/src/base_node/state_machine_service/states/listening.rs @@ -152,8 +152,9 @@ impl Listening { ); if !self.is_synced { + debug!(target: LOG_TARGET, "Initial sync achieved"); self.is_synced = true; - shared.set_state_info(StateInfo::Listening(ListeningInfo::new(self.is_synced))); + shared.set_state_info(StateInfo::Listening(ListeningInfo::new(true))); } continue; } @@ -222,7 +223,7 @@ impl Listening { impl From for Listening { fn from(_: Waiting) -> Self { - Default::default() + Self { is_synced: false } } } diff --git a/base_layer/wallet/src/base_node_service/mock_base_node_service.rs b/base_layer/wallet/src/base_node_service/mock_base_node_service.rs index 99b114c3c9..1bc57ed9d2 100644 --- 
a/base_layer/wallet/src/base_node_service/mock_base_node_service.rs +++ b/base_layer/wallet/src/base_node_service/mock_base_node_service.rs @@ -94,7 +94,6 @@ impl MockBaseNodeService { updated: None, latency: None, online, - base_node_peer: self.state.base_node_peer.clone(), } } @@ -106,12 +105,11 @@ impl MockBaseNodeService { updated: None, latency: None, online: OnlineStatus::Online, - base_node_peer: None, } } fn set_base_node_peer(&mut self, peer: Peer) { - self.state.base_node_peer = Some(peer); + self.base_node_peer = Some(peer); } /// This handler is called when requests arrive from the various streams @@ -125,7 +123,7 @@ impl MockBaseNodeService { Ok(BaseNodeServiceResponse::BaseNodePeerSet) }, BaseNodeServiceRequest::GetBaseNodePeer => { - let peer = self.state.base_node_peer.clone(); + let peer = self.base_node_peer.clone(); Ok(BaseNodeServiceResponse::BaseNodePeer(peer.map(Box::new))) }, BaseNodeServiceRequest::GetChainMetadata => Ok(BaseNodeServiceResponse::ChainMetadata( diff --git a/base_layer/wallet/src/base_node_service/monitor.rs b/base_layer/wallet/src/base_node_service/monitor.rs index 5153557ed4..a1005bb6c7 100644 --- a/base_layer/wallet/src/base_node_service/monitor.rs +++ b/base_layer/wallet/src/base_node_service/monitor.rs @@ -101,10 +101,8 @@ impl BaseNodeMonitor { loop { use OnlineStatus::*; match watcher.recv().await.unwrap_or(Offline) { - Online => match self.wallet_connectivity.get_current_base_node() { - Some(peer) => { - return peer; - }, + Online => match self.wallet_connectivity.get_current_base_node_id() { + Some(node_id) => return node_id, _ => continue, }, Connecting => { @@ -126,15 +124,8 @@ impl BaseNodeMonitor { .await .ok_or(BaseNodeMonitorError::NodeShuttingDown)?; let latency = client.get_last_request_latency().await?; - debug!( - target: LOG_TARGET, - "Base node {} latency: {} ms", - peer_node_id, - latency.unwrap_or_default().as_millis() - ); let tip_info = client.get_tip_info().await?; - let is_synced = 
tip_info.is_synced; let chain_metadata = tip_info .metadata @@ -143,15 +134,24 @@ impl BaseNodeMonitor { ChainMetadata::try_from(metadata).map_err(BaseNodeMonitorError::InvalidBaseNodeResponse) })?; + let is_synced = tip_info.is_synced; + debug!( + target: LOG_TARGET, + "Base node {} Tip: {} ({}) Latency: {} ms", + peer_node_id, + chain_metadata.height_of_longest_chain(), + if is_synced { "Synced" } else { "Syncing..." }, + latency.unwrap_or_default().as_millis() + ); + self.db.set_chain_metadata(chain_metadata.clone()).await?; - self.map_state(move |state| BaseNodeState { + self.map_state(move |_| BaseNodeState { chain_metadata: Some(chain_metadata), is_synced: Some(is_synced), updated: Some(Utc::now().naive_utc()), latency, online: OnlineStatus::Online, - base_node_peer: state.base_node_peer.clone(), }) .await; @@ -164,25 +164,23 @@ impl BaseNodeMonitor { } async fn set_connecting(&self) { - self.map_state(|state| BaseNodeState { + self.map_state(|_| BaseNodeState { chain_metadata: None, is_synced: None, updated: Some(Utc::now().naive_utc()), latency: None, online: OnlineStatus::Connecting, - base_node_peer: state.base_node_peer.clone(), }) .await; } async fn set_offline(&self) { - self.map_state(|state| BaseNodeState { + self.map_state(|_| BaseNodeState { chain_metadata: None, is_synced: None, updated: Some(Utc::now().naive_utc()), latency: None, online: OnlineStatus::Offline, - base_node_peer: state.base_node_peer.clone(), }) .await; } diff --git a/base_layer/wallet/src/base_node_service/service.rs b/base_layer/wallet/src/base_node_service/service.rs index 9aba348223..3da987c8b1 100644 --- a/base_layer/wallet/src/base_node_service/service.rs +++ b/base_layer/wallet/src/base_node_service/service.rs @@ -50,7 +50,7 @@ pub struct BaseNodeState { pub updated: Option, pub latency: Option, pub online: OnlineStatus, - pub base_node_peer: Option, + // pub base_node_peer: Option, } impl Default for BaseNodeState { @@ -61,7 +61,6 @@ impl Default for BaseNodeState { 
updated: None, latency: None, online: OnlineStatus::Connecting, - base_node_peer: None, } } } @@ -158,18 +157,7 @@ where T: WalletBackend + 'static } async fn set_base_node_peer(&mut self, peer: Peer) -> Result<(), BaseNodeServiceError> { - let new_state = BaseNodeState { - base_node_peer: Some(peer.clone()), - ..Default::default() - }; - - { - let mut lock = self.state.write().await; - *lock = new_state.clone(); - } - self.wallet_connectivity.set_base_node(peer.node_id.clone()).await?; - - self.publish_event(BaseNodeEvent::BaseNodeStateChanged(new_state)); + self.wallet_connectivity.set_base_node(peer.clone()).await?; self.publish_event(BaseNodeEvent::BaseNodePeerSet(Box::new(peer))); Ok(()) } @@ -189,7 +177,7 @@ where T: WalletBackend + 'static Ok(BaseNodeServiceResponse::BaseNodePeerSet) }, BaseNodeServiceRequest::GetBaseNodePeer => { - let peer = self.get_state().await.base_node_peer.map(Box::new); + let peer = self.wallet_connectivity.get_current_base_node_peer().map(Box::new); Ok(BaseNodeServiceResponse::BaseNodePeer(peer)) }, BaseNodeServiceRequest::GetChainMetadata => match self.get_state().await.chain_metadata.clone() { diff --git a/base_layer/wallet/src/connectivity_service/handle.rs b/base_layer/wallet/src/connectivity_service/handle.rs index d596c543db..a43fea5333 100644 --- a/base_layer/wallet/src/connectivity_service/handle.rs +++ b/base_layer/wallet/src/connectivity_service/handle.rs @@ -26,27 +26,30 @@ use futures::{ channel::{mpsc, oneshot}, SinkExt, }; -use tari_comms::{peer_manager::NodeId, protocol::rpc::RpcClientLease}; +use tari_comms::{ + peer_manager::{NodeId, Peer}, + protocol::rpc::RpcClientLease, +}; use tari_core::base_node::{rpc::BaseNodeWalletRpcClient, sync::rpc::BaseNodeSyncRpcClient}; use tokio::sync::watch; pub enum WalletConnectivityRequest { ObtainBaseNodeWalletRpcClient(oneshot::Sender>), ObtainBaseNodeSyncRpcClient(oneshot::Sender>), - SetBaseNode(NodeId), + SetBaseNode(Box), } #[derive(Clone)] pub struct 
WalletConnectivityHandle { sender: mpsc::Sender, - base_node_watch_rx: watch::Receiver>, + base_node_watch_rx: watch::Receiver>, online_status_rx: watch::Receiver, } impl WalletConnectivityHandle { pub(super) fn new( sender: mpsc::Sender, - base_node_watch_rx: watch::Receiver>, + base_node_watch_rx: watch::Receiver>, online_status_rx: watch::Receiver, ) -> Self { Self { @@ -56,9 +59,9 @@ impl WalletConnectivityHandle { } } - pub async fn set_base_node(&mut self, base_node_peer: NodeId) -> Result<(), WalletConnectivityError> { + pub async fn set_base_node(&mut self, base_node_peer: Peer) -> Result<(), WalletConnectivityError> { self.sender - .send(WalletConnectivityRequest::SetBaseNode(base_node_peer)) + .send(WalletConnectivityRequest::SetBaseNode(Box::new(base_node_peer))) .await?; Ok(()) } @@ -110,7 +113,11 @@ impl WalletConnectivityHandle { self.online_status_rx.clone() } - pub fn get_current_base_node(&self) -> Option { + pub fn get_current_base_node_peer(&self) -> Option { self.base_node_watch_rx.borrow().clone() } + + pub fn get_current_base_node_id(&self) -> Option { + self.base_node_watch_rx.borrow().as_ref().map(|p| p.node_id.clone()) + } } diff --git a/base_layer/wallet/src/connectivity_service/service.rs b/base_layer/wallet/src/connectivity_service/service.rs index f71c1ae598..373dd069a1 100644 --- a/base_layer/wallet/src/connectivity_service/service.rs +++ b/base_layer/wallet/src/connectivity_service/service.rs @@ -33,7 +33,7 @@ use futures::{ use log::*; use tari_comms::{ connectivity::ConnectivityRequester, - peer_manager::NodeId, + peer_manager::{NodeId, Peer}, protocol::rpc::{RpcClientLease, RpcClientPool}, }; use tari_core::base_node::{rpc::BaseNodeWalletRpcClient, sync::rpc::BaseNodeSyncRpcClient}; @@ -53,7 +53,7 @@ pub struct WalletConnectivityService { config: BaseNodeServiceConfig, request_stream: Fuse>, connectivity: ConnectivityRequester, - base_node_watch: Watch>, + base_node_watch: Watch>, pools: Option, online_status_watch: Watch, 
pending_requests: Vec, @@ -68,7 +68,7 @@ impl WalletConnectivityService { pub(super) fn new( config: BaseNodeServiceConfig, request_stream: mpsc::Receiver, - base_node_watch: Watch>, + base_node_watch: Watch>, online_status_watch: Watch, connectivity: ConnectivityRequester, ) -> Self { @@ -91,10 +91,10 @@ impl WalletConnectivityService { req = self.request_stream.select_next_some() => { self.handle_request(req).await; }, - peer = base_node_watch_rx.select_next_some() => { - if let Some(peer) = peer { + maybe_peer = base_node_watch_rx.select_next_some() => { + if maybe_peer.is_some() { // This will block the rest until the connection is established. This is what we want. - self.setup_base_node_connection(peer).await; + self.setup_base_node_connection().await; } } } @@ -112,7 +112,7 @@ impl WalletConnectivityService { }, SetBaseNode(peer) => { - self.set_base_node_peer(peer); + self.set_base_node_peer(*peer); }, } } @@ -197,25 +197,29 @@ impl WalletConnectivityService { self.set_base_node_peer(peer); } - fn set_base_node_peer(&mut self, peer: NodeId) { + fn set_base_node_peer(&mut self, peer: Peer) { self.pools = None; self.base_node_watch.broadcast(Some(peer)); } - async fn setup_base_node_connection(&mut self, peer: NodeId) { + async fn setup_base_node_connection(&mut self) { self.pools = None; loop { + let node_id = match self.base_node_watch.borrow().as_ref() { + Some(p) => p.node_id.clone(), + None => return, + }; debug!( target: LOG_TARGET, - "Attempting to connect to base node peer {}...", peer + "Attempting to connect to base node peer {}...", node_id ); self.set_online_status(OnlineStatus::Connecting); - match self.try_setup_rpc_pool(peer.clone()).await { + match self.try_setup_rpc_pool(node_id.clone()).await { Ok(_) => { self.set_online_status(OnlineStatus::Online); debug!( target: LOG_TARGET, - "Wallet is ONLINE and connected to base node {}", peer + "Wallet is ONLINE and connected to base node {}", node_id ); break; }, diff --git 
a/base_layer/wallet/src/connectivity_service/test.rs b/base_layer/wallet/src/connectivity_service/test.rs index 35ca5ee95d..a2092453e9 100644 --- a/base_layer/wallet/src/connectivity_service/test.rs +++ b/base_layer/wallet/src/connectivity_service/test.rs @@ -53,6 +53,7 @@ async fn setup() -> ( let handle = WalletConnectivityHandle::new(tx, base_node_watch.get_receiver(), online_status_watch.get_receiver()); let (connectivity, mock) = create_connectivity_mock(); let mock_state = mock.spawn(); + // let peer_manager = create_peer_manager(tempdir().unwrap()); let service = WalletConnectivityService::new( Default::default(), rx, @@ -78,7 +79,7 @@ async fn it_dials_peer_when_base_node_is_set() { // Set the mock to defer returning a result for the peer connection mock_state.set_pending_connection(base_node_peer.node_id()).await; // Initiate a connection to the base node - handle.set_base_node(base_node_peer.node_id().clone()).await.unwrap(); + handle.set_base_node(base_node_peer.to_peer()).await.unwrap(); // Wait for connection request mock_state.await_call_count(1).await; @@ -101,7 +102,7 @@ async fn it_resolves_many_pending_rpc_session_requests() { mock_state.set_pending_connection(base_node_peer.node_id()).await; // Initiate a connection to the base node - handle.set_base_node(base_node_peer.node_id().clone()).await.unwrap(); + handle.set_base_node(base_node_peer.to_peer()).await.unwrap(); let pending_requests = iter::repeat_with(|| { let mut handle = handle.clone(); @@ -133,7 +134,7 @@ async fn it_changes_to_a_new_base_node() { mock_state.add_active_connection(conn2).await; // Initiate a connection to the base node - handle.set_base_node(base_node_peer1.node_id().clone()).await.unwrap(); + handle.set_base_node(base_node_peer1.to_peer()).await.unwrap(); mock_state.await_call_count(2).await; mock_state.expect_dial_peer(base_node_peer1.node_id()).await; @@ -144,7 +145,7 @@ async fn it_changes_to_a_new_base_node() { assert!(rpc_client.is_connected()); // Initiate a 
connection to the base node - handle.set_base_node(base_node_peer2.node_id().clone()).await.unwrap(); + handle.set_base_node(base_node_peer2.to_peer()).await.unwrap(); mock_state.await_call_count(2).await; mock_state.expect_dial_peer(base_node_peer2.node_id()).await; @@ -164,7 +165,7 @@ async fn it_gracefully_handles_connect_fail_reconnect() { mock_state.set_pending_connection(base_node_peer.node_id()).await; // Initiate a connection to the base node - handle.set_base_node(base_node_peer.node_id().clone()).await.unwrap(); + handle.set_base_node(base_node_peer.to_peer()).await.unwrap(); // Now a connection will given to the service mock_state.add_active_connection(conn.clone()).await; @@ -204,7 +205,7 @@ async fn it_gracefully_handles_multiple_connection_failures() { let conn = mock_server.create_mockimpl_connection(base_node_peer.to_peer()).await; // Initiate a connection to the base node - handle.set_base_node(base_node_peer.node_id().clone()).await.unwrap(); + handle.set_base_node(base_node_peer.to_peer()).await.unwrap(); // Now a connection will given to the service mock_state.add_active_connection(conn.clone()).await; diff --git a/comms/rpc_macros/src/generator.rs b/comms/rpc_macros/src/generator.rs index b02b702edb..bb8c05a53b 100644 --- a/comms/rpc_macros/src/generator.rs +++ b/comms/rpc_macros/src/generator.rs @@ -196,7 +196,8 @@ impl RpcCodeGenerator { let client_struct_body = quote! 
{ pub async fn connect(framed: #dep_mod::CanonicalFraming) -> Result where TSubstream: #dep_mod::AsyncRead + #dep_mod::AsyncWrite + Unpin + Send + 'static { - let inner = #dep_mod::RpcClient::connect(Default::default(), framed).await?; + use #dep_mod::NamedProtocolService; + let inner = #dep_mod::RpcClient::connect(Default::default(), framed, Self::PROTOCOL_NAME.into()).await?; Ok(Self { inner }) } diff --git a/comms/src/connection_manager/peer_connection.rs b/comms/src/connection_manager/peer_connection.rs index 3e15b50bb0..260befaeee 100644 --- a/comms/src/connection_manager/peer_connection.rs +++ b/comms/src/connection_manager/peer_connection.rs @@ -219,15 +219,15 @@ impl PeerConnection { #[cfg(feature = "rpc")] pub async fn connect_rpc_using_builder(&mut self, builder: RpcClientBuilder) -> Result where T: From + NamedProtocolService { - let protocol = T::PROTOCOL_NAME; + let protocol = ProtocolId::from_static(T::PROTOCOL_NAME); debug!( target: LOG_TARGET, "Attempting to establish RPC protocol `{}` to peer `{}`", - String::from_utf8_lossy(protocol), + String::from_utf8_lossy(&protocol), self.peer_node_id ); - let framed = self.open_framed_substream(&protocol.into(), RPC_MAX_FRAME_SIZE).await?; - builder.connect(framed).await + let framed = self.open_framed_substream(&protocol, RPC_MAX_FRAME_SIZE).await?; + builder.with_protocol_id(protocol).connect(framed).await } /// Creates a new RpcClientPool that can be shared between tasks. 
The client pool will lazily establish up to diff --git a/comms/src/protocol/rpc/client.rs b/comms/src/protocol/rpc/client.rs index 5ed6a040d8..737255bbd5 100644 --- a/comms/src/protocol/rpc/client.rs +++ b/comms/src/protocol/rpc/client.rs @@ -25,14 +25,17 @@ use crate::{ framing::CanonicalFraming, message::MessageExt, proto, - protocol::rpc::{ - body::ClientStreaming, - message::BaseRequest, - Handshake, - NamedProtocolService, - Response, - RpcError, - RpcStatus, + protocol::{ + rpc::{ + body::ClientStreaming, + message::BaseRequest, + Handshake, + NamedProtocolService, + Response, + RpcError, + RpcStatus, + }, + ProtocolId, }, runtime::task, }; @@ -50,6 +53,7 @@ use futures::{ use log::*; use prost::Message; use std::{ + borrow::Cow, convert::TryFrom, fmt, future::Future, @@ -71,6 +75,7 @@ impl RpcClient { pub async fn connect( config: RpcClientConfig, framed: CanonicalFraming, + protocol_name: ProtocolId, ) -> Result where TSubstream: AsyncRead + AsyncWrite + Unpin + Send + 'static, @@ -78,7 +83,7 @@ impl RpcClient { let (request_tx, request_rx) = mpsc::channel(1); let connector = ClientConnector::new(request_tx); let (ready_tx, ready_rx) = oneshot::channel(); - task::spawn(RpcClientWorker::new(config, request_rx, framed, ready_tx).run()); + task::spawn(RpcClientWorker::new(config, request_rx, framed, ready_tx, protocol_name).run()); ready_rx .await .expect("ready_rx oneshot is never dropped without a reply")?; @@ -150,6 +155,7 @@ impl fmt::Debug for RpcClient { #[derive(Debug, Clone)] pub struct RpcClientBuilder { config: RpcClientConfig, + protocol_id: Option, _client: PhantomData, } @@ -157,6 +163,7 @@ impl Default for RpcClientBuilder { fn default() -> Self { Self { config: Default::default(), + protocol_id: None, _client: PhantomData, } } @@ -198,10 +205,21 @@ where TClient: From + NamedProtocolService self } + pub(crate) fn with_protocol_id(mut self, protocol_id: ProtocolId) -> Self { + self.protocol_id = Some(protocol_id); + self + } + /// Negotiates and 
establishes a session to the peer's RPC service pub async fn connect(self, framed: CanonicalFraming) -> Result where TSubstream: AsyncRead + AsyncWrite + Unpin + Send + 'static { - RpcClient::connect(self.config, framed).await.map(Into::into) + RpcClient::connect( + self.config, + framed, + self.protocol_id.as_ref().cloned().unwrap_or_default(), + ) + .await + .map(Into::into) } } @@ -302,6 +320,7 @@ pub struct RpcClientWorker { next_request_id: u16, ready_tx: Option>>, latency: Option, + protocol_id: ProtocolId, } impl RpcClientWorker @@ -312,6 +331,7 @@ where TSubstream: AsyncRead + AsyncWrite + Unpin + Send request_rx: mpsc::Receiver, framed: CanonicalFraming, ready_tx: oneshot::Sender>, + protocol_id: ProtocolId, ) -> Self { Self { config, @@ -320,11 +340,20 @@ where TSubstream: AsyncRead + AsyncWrite + Unpin + Send next_request_id: 0, ready_tx: Some(ready_tx), latency: None, + protocol_id, } } + fn protocol_name(&self) -> Cow<'_, str> { + String::from_utf8_lossy(&self.protocol_id) + } + async fn run(mut self) { - debug!(target: LOG_TARGET, "Performing client handshake"); + debug!( + target: LOG_TARGET, + "Performing client handshake for '{}'", + self.protocol_name() + ); let start = Instant::now(); let mut handshake = Handshake::new(&mut self.framed).with_timeout(self.config.handshake_timeout()); match handshake.perform_client_handshake().await { @@ -332,7 +361,9 @@ where TSubstream: AsyncRead + AsyncWrite + Unpin + Send let latency = start.elapsed(); debug!( target: LOG_TARGET, - "RPC Session negotiation completed. Latency: {:.0?}", latency + "RPC Session ({}) negotiation completed. 
Latency: {:.0?}", + self.protocol_name(), + latency ); self.latency = Some(latency); if let Some(r) = self.ready_tx.take() { @@ -366,7 +397,11 @@ where TSubstream: AsyncRead + AsyncWrite + Unpin + Send debug!(target: LOG_TARGET, "IO Error when closing substream: {}", err); } - debug!(target: LOG_TARGET, "RpcClientWorker terminated."); + debug!( + target: LOG_TARGET, + "RpcClientWorker ({}) terminated.", + self.protocol_name() + ); } async fn do_request_response( @@ -407,9 +442,10 @@ where TSubstream: AsyncRead + AsyncWrite + Unpin + Send let latency = start.elapsed(); trace!( target: LOG_TARGET, - "Received response ({} byte(s)) from request #{} (method={}) in {:.0?}", + "Received response ({} byte(s)) from request #{} (protocol = {}, method={}) in {:.0?}", resp.len(), request_id, + self.protocol_name(), method, latency ); diff --git a/comms/src/protocol/rpc/message.rs b/comms/src/protocol/rpc/message.rs index 2e963b3d2d..dedd7e04fb 100644 --- a/comms/src/protocol/rpc/message.rs +++ b/comms/src/protocol/rpc/message.rs @@ -197,13 +197,20 @@ impl Into for RpcMethod { bitflags! { pub struct RpcMessageFlags: u8 { + /// Message stream has completed const FIN = 0x01; + /// Typically sent with empty contents and used to confirm a substream is alive. 
+ const ACK = 0x02; } } impl RpcMessageFlags { pub fn is_fin(&self) -> bool { self.contains(Self::FIN) } + + pub fn is_ack(&self) -> bool { + self.contains(Self::ACK) + } } impl Default for RpcMessageFlags { diff --git a/comms/src/protocol/rpc/server/mod.rs b/comms/src/protocol/rpc/server/mod.rs index 092120ee48..5d775a5314 100644 --- a/comms/src/protocol/rpc/server/mod.rs +++ b/comms/src/protocol/rpc/server/mod.rs @@ -57,6 +57,7 @@ use futures::{channel::mpsc, AsyncRead, AsyncWrite, SinkExt, StreamExt}; use log::*; use prost::Message; use std::{ + borrow::Cow, future::Future, time::{Duration, Instant}, }; @@ -356,6 +357,7 @@ where let service = ActivePeerRpcService { config: self.config.clone(), + protocol, node_id: node_id.clone(), framed, service, @@ -372,6 +374,7 @@ where struct ActivePeerRpcService { config: RpcServerBuilder, + protocol: ProtocolId, node_id: NodeId, service: TSvc, framed: CanonicalFraming, @@ -385,14 +388,31 @@ where TCommsProvider: RpcCommsProvider + Send + Clone + 'static, { async fn start(mut self) { - debug!(target: LOG_TARGET, "(Peer = `{}`) Rpc server started.", self.node_id); + debug!( + target: LOG_TARGET, + "(Peer = `{}`) Rpc server ({}) started.", + self.node_id, + self.protocol_name() + ); if let Err(err) = self.run().await { error!( target: LOG_TARGET, - "(Peer = `{}`) Rpc server exited with an error: {}", self.node_id, err + "(Peer = `{}`) Rpc server ({}) exited with an error: {}", + self.node_id, + self.protocol_name(), + err ); } - debug!(target: LOG_TARGET, "(Peer = {}) Rpc service shutdown", self.node_id); + debug!( + target: LOG_TARGET, + "(Peer = {}) Rpc service ({}) shutdown", + self.node_id, + self.protocol_name() + ); + } + + fn protocol_name(&self) -> Cow<'_, str> { + String::from_utf8_lossy(&self.protocol) } async fn run(&mut self) -> Result<(), RpcServerError> { @@ -405,7 +425,8 @@ where let elapsed = start.elapsed(); debug!( target: LOG_TARGET, - "RPC request completed in {:.0?}{}", + "RPC ({}) request completed in 
{:.0?}{}", + self.protocol_name(), elapsed, if elapsed.as_secs() > 5 { " (LONG REQUEST)" } else { "" } ); @@ -448,6 +469,19 @@ where "[Peer=`{}`] Got request {}", self.node_id, decoded_msg ); + let msg_flags = RpcMessageFlags::from_bits_truncate(decoded_msg.flags as u8); + if msg_flags.contains(RpcMessageFlags::ACK) { + debug!(target: LOG_TARGET, "[Peer=`{}`] ACK.", self.node_id); + let ack = proto::rpc::RpcResponse { + request_id, + status: RpcStatus::ok().as_code(), + flags: RpcMessageFlags::ACK.bits().into(), + ..Default::default() + }; + self.framed.send(ack.to_encoded_bytes().into()).await?; + return Ok(()); + } + let req = Request::with_context( self.create_request_context(request_id), method, diff --git a/comms/src/protocol/rpc/test/greeting_service.rs b/comms/src/protocol/rpc/test/greeting_service.rs index 463057aed4..516ef230a1 100644 --- a/comms/src/protocol/rpc/test/greeting_service.rs +++ b/comms/src/protocol/rpc/test/greeting_service.rs @@ -23,7 +23,7 @@ use crate::{ async_trait, protocol::{ - rpc::{Request, Response, RpcError, RpcServerError, RpcStatus, Streaming}, + rpc::{NamedProtocolService, Request, Response, RpcError, RpcServerError, RpcStatus, Streaming}, ProtocolId, }, }; @@ -332,7 +332,7 @@ impl __rpc_deps::NamedProtocolService for GreetingClient { impl GreetingClient { pub async fn connect(framed: __rpc_deps::CanonicalFraming) -> Result where TSubstream: __rpc_deps::AsyncRead + __rpc_deps::AsyncWrite + Unpin + Send + 'static { - let inner = __rpc_deps::RpcClient::connect(Default::default(), framed).await?; + let inner = __rpc_deps::RpcClient::connect(Default::default(), framed, Self::PROTOCOL_NAME.into()).await?; Ok(Self { inner }) } diff --git a/comms/tests/rpc_stress.rs b/comms/tests/rpc_stress.rs index 24d124d1ee..0376a7dddc 100644 --- a/comms/tests/rpc_stress.rs +++ b/comms/tests/rpc_stress.rs @@ -31,7 +31,7 @@ mod helpers; use helpers::create_comms; use futures::{future, StreamExt}; -use std::{env, future::Future, time::Duration}; +use 
std::{future::Future, time::Duration}; use tari_comms::{ protocol::rpc::{RpcClientBuilder, RpcServer}, transports::TcpTransport, @@ -211,6 +211,7 @@ async fn few_large_messages() { .await; } +#[allow(dead_code)] async fn payload_limit() { run_stress_test(Params { num_tasks: 50, @@ -222,6 +223,7 @@ async fn payload_limit() { .await; } +#[allow(dead_code)] async fn high_contention() { run_stress_test(Params { num_tasks: 1000, @@ -233,6 +235,7 @@ async fn high_contention() { .await; } +#[allow(dead_code)] async fn high_concurrency() { run_stress_test(Params { num_tasks: 1000, @@ -244,6 +247,7 @@ async fn high_concurrency() { .await; } +#[allow(dead_code)] async fn high_contention_high_concurrency() { run_stress_test(Params { num_tasks: 2000, @@ -255,29 +259,17 @@ async fn high_contention_high_concurrency() { .await; } -#[tokio_macros::test] -async fn run_ci() { - log_timing("quick", quick()).await; - log_timing("basic", basic()).await; - log_timing("many_small_messages", many_small_messages()).await; - log_timing("few_large_messages", few_large_messages()).await; -} - #[tokio_macros::test] async fn run() { - if env::var("CI").is_ok() { - println!("Skipping the stress test on CI"); - return; - } // let _ = env_logger::try_init(); log_timing("quick", quick()).await; log_timing("basic", basic()).await; log_timing("many_small_messages", many_small_messages()).await; log_timing("few_large_messages", few_large_messages()).await; - log_timing("payload_limit", payload_limit()).await; - log_timing("high_contention", high_contention()).await; - log_timing("high_concurrency", high_concurrency()).await; - log_timing("high_contention_high_concurrency", high_contention_high_concurrency()).await; + // log_timing("payload_limit", payload_limit()).await; + // log_timing("high_contention", high_contention()).await; + // log_timing("high_concurrency", high_concurrency()).await; + // log_timing("high_contention_high_concurrency", high_contention_high_concurrency()).await; } async fn 
log_timing>(name: &str, fut: F) -> R {