Skip to content

Commit

Permalink
fix: improve responsiveness of wallet base node switching (#3488)
Browse files Browse the repository at this point in the history
Description
---

- clear previous base node's state if a new base node is selected
- interrupt sleep in bn monitor early if base node has changed
- interrupt get_tip_info RPC call early if base node has changed
- dont trigger set peer events if same peer is set


Motivation and Context
---
When the user selects a base node, the UI could appear slow when a sleep is in progress in the base node monitor.
During this time, the previous node's latency and height are displayed, which also gives the impression of an unresponsive UI.

How Has This Been Tested?
---
Manually by switching base nodes
  • Loading branch information
sdbondi authored Oct 25, 2021
1 parent a027f32 commit 762cb9a
Show file tree
Hide file tree
Showing 5 changed files with 73 additions and 57 deletions.
2 changes: 1 addition & 1 deletion base_layer/wallet/src/base_node_service/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ pub struct BaseNodeServiceConfig {
impl Default for BaseNodeServiceConfig {
fn default() -> Self {
Self {
base_node_monitor_refresh_interval: Duration::from_secs(5),
base_node_monitor_refresh_interval: Duration::from_secs(3),
base_node_rpc_pool_size: 10,
request_max_age: Duration::from_secs(60),
event_channel_size: 250,
Expand Down
13 changes: 8 additions & 5 deletions base_layer/wallet/src/base_node_service/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,18 +82,21 @@ where T: WalletBackend + 'static
context.spawn_when_ready(move |handles| async move {
let wallet_connectivity = handles.expect_handle::<WalletConnectivityHandle>();

let service = BaseNodeService::new(
let result = BaseNodeService::new(
config,
request_stream,
wallet_connectivity,
event_publisher,
handles.get_shutdown_signal(),
db,
)
.start();
futures::pin_mut!(service);
let _ = service.await;
info!(target: LOG_TARGET, "Wallet Base Node Service shutdown");
.start()
.await;

info!(
target: LOG_TARGET,
"Wallet Base Node Service shutdown with result {:?}", result
);
});

Ok(())
Expand Down
52 changes: 37 additions & 15 deletions base_layer/wallet/src/base_node_service/monitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,11 @@ use crate::{
storage::database::{WalletBackend, WalletDatabase},
};
use chrono::Utc;
use futures::{future, future::Either};
use log::*;
use std::{
convert::TryFrom,
future::Future,
sync::Arc,
time::{Duration, Instant},
};
Expand Down Expand Up @@ -109,33 +111,45 @@ where
}

async fn monitor_node(&mut self) -> Result<(), BaseNodeMonitorError> {
let mut base_node_watch = self.wallet_connectivity.get_current_base_node_watcher();
loop {
let start = Instant::now();
let timer = Instant::now();
let mut client = self
.wallet_connectivity
.obtain_base_node_wallet_rpc_client()
.await
.ok_or(BaseNodeMonitorError::NodeShuttingDown)?;
trace!(
debug!(
target: LOG_TARGET,
"Obtain RPC client {} ms",
start.elapsed().as_millis()
timer.elapsed().as_millis()
);

let base_node_id = match self.wallet_connectivity.get_current_base_node_id() {
Some(n) => n,
None => continue,
};

let start = Instant::now();
let tip_info = client.get_tip_info().await?;
let timer = Instant::now();
let tip_info = match interrupt(base_node_watch.changed(), client.get_tip_info()).await {
Some(tip_info) => tip_info?,
None => {
self.map_state(|_| Default::default()).await;
continue;
},
};
let chain_metadata = tip_info
.metadata
.ok_or_else(|| BaseNodeMonitorError::InvalidBaseNodeResponse("Tip info no metadata".to_string()))
.and_then(|metadata| {
ChainMetadata::try_from(metadata).map_err(BaseNodeMonitorError::InvalidBaseNodeResponse)
})?;
let latency = start.elapsed();
debug!(target: LOG_TARGET, "get_tip_info took {:.2?}", timer.elapsed());

let latency = match client.get_last_request_latency().await? {
Some(latency) => latency,
None => continue,
};

let is_synced = tip_info.is_synced;
debug!(
Expand All @@ -147,25 +161,20 @@ where
latency.as_millis()
);

let start = Instant::now();
self.db.set_chain_metadata(chain_metadata.clone()).await?;
trace!(
target: LOG_TARGET,
"Update metadata in db {} ms",
start.elapsed().as_millis()
);

let start = Instant::now();
self.map_state(move |_| BaseNodeState {
chain_metadata: Some(chain_metadata),
is_synced: Some(is_synced),
updated: Some(Utc::now().naive_utc()),
latency: Some(latency),
})
.await;
trace!(target: LOG_TARGET, "Publish event {} ms", start.elapsed().as_millis());

time::sleep(self.interval).await
let delay = time::sleep(self.interval.saturating_sub(latency));
if interrupt(base_node_watch.changed(), delay).await.is_none() {
self.map_state(|_| Default::default()).await;
}
}

// loop only exits on shutdown/error
Expand Down Expand Up @@ -200,3 +209,16 @@ enum BaseNodeMonitorError {
#[error("Wallet storage error: {0}")]
WalletStorageError(#[from] WalletStorageError),
}

async fn interrupt<F1, F2>(interrupt: F1, fut: F2) -> Option<F2::Output>
where
F1: Future,
F2: Future,
{
tokio::pin!(interrupt);
tokio::pin!(fut);
match future::select(interrupt, fut).await {
Either::Left(_) => None,
Either::Right((v, _)) => Some(v),
}
}
58 changes: 22 additions & 36 deletions base_layer/wallet/src/base_node_service/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,25 +42,14 @@ use tokio::sync::RwLock;
const LOG_TARGET: &str = "wallet::base_node_service::service";

/// State determined from Base Node Service Requests
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
#[derive(Debug, Clone, PartialEq, Eq, Hash, Default)]
pub struct BaseNodeState {
pub chain_metadata: Option<ChainMetadata>,
pub is_synced: Option<bool>,
pub updated: Option<NaiveDateTime>,
pub latency: Option<Duration>,
}

impl Default for BaseNodeState {
fn default() -> Self {
Self {
chain_metadata: None,
is_synced: None,
updated: None,
latency: None,
}
}
}

/// The base node service is responsible for handling requests to be sent to the connected base node.
pub struct BaseNodeService<T>
where T: WalletBackend + 'static
Expand All @@ -69,7 +58,7 @@ where T: WalletBackend + 'static
request_stream: Option<Receiver<BaseNodeServiceRequest, Result<BaseNodeServiceResponse, BaseNodeServiceError>>>,
wallet_connectivity: WalletConnectivityHandle,
event_publisher: BaseNodeEventSender,
shutdown_signal: Option<ShutdownSignal>,
shutdown_signal: ShutdownSignal,
state: Arc<RwLock<BaseNodeState>>,
db: WalletDatabase<T>,
}
Expand All @@ -90,7 +79,7 @@ where T: WalletBackend + 'static
request_stream: Some(request_stream),
wallet_connectivity,
event_publisher,
shutdown_signal: Some(shutdown_signal),
shutdown_signal,
state: Default::default(),
db,
}
Expand All @@ -103,33 +92,13 @@ where T: WalletBackend + 'static

/// Starts the service.
pub async fn start(mut self) -> Result<(), BaseNodeServiceError> {
let shutdown_signal = self
.shutdown_signal
.take()
.expect("Wallet Base Node Service initialized without shutdown signal");

let monitor = BaseNodeMonitor::new(
self.config.base_node_monitor_refresh_interval,
self.state.clone(),
self.db.clone(),
self.wallet_connectivity.clone(),
self.event_publisher.clone(),
);

tokio::spawn({
let shutdown_signal = shutdown_signal.clone();
async move {
let monitor_fut = monitor.run();
futures::pin_mut!(monitor_fut);
future::select(shutdown_signal, monitor_fut).await;
}
});
self.spawn_monitor();

let mut request_stream = self
.request_stream
.take()
.expect("Wallet Base Node Service initialized without request_stream")
.take_until(shutdown_signal);
.take_until(self.shutdown_signal.clone());

info!(target: LOG_TARGET, "Wallet Base Node Service started");
while let Some(request_context) = request_stream.next().await {
Expand All @@ -152,6 +121,23 @@ where T: WalletBackend + 'static
Ok(())
}

fn spawn_monitor(&self) {
let monitor = BaseNodeMonitor::new(
self.config.base_node_monitor_refresh_interval,
self.state.clone(),
self.db.clone(),
self.wallet_connectivity.clone(),
self.event_publisher.clone(),
);

let shutdown_signal = self.shutdown_signal.clone();
tokio::spawn(async move {
let monitor_fut = monitor.run();
futures::pin_mut!(monitor_fut);
future::select(shutdown_signal, monitor_fut).await;
});
}

/// This handler is called when requests arrive from the various streams
async fn handle_request(
&mut self,
Expand Down
5 changes: 5 additions & 0 deletions base_layer/wallet/src/connectivity_service/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,11 @@ impl WalletConnectivityHandle {
#[async_trait::async_trait]
impl WalletConnectivityInterface for WalletConnectivityHandle {
fn set_base_node(&mut self, base_node_peer: Peer) {
if let Some(peer) = self.base_node_watch.borrow().as_ref() {
if peer.public_key == base_node_peer.public_key {
return;
}
}
self.base_node_watch.send(Some(base_node_peer));
}

Expand Down

0 comments on commit 762cb9a

Please sign in to comment.