From d7e7be0f694105d18ecfc3f170f4bf031be7beb6 Mon Sep 17 00:00:00 2001 From: Stanimal Date: Mon, 16 Aug 2021 13:48:07 +0400 Subject: [PATCH] feat: add sync rpc client pool to wallet connectivity - add sync pool and `obtain_base_node_sync_rpc_client` - add `get_header` to base node rpc --- Cargo.lock | 2 +- base_layer/core/src/base_node/rpc/mod.rs | 31 +++-- base_layer/core/src/base_node/rpc/service.rs | 13 ++ .../wallet/src/base_node_service/monitor.rs | 4 +- .../wallet/src/connectivity_service/handle.rs | 23 +++- .../src/connectivity_service/service.rs | 111 +++++++++++++++--- .../wallet/src/connectivity_service/test.rs | 12 +- .../src/output_manager_service/handle.rs | 2 +- base_layer/wallet/tests/support/rpc.rs | 13 ++ 9 files changed, 168 insertions(+), 43 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index ae43f086a7..f64cb66e06 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5000,7 +5000,7 @@ dependencies = [ [[package]] name = "tari_wallet_ffi" -version = "0.17.2" +version = "0.17.3" dependencies = [ "chrono", "env_logger 0.7.1", diff --git a/base_layer/core/src/base_node/rpc/mod.rs b/base_layer/core/src/base_node/rpc/mod.rs index cf41468610..44cdefdbc3 100644 --- a/base_layer/core/src/base_node/rpc/mod.rs +++ b/base_layer/core/src/base_node/rpc/mod.rs @@ -24,18 +24,6 @@ mod service; #[cfg(feature = "base_node")] use crate::base_node::StateMachineHandle; -use crate::proto::{ - base_node::{ - FetchMatchingUtxos, - FetchUtxosResponse, - Signatures, - TipInfoResponse, - TxQueryBatchResponses, - TxQueryResponse, - TxSubmissionResponse, - }, - types::{Signature, Transaction}, -}; #[cfg(feature = "base_node")] use crate::{ chain_storage::{async_db::AsyncBlockchainDb, BlockchainBackend}, @@ -44,6 +32,22 @@ use crate::{ #[cfg(feature = "base_node")] pub use service::BaseNodeWalletRpcService; +use crate::{ + proto, + proto::{ + base_node::{ + FetchMatchingUtxos, + FetchUtxosResponse, + Signatures, + TipInfoResponse, + TxQueryBatchResponses, + TxQueryResponse, + TxSubmissionResponse, + }, + types::{Signature, Transaction}, + }, +}; + use tari_comms::protocol::rpc::{Request, Response, RpcStatus}; use tari_comms_rpc_macros::tari_rpc; @@ -72,6 +76,9 @@ pub trait BaseNodeWalletService: Send + Sync + 'static { #[rpc(method = 5)] async fn get_tip_info(&self, request: Request<()>) -> Result, RpcStatus>; + + #[rpc(method = 6)] + async fn get_header(&self, request: Request) -> Result, RpcStatus>; } #[cfg(feature = "base_node")] diff --git a/base_layer/core/src/base_node/rpc/service.rs b/base_layer/core/src/base_node/rpc/service.rs index 8f37830694..d82be66a8f 100644 --- a/base_layer/core/src/base_node/rpc/service.rs +++ b/base_layer/core/src/base_node/rpc/service.rs @@ -24,6 +24,7 @@ use crate::{ base_node::{rpc::BaseNodeWalletService, state_machine_service::states::StateInfo, StateMachineHandle}, chain_storage::{async_db::AsyncBlockchainDb, BlockchainBackend, PrunedOutput}, mempool::{service::MempoolHandle, TxStorageResponse}, + proto, proto::{ base_node::{ FetchMatchingUtxos, @@ -327,4 +328,16 @@ impl BaseNodeWalletService for BaseNodeWalletRpc is_synced, })) } + + async fn get_header(&self, request: Request) -> Result, RpcStatus> { + let height = request.into_message(); + let header = self + .db() + .fetch_header(height) + .await + .map_err(RpcStatus::log_internal_error(LOG_TARGET))? + .ok_or_else(|| RpcStatus::not_found(format!("Header not found at height {}", height)))?; + + Ok(Response::new(header.into())) + } } diff --git a/base_layer/wallet/src/base_node_service/monitor.rs b/base_layer/wallet/src/base_node_service/monitor.rs index 7becc0493a..9adbd08353 100644 --- a/base_layer/wallet/src/base_node_service/monitor.rs +++ b/base_layer/wallet/src/base_node_service/monitor.rs @@ -122,11 +122,11 @@ impl BaseNodeMonitor { let peer_node_id = self.update_connectivity_status().await; let mut client = self .wallet_connectivity - .obtain_base_node_rpc_client() + .obtain_base_node_wallet_rpc_client() .await .ok_or(BaseNodeMonitorError::NodeShuttingDown)?; let latency = client.get_last_request_latency().await?; - trace!( + debug!( target: LOG_TARGET, "Base node {} latency: {} ms", peer_node_id, diff --git a/base_layer/wallet/src/connectivity_service/handle.rs b/base_layer/wallet/src/connectivity_service/handle.rs index 0f295d2434..41c9d51c5a 100644 --- a/base_layer/wallet/src/connectivity_service/handle.rs +++ b/base_layer/wallet/src/connectivity_service/handle.rs @@ -27,11 +27,12 @@ use futures::{ SinkExt, }; use tari_comms::{peer_manager::NodeId, protocol::rpc::RpcClientLease}; -use tari_core::base_node::rpc; +use tari_core::base_node::{rpc::BaseNodeWalletRpcClient, sync::rpc::BaseNodeSyncRpcClient}; use tokio::sync::watch; pub enum WalletConnectivityRequest { - ObtainBaseNodeWalletRpcClient(oneshot::Sender>), + ObtainBaseNodeWalletRpcClient(oneshot::Sender>), + ObtainBaseNodeSyncRpcClient(oneshot::Sender>), SetBaseNode(NodeId), } @@ -68,7 +69,7 @@ impl WalletConnectivityHandle { /// node/nodes. It will be block until this is happens. The ONLY other time it will return is if the node is /// shutting down, where it will return None. Use this function whenever no work can be done without a /// BaseNodeWalletRpcClient RPC session. - pub async fn obtain_base_node_rpc_client(&mut self) -> Option> { + pub async fn obtain_base_node_wallet_rpc_client(&mut self) -> Option> { let (reply_tx, reply_rx) = oneshot::channel(); // Under what conditions do the (1) mpsc channel and (2) oneshot channel error? // (1) when the receiver has been dropped @@ -85,6 +86,22 @@ impl WalletConnectivityHandle { reply_rx.await.ok() } + /// Obtain a BaseNodeSyncRpcClient. + /// + /// This can be relied on to obtain a pooled BaseNodeSyncRpcClient rpc session from a currently selected base + /// node/nodes. It will be block until this is happens. The ONLY other time it will return is if the node is + /// shutting down, where it will return None. Use this function whenever no work can be done without a + /// BaseNodeSyncRpcClient RPC session. + pub async fn obtain_base_node_sync_rpc_client(&mut self) -> Option> { + let (reply_tx, reply_rx) = oneshot::channel(); + self.sender + .send(WalletConnectivityRequest::ObtainBaseNodeSyncRpcClient(reply_tx)) + .await + .ok()?; + + reply_rx.await.ok() + } + pub async fn get_connectivity_status(&mut self) -> OnlineStatus { self.online_status_rx.recv().await.unwrap_or(OnlineStatus::Offline) } diff --git a/base_layer/wallet/src/connectivity_service/service.rs b/base_layer/wallet/src/connectivity_service/service.rs index 38e3582c59..5f8a7c469b 100644 --- a/base_layer/wallet/src/connectivity_service/service.rs +++ b/base_layer/wallet/src/connectivity_service/service.rs @@ -36,7 +36,7 @@ use tari_comms::{ peer_manager::NodeId, protocol::rpc::{RpcClientLease, RpcClientPool}, }; -use tari_core::base_node::rpc; +use tari_core::base_node::{rpc::BaseNodeWalletRpcClient, sync::rpc::BaseNodeSyncRpcClient}; use tokio::time; const LOG_TARGET: &str = "wallet::connectivity"; @@ -54,9 +54,14 @@ pub struct WalletConnectivityService { request_stream: Fuse>, connectivity: ConnectivityRequester, base_node_watch: Watch>, - base_node_wallet_rpc_client_pool: Option>, + pools: Option, online_status_watch: Watch, - pending_base_node_rpc_requests: Vec>>, + pending_requests: Vec, +} + +struct ClientPoolContainer { + pub base_node_wallet_rpc_client: RpcClientPool, + pub base_node_sync_rpc_client: RpcClientPool, } impl WalletConnectivityService { @@ -72,8 +77,8 @@ impl WalletConnectivityService { request_stream: request_stream.fuse(), connectivity, base_node_watch, - base_node_wallet_rpc_client_pool: None, - pending_base_node_rpc_requests: Vec::new(), + pools: None, + pending_requests: Vec::new(), online_status_watch, } } @@ -100,7 +105,10 @@ impl WalletConnectivityService { use WalletConnectivityRequest::*; match request { ObtainBaseNodeWalletRpcClient(reply) => { - self.handle_get_base_node_wallet_rpc_client(reply).await; + self.handle_pool_request(reply.into()).await; + }, + ObtainBaseNodeSyncRpcClient(reply) => { + self.handle_pool_request(reply.into()).await; }, SetBaseNode(peer) => { @@ -109,12 +117,20 @@ impl WalletConnectivityService { } } + async fn handle_pool_request(&mut self, reply: ReplyOneshot) { + use ReplyOneshot::*; + match reply { + WalletRpc(tx) => self.handle_get_base_node_wallet_rpc_client(tx).await, + SyncRpc(tx) => self.handle_get_base_node_sync_rpc_client(tx).await, + } + } + async fn handle_get_base_node_wallet_rpc_client( &mut self, - reply: oneshot::Sender>, + reply: oneshot::Sender>, ) { - match self.base_node_wallet_rpc_client_pool { - Some(ref pool) => match pool.get().await { + match self.pools { + Some(ref pools) => match pools.base_node_wallet_rpc_client.get().await { Ok(client) => { let _ = reply.send(client); }, @@ -124,16 +140,47 @@ impl WalletConnectivityService { "Base node connection failed: {}. Reconnecting...", e ); self.trigger_reconnect(); - self.pending_base_node_rpc_requests.push(reply); + self.pending_requests.push(reply.into()); }, }, None => { - self.pending_base_node_rpc_requests.push(reply); + self.pending_requests.push(reply.into()); if self.base_node_watch.borrow().is_none() { warn!( target: LOG_TARGET, "{} requests are waiting for base node to be set", - self.pending_base_node_rpc_requests.len() + self.pending_requests.len() + ); + } + }, + } + } + + async fn handle_get_base_node_sync_rpc_client( + &mut self, + reply: oneshot::Sender>, + ) { + match self.pools { + Some(ref pools) => match pools.base_node_sync_rpc_client.get().await { + Ok(client) => { + let _ = reply.send(client); + }, + Err(e) => { + warn!( + target: LOG_TARGET, + "Base node connection failed: {}. Reconnecting...", e + ); + self.trigger_reconnect(); + self.pending_requests.push(reply.into()); + }, + }, + None => { + self.pending_requests.push(reply.into()); + if self.base_node_watch.borrow().is_none() { + warn!( + target: LOG_TARGET, + "{} requests are waiting for base node to be set", + self.pending_requests.len() ); } }, @@ -151,12 +198,12 @@ impl WalletConnectivityService { } fn set_base_node_peer(&mut self, peer: NodeId) { - self.base_node_wallet_rpc_client_pool = None; + self.pools = None; self.base_node_watch.broadcast(Some(peer)); } async fn setup_base_node_connection(&mut self, peer: NodeId) { - self.base_node_wallet_rpc_client_pool = None; + self.pools = None; loop { debug!( target: LOG_TARGET, @@ -194,8 +241,10 @@ impl WalletConnectivityService { "Successfully established peer connection to base node {}", conn.peer_node_id() ); - let pool = conn.create_rpc_client_pool(self.config.base_node_rpc_pool_size); - self.base_node_wallet_rpc_client_pool = Some(pool); + self.pools = Some(ClientPoolContainer { + base_node_sync_rpc_client: conn.create_rpc_client_pool(self.config.base_node_rpc_pool_size), + base_node_wallet_rpc_client: conn.create_rpc_client_pool(self.config.base_node_rpc_pool_size), + }); self.notify_pending_requests().await?; debug!( target: LOG_TARGET, @@ -206,14 +255,40 @@ impl WalletConnectivityService { } async fn notify_pending_requests(&mut self) -> Result<(), WalletConnectivityError> { - let current_pending = mem::take(&mut self.pending_base_node_rpc_requests); + let current_pending = mem::take(&mut self.pending_requests); for reply in current_pending { if reply.is_canceled() { continue; } - self.handle_get_base_node_wallet_rpc_client(reply).await; + self.handle_pool_request(reply).await; } Ok(()) } } + +enum ReplyOneshot { + WalletRpc(oneshot::Sender>), + SyncRpc(oneshot::Sender>), +} + +impl ReplyOneshot { + pub fn is_canceled(&self) -> bool { + use ReplyOneshot::*; + match self { + WalletRpc(tx) => tx.is_canceled(), + SyncRpc(tx) => tx.is_canceled(), + } + } +} + +impl From>> for ReplyOneshot { + fn from(tx: oneshot::Sender>) -> Self { + ReplyOneshot::WalletRpc(tx) + } +} +impl From>> for ReplyOneshot { + fn from(tx: oneshot::Sender>) -> Self { + ReplyOneshot::SyncRpc(tx) + } +} diff --git a/base_layer/wallet/src/connectivity_service/test.rs b/base_layer/wallet/src/connectivity_service/test.rs index c20d57b815..35ca5ee95d 100644 --- a/base_layer/wallet/src/connectivity_service/test.rs +++ b/base_layer/wallet/src/connectivity_service/test.rs @@ -87,7 +87,7 @@ async fn it_dials_peer_when_base_node_is_set() { // Now a connection will given to the service mock_state.add_active_connection(conn).await; - let rpc_client = handle.obtain_base_node_rpc_client().await.unwrap(); + let rpc_client = handle.obtain_base_node_wallet_rpc_client().await.unwrap(); assert!(rpc_client.is_connected()); } @@ -106,7 +106,7 @@ async fn it_resolves_many_pending_rpc_session_requests() { let pending_requests = iter::repeat_with(|| { let mut handle = handle.clone(); task::spawn(async move { - let rpc_client = handle.obtain_base_node_rpc_client().await.unwrap(); + let rpc_client = handle.obtain_base_node_wallet_rpc_client().await.unwrap(); rpc_client.is_connected() }) }) @@ -140,7 +140,7 @@ async fn it_changes_to_a_new_base_node() { assert_eq!(mock_state.count_calls_containing("AddManagedPeer").await, 1); let _ = mock_state.take_calls().await; - let rpc_client = handle.obtain_base_node_rpc_client().await.unwrap(); + let rpc_client = handle.obtain_base_node_wallet_rpc_client().await.unwrap(); assert!(rpc_client.is_connected()); // Initiate a connection to the base node @@ -150,7 +150,7 @@ async fn it_changes_to_a_new_base_node() { mock_state.expect_dial_peer(base_node_peer2.node_id()).await; assert_eq!(mock_state.count_calls_containing("AddManagedPeer").await, 1); - let rpc_client = handle.obtain_base_node_rpc_client().await.unwrap(); + let rpc_client = handle.obtain_base_node_wallet_rpc_client().await.unwrap(); assert!(rpc_client.is_connected()); } @@ -178,7 +178,7 @@ async fn it_gracefully_handles_connect_fail_reconnect() { let barrier = barrier.clone(); async move { barrier.wait().await; - let rpc_client = handle.obtain_base_node_rpc_client().await.unwrap(); + let rpc_client = handle.obtain_base_node_wallet_rpc_client().await.unwrap(); assert!(rpc_client.is_connected()); } }); @@ -215,7 +215,7 @@ async fn it_gracefully_handles_multiple_connection_failures() { let barrier = barrier.clone(); async move { barrier.wait().await; - let rpc_client = handle.obtain_base_node_rpc_client().await.unwrap(); + let rpc_client = handle.obtain_base_node_wallet_rpc_client().await.unwrap(); assert!(rpc_client.is_connected()); } }); diff --git a/base_layer/wallet/src/output_manager_service/handle.rs b/base_layer/wallet/src/output_manager_service/handle.rs index 8b4ae008f7..a1bd883802 100644 --- a/base_layer/wallet/src/output_manager_service/handle.rs +++ b/base_layer/wallet/src/output_manager_service/handle.rs @@ -113,7 +113,7 @@ impl fmt::Display for OutputManagerRequest { GetPublicRewindKeys => write!(f, "GetPublicRewindKeys"), FeeEstimate(_) => write!(f, "FeeEstimate"), ScanForRecoverableOutputs(_) => write!(f, "ScanForRecoverableOutputs"), - ScanOutputs(_) => write!(f, "ScanRewindAndImportOutputs"), + ScanOutputs(_) => write!(f, "ScanOutputs"), AddKnownOneSidedPaymentScript(_) => write!(f, "AddKnownOneSidedPaymentScript"), } } diff --git a/base_layer/wallet/tests/support/rpc.rs b/base_layer/wallet/tests/support/rpc.rs index bba8b32858..0f7009bdd0 100644 --- a/base_layer/wallet/tests/support/rpc.rs +++ b/base_layer/wallet/tests/support/rpc.rs @@ -31,6 +31,8 @@ use tari_core::{ proto::wallet_rpc::{TxLocation, TxQueryResponse, TxSubmissionRejectionReason, TxSubmissionResponse}, rpc::BaseNodeWalletService, }, + blocks::BlockHeader, + proto, proto::{ base_node::{ ChainMetadata, @@ -86,6 +88,7 @@ pub struct BaseNodeWalletRpcMockState { fetch_utxos_calls: Arc>>>>, response_delay: Arc>>, rpc_status_error: Arc>>, + get_header_response: Arc>>, synced: Arc>, utxos: Arc>>, } @@ -121,6 +124,7 @@ impl BaseNodeWalletRpcMockState { fetch_utxos_calls: Arc::new(Mutex::new(Vec::new())), response_delay: Arc::new(Mutex::new(None)), rpc_status_error: Arc::new(Mutex::new(None)), + get_header_response: Arc::new(Mutex::new(None)), synced: Arc::new(Mutex::new(true)), utxos: Arc::new(Mutex::new(Vec::new())), } @@ -458,6 +462,15 @@ impl BaseNodeWalletService for BaseNodeWalletRpcMockService { Ok(Response::new(tip_info_response_lock.clone())) } + + async fn get_header(&self, _: Request) -> Result, RpcStatus> { + let lock = acquire_lock!(self.state.get_header_response); + let resp = lock + .as_ref() + .cloned() + .ok_or_else(|| RpcStatus::not_found("get_header_response set to None"))?; + Ok(Response::new(resp.into())) + } } #[cfg(test)]