Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: bug in wallet base node peer switching #3217

Merged
merged 3 commits into from
Aug 19, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion applications/daily_tests/helpers.js
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ const yargs = () => require("yargs")(hideBin(process.argv));
function sendWebhookNotification(channel, message, webhookUrlOverride = null) {
const hook = webhookUrlOverride || getWebhookUrlFromEnv();
if (!hook) {
throw new Error("WEBHOOK_URL not specified");
return;
}
const data = JSON.stringify({ channel, text: message });
const args = ` -i -X POST -H 'Content-Type: application/json' -d '${data}' ${hook}`;
Expand Down
2 changes: 1 addition & 1 deletion base_layer/core/src/base_node/rpc/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -312,7 +312,7 @@ impl<B: BlockchainBackend + 'static> BaseNodeWalletService for BaseNodeWalletRpc
async fn get_tip_info(&self, _request: Request<()>) -> Result<Response<TipInfoResponse>, RpcStatus> {
let state_machine = self.state_machine();
let status_watch = state_machine.get_status_info_watch();
let is_synced = match (*status_watch.borrow()).state_info {
let is_synced = match status_watch.borrow().state_info {
StateInfo::Listening(li) => li.is_synced(),
_ => false,
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,8 +152,9 @@ impl Listening {
);

if !self.is_synced {
debug!(target: LOG_TARGET, "Initial sync achieved");
self.is_synced = true;
shared.set_state_info(StateInfo::Listening(ListeningInfo::new(self.is_synced)));
shared.set_state_info(StateInfo::Listening(ListeningInfo::new(true)));
}
continue;
}
Expand Down Expand Up @@ -222,7 +223,7 @@ impl Listening {

impl From<Waiting> for Listening {
fn from(_: Waiting) -> Self {
Default::default()
Self { is_synced: false }
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,6 @@ impl MockBaseNodeService {
updated: None,
latency: None,
online,
base_node_peer: self.state.base_node_peer.clone(),
}
}

Expand All @@ -106,12 +105,11 @@ impl MockBaseNodeService {
updated: None,
latency: None,
online: OnlineStatus::Online,
base_node_peer: None,
}
}

fn set_base_node_peer(&mut self, peer: Peer) {
self.state.base_node_peer = Some(peer);
self.base_node_peer = Some(peer);
}

/// This handler is called when requests arrive from the various streams
Expand All @@ -125,7 +123,7 @@ impl MockBaseNodeService {
Ok(BaseNodeServiceResponse::BaseNodePeerSet)
},
BaseNodeServiceRequest::GetBaseNodePeer => {
let peer = self.state.base_node_peer.clone();
let peer = self.base_node_peer.clone();
Ok(BaseNodeServiceResponse::BaseNodePeer(peer.map(Box::new)))
},
BaseNodeServiceRequest::GetChainMetadata => Ok(BaseNodeServiceResponse::ChainMetadata(
Expand Down
32 changes: 15 additions & 17 deletions base_layer/wallet/src/base_node_service/monitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,10 +101,8 @@ impl<T: WalletBackend + 'static> BaseNodeMonitor<T> {
loop {
use OnlineStatus::*;
match watcher.recv().await.unwrap_or(Offline) {
Online => match self.wallet_connectivity.get_current_base_node() {
Some(peer) => {
return peer;
},
Online => match self.wallet_connectivity.get_current_base_node_id() {
Some(node_id) => return node_id,
_ => continue,
},
Connecting => {
Expand All @@ -126,15 +124,8 @@ impl<T: WalletBackend + 'static> BaseNodeMonitor<T> {
.await
.ok_or(BaseNodeMonitorError::NodeShuttingDown)?;
let latency = client.get_last_request_latency().await?;
debug!(
target: LOG_TARGET,
"Base node {} latency: {} ms",
peer_node_id,
latency.unwrap_or_default().as_millis()
);

let tip_info = client.get_tip_info().await?;
let is_synced = tip_info.is_synced;

let chain_metadata = tip_info
.metadata
Expand All @@ -143,15 +134,24 @@ impl<T: WalletBackend + 'static> BaseNodeMonitor<T> {
ChainMetadata::try_from(metadata).map_err(BaseNodeMonitorError::InvalidBaseNodeResponse)
})?;

let is_synced = tip_info.is_synced;
debug!(
target: LOG_TARGET,
"Base node {} Tip: {} ({}) Latency: {} ms",
peer_node_id,
chain_metadata.height_of_longest_chain(),
if is_synced { "Synced" } else { "Syncing..." },
latency.unwrap_or_default().as_millis()
);

self.db.set_chain_metadata(chain_metadata.clone()).await?;

self.map_state(move |state| BaseNodeState {
self.map_state(move |_| BaseNodeState {
chain_metadata: Some(chain_metadata),
is_synced: Some(is_synced),
updated: Some(Utc::now().naive_utc()),
latency,
online: OnlineStatus::Online,
base_node_peer: state.base_node_peer.clone(),
})
.await;

Expand All @@ -164,25 +164,23 @@ impl<T: WalletBackend + 'static> BaseNodeMonitor<T> {
}

async fn set_connecting(&self) {
self.map_state(|state| BaseNodeState {
self.map_state(|_| BaseNodeState {
chain_metadata: None,
is_synced: None,
updated: Some(Utc::now().naive_utc()),
latency: None,
online: OnlineStatus::Connecting,
base_node_peer: state.base_node_peer.clone(),
})
.await;
}

async fn set_offline(&self) {
self.map_state(|state| BaseNodeState {
self.map_state(|_| BaseNodeState {
chain_metadata: None,
is_synced: None,
updated: Some(Utc::now().naive_utc()),
latency: None,
online: OnlineStatus::Offline,
base_node_peer: state.base_node_peer.clone(),
})
.await;
}
Expand Down
18 changes: 3 additions & 15 deletions base_layer/wallet/src/base_node_service/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ pub struct BaseNodeState {
pub updated: Option<NaiveDateTime>,
pub latency: Option<Duration>,
pub online: OnlineStatus,
pub base_node_peer: Option<Peer>,
// pub base_node_peer: Option<Peer>,
}

impl Default for BaseNodeState {
Expand All @@ -61,7 +61,6 @@ impl Default for BaseNodeState {
updated: None,
latency: None,
online: OnlineStatus::Connecting,
base_node_peer: None,
}
}
}
Expand Down Expand Up @@ -158,18 +157,7 @@ where T: WalletBackend + 'static
}

async fn set_base_node_peer(&mut self, peer: Peer) -> Result<(), BaseNodeServiceError> {
let new_state = BaseNodeState {
base_node_peer: Some(peer.clone()),
..Default::default()
};

{
let mut lock = self.state.write().await;
*lock = new_state.clone();
}
self.wallet_connectivity.set_base_node(peer.node_id.clone()).await?;

self.publish_event(BaseNodeEvent::BaseNodeStateChanged(new_state));
self.wallet_connectivity.set_base_node(peer.clone()).await?;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This feels like it should happen in the wallet container set_base_node_peer function where all the other wallet services have their set_base_node method called.

Copy link
Member Author

@sdbondi sdbondi Aug 19, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah agree, will require a bit more of a refactor (BaseNodeEvent::BaseNodePeerSet event needs to be moved around / base node service needs to monitor the change and emit the event), wanted to get this in quicker.

self.publish_event(BaseNodeEvent::BaseNodePeerSet(Box::new(peer)));
Ok(())
}
Expand All @@ -189,7 +177,7 @@ where T: WalletBackend + 'static
Ok(BaseNodeServiceResponse::BaseNodePeerSet)
},
BaseNodeServiceRequest::GetBaseNodePeer => {
let peer = self.get_state().await.base_node_peer.map(Box::new);
let peer = self.wallet_connectivity.get_current_base_node_peer().map(Box::new);
Ok(BaseNodeServiceResponse::BaseNodePeer(peer))
},
BaseNodeServiceRequest::GetChainMetadata => match self.get_state().await.chain_metadata.clone() {
Expand Down
21 changes: 14 additions & 7 deletions base_layer/wallet/src/connectivity_service/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,27 +26,30 @@ use futures::{
channel::{mpsc, oneshot},
SinkExt,
};
use tari_comms::{peer_manager::NodeId, protocol::rpc::RpcClientLease};
use tari_comms::{
peer_manager::{NodeId, Peer},
protocol::rpc::RpcClientLease,
};
use tari_core::base_node::{rpc::BaseNodeWalletRpcClient, sync::rpc::BaseNodeSyncRpcClient};
use tokio::sync::watch;

pub enum WalletConnectivityRequest {
ObtainBaseNodeWalletRpcClient(oneshot::Sender<RpcClientLease<BaseNodeWalletRpcClient>>),
ObtainBaseNodeSyncRpcClient(oneshot::Sender<RpcClientLease<BaseNodeSyncRpcClient>>),
SetBaseNode(NodeId),
SetBaseNode(Box<Peer>),
}

#[derive(Clone)]
pub struct WalletConnectivityHandle {
sender: mpsc::Sender<WalletConnectivityRequest>,
base_node_watch_rx: watch::Receiver<Option<NodeId>>,
base_node_watch_rx: watch::Receiver<Option<Peer>>,
online_status_rx: watch::Receiver<OnlineStatus>,
}

impl WalletConnectivityHandle {
pub(super) fn new(
sender: mpsc::Sender<WalletConnectivityRequest>,
base_node_watch_rx: watch::Receiver<Option<NodeId>>,
base_node_watch_rx: watch::Receiver<Option<Peer>>,
online_status_rx: watch::Receiver<OnlineStatus>,
) -> Self {
Self {
Expand All @@ -56,9 +59,9 @@ impl WalletConnectivityHandle {
}
}

pub async fn set_base_node(&mut self, base_node_peer: NodeId) -> Result<(), WalletConnectivityError> {
pub async fn set_base_node(&mut self, base_node_peer: Peer) -> Result<(), WalletConnectivityError> {
self.sender
.send(WalletConnectivityRequest::SetBaseNode(base_node_peer))
.send(WalletConnectivityRequest::SetBaseNode(Box::new(base_node_peer)))
.await?;
Ok(())
}
Expand Down Expand Up @@ -110,7 +113,11 @@ impl WalletConnectivityHandle {
self.online_status_rx.clone()
}

pub fn get_current_base_node(&self) -> Option<NodeId> {
pub fn get_current_base_node_peer(&self) -> Option<Peer> {
self.base_node_watch_rx.borrow().clone()
}

pub fn get_current_base_node_id(&self) -> Option<NodeId> {
self.base_node_watch_rx.borrow().as_ref().map(|p| p.node_id.clone())
}
}
28 changes: 16 additions & 12 deletions base_layer/wallet/src/connectivity_service/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ use futures::{
use log::*;
use tari_comms::{
connectivity::ConnectivityRequester,
peer_manager::NodeId,
peer_manager::{NodeId, Peer},
protocol::rpc::{RpcClientLease, RpcClientPool},
};
use tari_core::base_node::{rpc::BaseNodeWalletRpcClient, sync::rpc::BaseNodeSyncRpcClient};
Expand All @@ -53,7 +53,7 @@ pub struct WalletConnectivityService {
config: BaseNodeServiceConfig,
request_stream: Fuse<mpsc::Receiver<WalletConnectivityRequest>>,
connectivity: ConnectivityRequester,
base_node_watch: Watch<Option<NodeId>>,
base_node_watch: Watch<Option<Peer>>,
pools: Option<ClientPoolContainer>,
online_status_watch: Watch<OnlineStatus>,
pending_requests: Vec<ReplyOneshot>,
Expand All @@ -68,7 +68,7 @@ impl WalletConnectivityService {
pub(super) fn new(
config: BaseNodeServiceConfig,
request_stream: mpsc::Receiver<WalletConnectivityRequest>,
base_node_watch: Watch<Option<NodeId>>,
base_node_watch: Watch<Option<Peer>>,
online_status_watch: Watch<OnlineStatus>,
connectivity: ConnectivityRequester,
) -> Self {
Expand All @@ -91,10 +91,10 @@ impl WalletConnectivityService {
req = self.request_stream.select_next_some() => {
self.handle_request(req).await;
},
peer = base_node_watch_rx.select_next_some() => {
if let Some(peer) = peer {
maybe_peer = base_node_watch_rx.select_next_some() => {
if maybe_peer.is_some() {
// This will block the rest until the connection is established. This is what we want.
self.setup_base_node_connection(peer).await;
self.setup_base_node_connection().await;
}
}
}
Expand All @@ -112,7 +112,7 @@ impl WalletConnectivityService {
},

SetBaseNode(peer) => {
self.set_base_node_peer(peer);
self.set_base_node_peer(*peer);
},
}
}
Expand Down Expand Up @@ -197,25 +197,29 @@ impl WalletConnectivityService {
self.set_base_node_peer(peer);
}

fn set_base_node_peer(&mut self, peer: NodeId) {
fn set_base_node_peer(&mut self, peer: Peer) {
self.pools = None;
self.base_node_watch.broadcast(Some(peer));
}

async fn setup_base_node_connection(&mut self, peer: NodeId) {
async fn setup_base_node_connection(&mut self) {
self.pools = None;
loop {
let node_id = match self.base_node_watch.borrow().as_ref() {
Some(p) => p.node_id.clone(),
None => return,
};
debug!(
target: LOG_TARGET,
"Attempting to connect to base node peer {}...", peer
"Attempting to connect to base node peer {}...", node_id
);
self.set_online_status(OnlineStatus::Connecting);
match self.try_setup_rpc_pool(peer.clone()).await {
match self.try_setup_rpc_pool(node_id.clone()).await {
Ok(_) => {
self.set_online_status(OnlineStatus::Online);
debug!(
target: LOG_TARGET,
"Wallet is ONLINE and connected to base node {}", peer
"Wallet is ONLINE and connected to base node {}", node_id
);
break;
},
Expand Down
Loading