Skip to content

Commit

Permalink
fix: bug in wallet base node peer switching
Browse files Browse the repository at this point in the history
- Connectivity retries connecting indefinitely, however previously if
  this continuously fails and the user updates their peer, the new peer will not be
  read until the previous peer was connected to. This PR fixes this.
- Add protocl information to comms RPC logs
- Cleanup peer state, making the wallet connectivity service the source
  of truth for the base node peer
- Adds an `Ack` flag to RPC protocol, this is not currently used but
  could be implemented in the client side in future if required without
  breaking the network (server supports it, client support may or may
  not be needed).
  • Loading branch information
sdbondi committed Aug 19, 2021
1 parent 8c28bd1 commit 0e0fc0e
Show file tree
Hide file tree
Showing 18 changed files with 193 additions and 110 deletions.
2 changes: 1 addition & 1 deletion applications/daily_tests/helpers.js
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ const yargs = () => require("yargs")(hideBin(process.argv));
function sendWebhookNotification(channel, message, webhookUrlOverride = null) {
const hook = webhookUrlOverride || getWebhookUrlFromEnv();
if (!hook) {
throw new Error("WEBHOOK_URL not specified");
return;
}
const data = JSON.stringify({ channel, text: message });
const args = ` -i -X POST -H 'Content-Type: application/json' -d '${data}' ${hook}`;
Expand Down
2 changes: 1 addition & 1 deletion base_layer/core/src/base_node/rpc/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -312,7 +312,7 @@ impl<B: BlockchainBackend + 'static> BaseNodeWalletService for BaseNodeWalletRpc
async fn get_tip_info(&self, _request: Request<()>) -> Result<Response<TipInfoResponse>, RpcStatus> {
let state_machine = self.state_machine();
let status_watch = state_machine.get_status_info_watch();
let is_synced = match (*status_watch.borrow()).state_info {
let is_synced = match status_watch.borrow().state_info {
StateInfo::Listening(li) => li.is_synced(),
_ => false,
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,8 +152,9 @@ impl Listening {
);

if !self.is_synced {
debug!(target: LOG_TARGET, "Initial sync achieved");
self.is_synced = true;
shared.set_state_info(StateInfo::Listening(ListeningInfo::new(self.is_synced)));
shared.set_state_info(StateInfo::Listening(ListeningInfo::new(true)));
}
continue;
}
Expand Down Expand Up @@ -222,7 +223,7 @@ impl Listening {

impl From<Waiting> for Listening {
fn from(_: Waiting) -> Self {
Default::default()
Self { is_synced: false }
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ impl MockBaseNodeService {
updated: None,
latency: None,
online,
base_node_peer: self.state.base_node_peer.clone(),
// base_node_peer: self.state.base_node_peer.clone(),
}
}

Expand All @@ -106,12 +106,12 @@ impl MockBaseNodeService {
updated: None,
latency: None,
online: OnlineStatus::Online,
base_node_peer: None,
// base_node_peer: None,
}
}

fn set_base_node_peer(&mut self, peer: Peer) {
self.state.base_node_peer = Some(peer);
self.base_node_peer = Some(peer);
}

/// This handler is called when requests arrive from the various streams
Expand All @@ -125,7 +125,7 @@ impl MockBaseNodeService {
Ok(BaseNodeServiceResponse::BaseNodePeerSet)
},
BaseNodeServiceRequest::GetBaseNodePeer => {
let peer = self.state.base_node_peer.clone();
let peer = self.base_node_peer.clone();
Ok(BaseNodeServiceResponse::BaseNodePeer(peer.map(Box::new)))
},
BaseNodeServiceRequest::GetChainMetadata => Ok(BaseNodeServiceResponse::ChainMetadata(
Expand Down
32 changes: 15 additions & 17 deletions base_layer/wallet/src/base_node_service/monitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,10 +101,8 @@ impl<T: WalletBackend + 'static> BaseNodeMonitor<T> {
loop {
use OnlineStatus::*;
match watcher.recv().await.unwrap_or(Offline) {
Online => match self.wallet_connectivity.get_current_base_node() {
Some(peer) => {
return peer;
},
Online => match self.wallet_connectivity.get_current_base_node_id() {
Some(node_id) => return node_id,
_ => continue,
},
Connecting => {
Expand All @@ -126,15 +124,8 @@ impl<T: WalletBackend + 'static> BaseNodeMonitor<T> {
.await
.ok_or(BaseNodeMonitorError::NodeShuttingDown)?;
let latency = client.get_last_request_latency().await?;
debug!(
target: LOG_TARGET,
"Base node {} latency: {} ms",
peer_node_id,
latency.unwrap_or_default().as_millis()
);

let tip_info = client.get_tip_info().await?;
let is_synced = tip_info.is_synced;

let chain_metadata = tip_info
.metadata
Expand All @@ -143,15 +134,24 @@ impl<T: WalletBackend + 'static> BaseNodeMonitor<T> {
ChainMetadata::try_from(metadata).map_err(BaseNodeMonitorError::InvalidBaseNodeResponse)
})?;

let is_synced = tip_info.is_synced;
debug!(
target: LOG_TARGET,
"Base node {} Tip: {} ({}) Latency: {} ms",
peer_node_id,
chain_metadata.height_of_longest_chain(),
if is_synced { "Synced" } else { "Syncing..." },
latency.unwrap_or_default().as_millis()
);

self.db.set_chain_metadata(chain_metadata.clone()).await?;

self.map_state(move |state| BaseNodeState {
self.map_state(move |_| BaseNodeState {
chain_metadata: Some(chain_metadata),
is_synced: Some(is_synced),
updated: Some(Utc::now().naive_utc()),
latency,
online: OnlineStatus::Online,
base_node_peer: state.base_node_peer.clone(),
})
.await;

Expand All @@ -164,25 +164,23 @@ impl<T: WalletBackend + 'static> BaseNodeMonitor<T> {
}

async fn set_connecting(&self) {
self.map_state(|state| BaseNodeState {
self.map_state(|_| BaseNodeState {
chain_metadata: None,
is_synced: None,
updated: Some(Utc::now().naive_utc()),
latency: None,
online: OnlineStatus::Connecting,
base_node_peer: state.base_node_peer.clone(),
})
.await;
}

async fn set_offline(&self) {
self.map_state(|state| BaseNodeState {
self.map_state(|_| BaseNodeState {
chain_metadata: None,
is_synced: None,
updated: Some(Utc::now().naive_utc()),
latency: None,
online: OnlineStatus::Offline,
base_node_peer: state.base_node_peer.clone(),
})
.await;
}
Expand Down
30 changes: 15 additions & 15 deletions base_layer/wallet/src/base_node_service/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ pub struct BaseNodeState {
pub updated: Option<NaiveDateTime>,
pub latency: Option<Duration>,
pub online: OnlineStatus,
pub base_node_peer: Option<Peer>,
// pub base_node_peer: Option<Peer>,
}

impl Default for BaseNodeState {
Expand All @@ -61,7 +61,7 @@ impl Default for BaseNodeState {
updated: None,
latency: None,
online: OnlineStatus::Connecting,
base_node_peer: None,
// base_node_peer: None,
}
}
}
Expand Down Expand Up @@ -158,18 +158,18 @@ where T: WalletBackend + 'static
}

async fn set_base_node_peer(&mut self, peer: Peer) -> Result<(), BaseNodeServiceError> {
let new_state = BaseNodeState {
base_node_peer: Some(peer.clone()),
..Default::default()
};

{
let mut lock = self.state.write().await;
*lock = new_state.clone();
}
self.wallet_connectivity.set_base_node(peer.node_id.clone()).await?;

self.publish_event(BaseNodeEvent::BaseNodeStateChanged(new_state));
// let new_state = BaseNodeState {
// base_node_peer: Some(peer.clone()),
// ..Default::default()
// };
//
// {
// let mut lock = self.state.write().await;
// *lock = new_state.clone();
// }
self.wallet_connectivity.set_base_node(peer.clone()).await?;

// self.publish_event(BaseNodeEvent::BaseNodeStateChanged(new_state));
self.publish_event(BaseNodeEvent::BaseNodePeerSet(Box::new(peer)));
Ok(())
}
Expand All @@ -189,7 +189,7 @@ where T: WalletBackend + 'static
Ok(BaseNodeServiceResponse::BaseNodePeerSet)
},
BaseNodeServiceRequest::GetBaseNodePeer => {
let peer = self.get_state().await.base_node_peer.map(Box::new);
let peer = self.wallet_connectivity.get_current_base_node_peer().map(Box::new);
Ok(BaseNodeServiceResponse::BaseNodePeer(peer))
},
BaseNodeServiceRequest::GetChainMetadata => match self.get_state().await.chain_metadata.clone() {
Expand Down
21 changes: 14 additions & 7 deletions base_layer/wallet/src/connectivity_service/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,27 +26,30 @@ use futures::{
channel::{mpsc, oneshot},
SinkExt,
};
use tari_comms::{peer_manager::NodeId, protocol::rpc::RpcClientLease};
use tari_comms::{
peer_manager::{NodeId, Peer},
protocol::rpc::RpcClientLease,
};
use tari_core::base_node::{rpc::BaseNodeWalletRpcClient, sync::rpc::BaseNodeSyncRpcClient};
use tokio::sync::watch;

pub enum WalletConnectivityRequest {
ObtainBaseNodeWalletRpcClient(oneshot::Sender<RpcClientLease<BaseNodeWalletRpcClient>>),
ObtainBaseNodeSyncRpcClient(oneshot::Sender<RpcClientLease<BaseNodeSyncRpcClient>>),
SetBaseNode(NodeId),
SetBaseNode(Box<Peer>),
}

#[derive(Clone)]
pub struct WalletConnectivityHandle {
sender: mpsc::Sender<WalletConnectivityRequest>,
base_node_watch_rx: watch::Receiver<Option<NodeId>>,
base_node_watch_rx: watch::Receiver<Option<Peer>>,
online_status_rx: watch::Receiver<OnlineStatus>,
}

impl WalletConnectivityHandle {
pub(super) fn new(
sender: mpsc::Sender<WalletConnectivityRequest>,
base_node_watch_rx: watch::Receiver<Option<NodeId>>,
base_node_watch_rx: watch::Receiver<Option<Peer>>,
online_status_rx: watch::Receiver<OnlineStatus>,
) -> Self {
Self {
Expand All @@ -56,9 +59,9 @@ impl WalletConnectivityHandle {
}
}

pub async fn set_base_node(&mut self, base_node_peer: NodeId) -> Result<(), WalletConnectivityError> {
pub async fn set_base_node(&mut self, base_node_peer: Peer) -> Result<(), WalletConnectivityError> {
self.sender
.send(WalletConnectivityRequest::SetBaseNode(base_node_peer))
.send(WalletConnectivityRequest::SetBaseNode(Box::new(base_node_peer)))
.await?;
Ok(())
}
Expand Down Expand Up @@ -110,7 +113,11 @@ impl WalletConnectivityHandle {
self.online_status_rx.clone()
}

pub fn get_current_base_node(&self) -> Option<NodeId> {
pub fn get_current_base_node_peer(&self) -> Option<Peer> {
self.base_node_watch_rx.borrow().clone()
}

pub fn get_current_base_node_id(&self) -> Option<NodeId> {
self.base_node_watch_rx.borrow().as_ref().map(|p| p.node_id.clone())
}
}
28 changes: 16 additions & 12 deletions base_layer/wallet/src/connectivity_service/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ use futures::{
use log::*;
use tari_comms::{
connectivity::ConnectivityRequester,
peer_manager::NodeId,
peer_manager::{NodeId, Peer},
protocol::rpc::{RpcClientLease, RpcClientPool},
};
use tari_core::base_node::{rpc::BaseNodeWalletRpcClient, sync::rpc::BaseNodeSyncRpcClient};
Expand All @@ -53,7 +53,7 @@ pub struct WalletConnectivityService {
config: BaseNodeServiceConfig,
request_stream: Fuse<mpsc::Receiver<WalletConnectivityRequest>>,
connectivity: ConnectivityRequester,
base_node_watch: Watch<Option<NodeId>>,
base_node_watch: Watch<Option<Peer>>,
pools: Option<ClientPoolContainer>,
online_status_watch: Watch<OnlineStatus>,
pending_requests: Vec<ReplyOneshot>,
Expand All @@ -68,7 +68,7 @@ impl WalletConnectivityService {
pub(super) fn new(
config: BaseNodeServiceConfig,
request_stream: mpsc::Receiver<WalletConnectivityRequest>,
base_node_watch: Watch<Option<NodeId>>,
base_node_watch: Watch<Option<Peer>>,
online_status_watch: Watch<OnlineStatus>,
connectivity: ConnectivityRequester,
) -> Self {
Expand All @@ -91,10 +91,10 @@ impl WalletConnectivityService {
req = self.request_stream.select_next_some() => {
self.handle_request(req).await;
},
peer = base_node_watch_rx.select_next_some() => {
if let Some(peer) = peer {
maybe_peer = base_node_watch_rx.select_next_some() => {
if maybe_peer.is_some() {
// This will block the rest until the connection is established. This is what we want.
self.setup_base_node_connection(peer).await;
self.setup_base_node_connection().await;
}
}
}
Expand All @@ -112,7 +112,7 @@ impl WalletConnectivityService {
},

SetBaseNode(peer) => {
self.set_base_node_peer(peer);
self.set_base_node_peer(*peer);
},
}
}
Expand Down Expand Up @@ -197,25 +197,29 @@ impl WalletConnectivityService {
self.set_base_node_peer(peer);
}

fn set_base_node_peer(&mut self, peer: NodeId) {
fn set_base_node_peer(&mut self, peer: Peer) {
self.pools = None;
self.base_node_watch.broadcast(Some(peer));
}

async fn setup_base_node_connection(&mut self, peer: NodeId) {
async fn setup_base_node_connection(&mut self) {
self.pools = None;
loop {
let node_id = match self.base_node_watch.borrow().as_ref() {
Some(p) => p.node_id.clone(),
None => return,
};
debug!(
target: LOG_TARGET,
"Attempting to connect to base node peer {}...", peer
"Attempting to connect to base node peer {}...", node_id
);
self.set_online_status(OnlineStatus::Connecting);
match self.try_setup_rpc_pool(peer.clone()).await {
match self.try_setup_rpc_pool(node_id.clone()).await {
Ok(_) => {
self.set_online_status(OnlineStatus::Online);
debug!(
target: LOG_TARGET,
"Wallet is ONLINE and connected to base node {}", peer
"Wallet is ONLINE and connected to base node {}", node_id
);
break;
},
Expand Down
Loading

0 comments on commit 0e0fc0e

Please sign in to comment.