fix: improve p2p RPC robustness (#3208)
stringhandler committed Aug 18, 2021
Merge commit 211dcfd (2 parents: d9412c2 + de47365)
Showing 22 changed files with 935 additions and 153 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

4 changes: 2 additions & 2 deletions base_layer/wallet/src/connectivity_service/handle.rs
@@ -66,7 +66,7 @@ impl WalletConnectivityHandle {
/// Obtain a BaseNodeWalletRpcClient.
///
/// This can be relied on to obtain a pooled BaseNodeWalletRpcClient rpc session from a currently selected base
/// node/nodes. It will be block until this is happens. The ONLY other time it will return is if the node is
/// node/nodes. It will block until this happens. The ONLY other time it will return is if the node is
/// shutting down, where it will return None. Use this function whenever no work can be done without a
/// BaseNodeWalletRpcClient RPC session.
pub async fn obtain_base_node_wallet_rpc_client(&mut self) -> Option<RpcClientLease<BaseNodeWalletRpcClient>> {
@@ -89,7 +89,7 @@ impl WalletConnectivityHandle {
/// Obtain a BaseNodeSyncRpcClient.
///
/// This can be relied on to obtain a pooled BaseNodeSyncRpcClient rpc session from a currently selected base
/// node/nodes. It will be block until this is happens. The ONLY other time it will return is if the node is
/// node/nodes. It will block until this happens. The ONLY other time it will return is if the node is
/// shutting down, where it will return None. Use this function whenever no work can be done without a
/// BaseNodeSyncRpcClient RPC session.
pub async fn obtain_base_node_sync_rpc_client(&mut self) -> Option<RpcClientLease<BaseNodeSyncRpcClient>> {
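The doc comments above define the handle's contract: the call parks until a pooled session to the currently selected base node is available, and returns None only on shutdown. A minimal caller sketch under that contract (the task shape and import path are assumptions; only the method and its Option semantics come from this diff):

```rust
// Assumed export path for the handle; illustrative task, not wallet code.
use tari_wallet::connectivity_service::WalletConnectivityHandle;

async fn validation_task(mut connectivity: WalletConnectivityHandle) {
    loop {
        // Parks until an RPC session lease is available.
        let client = match connectivity.obtain_base_node_wallet_rpc_client().await {
            Some(lease) => lease,
            // None is only returned while the node is shutting down.
            None => return,
        };
        // ... issue BaseNodeWalletRpcClient calls through the lease ...
        drop(client); // dropping the lease returns the session to the pool
    }
}
```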
6 changes: 4 additions & 2 deletions base_layer/wallet/src/connectivity_service/service.rs
@@ -242,8 +242,10 @@ impl WalletConnectivityService {
conn.peer_node_id()
);
self.pools = Some(ClientPoolContainer {
base_node_sync_rpc_client: conn.create_rpc_client_pool(self.config.base_node_rpc_pool_size),
base_node_wallet_rpc_client: conn.create_rpc_client_pool(self.config.base_node_rpc_pool_size),
base_node_sync_rpc_client: conn
.create_rpc_client_pool(self.config.base_node_rpc_pool_size, Default::default()),
base_node_wallet_rpc_client: conn
.create_rpc_client_pool(self.config.base_node_rpc_pool_size, Default::default()),
});
self.notify_pending_requests().await?;
debug!(
4 changes: 2 additions & 2 deletions base_layer/wallet/tests/output_manager_service/service.rs
@@ -1816,14 +1816,14 @@ fn test_txo_validation_rpc_timeout() {
.unwrap();

runtime.block_on(async {
let mut delay = delay_for(Duration::from_secs(60)).fuse();
let mut delay = delay_for(Duration::from_secs(100)).fuse();
let mut failed = 0;
loop {
futures::select! {
event = event_stream.select_next_some() => {
if let Ok(msg) = event {
if let OutputManagerEvent::TxoValidationFailure(_,_) = (*msg).clone() {
failed+=1;
failed+=1;
}
}

1 change: 1 addition & 0 deletions comms/Cargo.toml
@@ -48,6 +48,7 @@ anyhow = "1.0.32"

[dev-dependencies]
tari_test_utils = {version="^0.9", path="../infrastructure/test_utils"}
tari_comms_rpc_macros = {version="*", path="./rpc_macros"}

env_logger = "0.7.0"
serde_json = "1.0.39"
2 changes: 1 addition & 1 deletion comms/src/builder/error.rs
@@ -36,7 +36,7 @@ pub enum CommsBuilderError {
ConnectionManagerError(#[from] ConnectionManagerError),
#[error("Node identity not set. Call `with_node_identity(node_identity)` on [CommsBuilder]")]
NodeIdentityNotSet,
#[error("Shutdown signa not set. Call `with_shutdown_signal(shutdown_signal)` on [CommsBuilder]")]
#[error("Shutdown signal not set. Call `with_shutdown_signal(shutdown_signal)` on [CommsBuilder]")]
ShutdownSignalNotSet,
#[error("The PeerStorage was not provided to the CommsBuilder. Use `with_peer_storage` to set it.")]
PeerStorageNotProvided,
12 changes: 9 additions & 3 deletions comms/src/connection_manager/peer_connection.rs
@@ -233,9 +233,15 @@ impl PeerConnection {
/// Creates a new RpcClientPool that can be shared between tasks. The client pool will lazily establish up to
/// `max_sessions` sessions and provides client session that is least used.
#[cfg(feature = "rpc")]
pub fn create_rpc_client_pool<T>(&self, max_sessions: usize) -> RpcClientPool<T>
where T: RpcPoolClient + From<RpcClient> + NamedProtocolService + Clone {
RpcClientPool::new(self.clone(), max_sessions)
pub fn create_rpc_client_pool<T>(
&self,
max_sessions: usize,
client_config: RpcClientBuilder<T>,
) -> RpcClientPool<T>
where
T: RpcPoolClient + From<RpcClient> + NamedProtocolService + Clone,
{
RpcClientPool::new(self.clone(), max_sessions, client_config)
}

/// Immediately disconnects the peer connection. This can only fail if the peer connection worker
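The new client_config parameter threads an RpcClientBuilder<T> into every session the pool creates, so per-client settings (deadline, grace period, handshake timeout) apply pool-wide. A call-site sketch mirroring the wallet change above (the pool size of 5 is illustrative; conn is an established PeerConnection):

```rust
// Build a pool of up to 5 lazily-established sessions; Default::default()
// yields an RpcClientBuilder<BaseNodeWalletRpcClient> with default config.
let pool = conn.create_rpc_client_pool::<BaseNodeWalletRpcClient>(5, Default::default());
```

Note that the default config itself changes in this commit: RpcClientConfig's deadline_grace_period moves from 10 to 30 seconds (see client.rs below).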
4 changes: 2 additions & 2 deletions comms/src/connectivity/manager.rs
@@ -228,7 +228,7 @@ impl ConnectivityManagerActor {
_ => {
debug!(
target: LOG_TARGET,
"No existing connection found for peer `{}`. Dialling...",
"No existing connection found for peer `{}`. Dialing...",
node_id.short_str()
);
if let Err(err) = self.connection_manager.send_dial_peer(node_id, reply).await {
@@ -528,7 +528,7 @@ impl ConnectivityManagerActor {
num_failed
);
if self.peer_manager.set_offline(node_id, true).await? {
warn!(
debug!(
target: LOG_TARGET,
"Peer `{}` was marked as offline but was already offline.", node_id
);
54 changes: 40 additions & 14 deletions comms/src/protocol/rpc/client.rs
@@ -50,6 +50,7 @@ use futures::{
use log::*;
use prost::Message;
use std::{
convert::TryFrom,
fmt,
future::Future,
marker::PhantomData,
@@ -75,7 +76,7 @@ impl RpcClient {
TSubstream: AsyncRead + AsyncWrite + Unpin + Send + 'static,
{
let (request_tx, request_rx) = mpsc::channel(1);
let connector = ClientConnector { inner: request_tx };
let connector = ClientConnector::new(request_tx);
let (ready_tx, ready_rx) = oneshot::channel();
task::spawn(RpcClientWorker::new(config, request_rx, framed, ready_tx).run());
ready_rx
@@ -227,7 +228,7 @@ impl Default for RpcClientConfig {
fn default() -> Self {
Self {
deadline: Some(Duration::from_secs(30)),
deadline_grace_period: Duration::from_secs(10),
deadline_grace_period: Duration::from_secs(30),
handshake_timeout: Duration::from_secs(30),
}
}
@@ -239,6 +240,10 @@ pub struct ClientConnector {
}

impl ClientConnector {
pub(self) fn new(sender: mpsc::Sender<ClientRequest>) -> Self {
Self { inner: sender }
}

pub fn close(&mut self) {
self.inner.close_channel();
}
@@ -293,8 +298,8 @@ pub struct RpcClientWorker<TSubstream> {
request_rx: mpsc::Receiver<ClientRequest>,
framed: CanonicalFraming<TSubstream>,
// Request ids are limited to u16::MAX because varint encoding is used over the wire and the magnitude of the value
// sent determines the byte size. A u16 will be more than enough for the purpose (currently just logging)
request_id: u16,
// sent determines the byte size. A u16 will be more than enough for the purpose
next_request_id: u16,
ready_tx: Option<oneshot::Sender<Result<(), RpcError>>>,
latency: Option<Duration>,
}
@@ -312,7 +317,7 @@ where TSubstream: AsyncRead + AsyncWrite + Unpin + Send
config,
request_rx,
framed,
request_id: 0,
next_request_id: 0,
ready_tx: Some(ready_tx),
latency: None,
}
@@ -348,7 +353,7 @@ where TSubstream: AsyncRead + AsyncWrite + Unpin + Send
match req {
SendRequest { request, reply } => {
if let Err(err) = self.do_request_response(request, reply).await {
debug!(target: LOG_TARGET, "Unexpected error: {}. Worker is terminating.", err);
error!(target: LOG_TARGET, "Unexpected error: {}. Worker is terminating.", err);
break;
}
},
@@ -433,8 +438,8 @@
},
};

match Self::convert_to_result(resp) {
Ok(resp) => {
match Self::convert_to_result(resp, request_id) {
Ok(Ok(resp)) => {
// The consumer may drop the receiver before all responses are received.
// We just ignore that as we still want obey the protocol and receive messages until the FIN flag or
// the connection is dropped
@@ -447,39 +452,60 @@
break;
}
},
Err(err) => {
Ok(Err(err)) => {
debug!(target: LOG_TARGET, "Remote service returned error: {}", err);
if !response_tx.is_closed() {
let _ = response_tx.send(Err(err)).await;
}
response_tx.close_channel();
break;
},
Err(err @ RpcError::ResponseIdDidNotMatchRequest { .. }) => {
warn!(target: LOG_TARGET, "{}", err);
// Ignore the response, this can happen when there is excessive latency. The server sends back a
// reply before the deadline but it is only received after the client has timed
// out
continue;
},
Err(err) => return Err(err),
}
}

Ok(())
}

fn next_request_id(&mut self) -> u16 {
let next_id = self.request_id;
let next_id = self.next_request_id;
// request_id is allowed to wrap around back to 0
self.request_id = self.request_id.checked_add(1).unwrap_or(0);
self.next_request_id = self.next_request_id.checked_add(1).unwrap_or(0);
next_id
}

fn convert_to_result(resp: proto::rpc::RpcResponse) -> Result<Response<Bytes>, RpcStatus> {
fn convert_to_result(
resp: proto::rpc::RpcResponse,
request_id: u16,
) -> Result<Result<Response<Bytes>, RpcStatus>, RpcError> {
let resp_id = u16::try_from(resp.request_id)
.map_err(|_| RpcStatus::protocol_error(format!("invalid request_id: must be less than {}", u16::MAX)))?;

if resp_id != request_id {
return Err(RpcError::ResponseIdDidNotMatchRequest {
expected: request_id,
actual: resp.request_id as u16,
});
}

let status = RpcStatus::from(&resp);
if !status.is_ok() {
return Err(status);
return Ok(Err(status));
}

let resp = Response {
flags: resp.flags(),
message: resp.message.into(),
};

Ok(resp)
Ok(Ok(resp))
}
}

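convert_to_result now returns a nested Result that separates transport faults from remote errors: the outer Err is an RpcError the worker handles itself (a mismatched request id is logged and skipped, since a slow server may reply after the client has timed out and moved on), while the inner Err is the remote RpcStatus forwarded to the caller. A self-contained sketch of the same id check with simplified stand-in types:

```rust
use std::convert::TryFrom;

// Stand-ins for the proto/response types in the diff; illustrative only.
struct RpcResponse {
    request_id: u32,
    message: Vec<u8>,
}

#[derive(Debug)]
enum RpcError {
    ResponseIdDidNotMatchRequest { expected: u16, actual: u16 },
    ProtocolError(String),
}

#[derive(Debug)]
struct RpcStatus; // stands in for a remote-service error status

fn convert_to_result(
    resp: RpcResponse,
    request_id: u16,
) -> Result<Result<Vec<u8>, RpcStatus>, RpcError> {
    // Request ids are varint-encoded on the wire, so a value that does not
    // fit in a u16 is a protocol violation.
    let resp_id = u16::try_from(resp.request_id).map_err(|_| {
        RpcError::ProtocolError(format!("invalid request_id: must be less than {}", u16::MAX))
    })?;
    // A stale reply carries the previous request id; surfacing it as the
    // outer Err lets the worker skip it and keep reading the substream.
    if resp_id != request_id {
        return Err(RpcError::ResponseIdDidNotMatchRequest {
            expected: request_id,
            actual: resp_id,
        });
    }
    Ok(Ok(resp.message))
}
```

The worker then matches Ok(Ok(_)), Ok(Err(_)) and Err(_) exactly as the hunk above shows.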
22 changes: 17 additions & 5 deletions comms/src/protocol/rpc/client_pool.rs
@@ -22,7 +22,14 @@

use crate::{
peer_manager::NodeId,
protocol::rpc::{error::HandshakeRejectReason, NamedProtocolService, RpcClient, RpcError, RpcHandshakeError},
protocol::rpc::{
error::HandshakeRejectReason,
NamedProtocolService,
RpcClient,
RpcClientBuilder,
RpcError,
RpcHandshakeError,
},
PeerConnection,
};
use log::*;
@@ -43,8 +50,8 @@ impl<T> RpcClientPool<T>
where T: RpcPoolClient + From<RpcClient> + NamedProtocolService + Clone
{
/// Create a new RpcClientPool. Panics if passed a pool_size of 0.
pub(crate) fn new(peer_connection: PeerConnection, pool_size: usize) -> Self {
let pool = LazyPool::new(peer_connection, pool_size);
pub(crate) fn new(peer_connection: PeerConnection, pool_size: usize, client_config: RpcClientBuilder<T>) -> Self {
let pool = LazyPool::new(peer_connection, pool_size, client_config);
Self {
pool: Arc::new(Mutex::new(pool)),
}
@@ -60,16 +67,18 @@ where T: RpcPoolClient + From<RpcClient> + NamedProtocolService + Clone
pub(super) struct LazyPool<T> {
connection: PeerConnection,
clients: Vec<RpcClientLease<T>>,
client_config: RpcClientBuilder<T>,
}

impl<T> LazyPool<T>
where T: RpcPoolClient + From<RpcClient> + NamedProtocolService + Clone
{
pub fn new(connection: PeerConnection, capacity: usize) -> Self {
pub fn new(connection: PeerConnection, capacity: usize, client_config: RpcClientBuilder<T>) -> Self {
assert!(capacity > 0, "Pool capacity of 0 is invalid");
Self {
connection,
clients: Vec::with_capacity(capacity),
client_config,
}
}

@@ -162,7 +171,10 @@ where T: RpcPoolClient + From<RpcClient> + NamedProtocolService + Clone

async fn add_new_client_session(&mut self) -> Result<&RpcClientLease<T>, RpcClientPoolError> {
debug_assert!(!self.is_full(), "add_new_client called when pool is full");
let client = self.connection.connect_rpc().await?;
let client = self
.connection
.connect_rpc_using_builder(self.client_config.clone())
.await?;
let client = RpcClientLease::new(client);
self.clients.push(client);
Ok(self.clients.last().unwrap())
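The pool's policy — fill lazily up to capacity, then lease the least-used session — can be modelled in a few lines. In this simplified sketch, "least used" is approximated by the number of outstanding Arc handles; that accounting is an assumption for illustration, not LazyPool's actual mechanism:

```rust
use std::sync::Arc;

// Simplified model of a lazily-filled, least-used client pool.
struct Lease<T>(Arc<T>);

struct LazyPool<T> {
    capacity: usize,
    clients: Vec<Arc<T>>,
}

impl<T> LazyPool<T> {
    fn new(capacity: usize) -> Self {
        assert!(capacity > 0, "Pool capacity of 0 is invalid");
        Self { capacity, clients: Vec::with_capacity(capacity) }
    }

    fn obtain(&mut self, connect: impl FnOnce() -> T) -> Lease<T> {
        // Lazily establish a new session while below capacity.
        if self.clients.len() < self.capacity {
            self.clients.push(Arc::new(connect()));
            return Lease(Arc::clone(self.clients.last().unwrap()));
        }
        // Otherwise lease the session with the fewest live handles.
        let least_used = self
            .clients
            .iter()
            .min_by_key(|c| Arc::strong_count(c))
            .expect("capacity > 0");
        Lease(Arc::clone(least_used))
    }
}
```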
13 changes: 11 additions & 2 deletions comms/src/protocol/rpc/context.rs
@@ -77,19 +77,28 @@ impl RpcCommsProvider for RpcCommsBackend {
}

pub struct RequestContext {
request_id: u32,
backend: Box<dyn RpcCommsProvider>,
node_id: NodeId,
}

impl RequestContext {
pub(super) fn new(node_id: NodeId, backend: Box<dyn RpcCommsProvider>) -> Self {
Self { backend, node_id }
pub(super) fn new(request_id: u32, node_id: NodeId, backend: Box<dyn RpcCommsProvider>) -> Self {
Self {
request_id,
backend,
node_id,
}
}

pub fn peer_node_id(&self) -> &NodeId {
&self.node_id
}

pub fn request_id(&self) -> u32 {
self.request_id
}

pub(crate) async fn fetch_peer(&self) -> Result<Peer, RpcError> {
self.backend.fetch_peer(&self.node_id).await
}
2 changes: 2 additions & 0 deletions comms/src/protocol/rpc/error.rs
@@ -45,6 +45,8 @@ pub enum RpcError {
ServerClosedRequest,
#[error("Request cancelled")]
RequestCancelled,
#[error("Response did not match the request ID (expected {expected} actual {actual})")]
ResponseIdDidNotMatchRequest { expected: u16, actual: u16 },
#[error("Client internal error: {0}")]
ClientInternalError(String),
#[error("Handshake error: {0}")]
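The new variant carries both ids, so the mismatch is self-describing in logs. A self-contained check of the thiserror-derived message, mirroring only the variant added above:

```rust
use thiserror::Error; // thiserror already backs the #[error] attributes in this crate

// Local mirror of the added variant; the real RpcError has many more.
#[derive(Debug, Error)]
enum RpcError {
    #[error("Response did not match the request ID (expected {expected} actual {actual})")]
    ResponseIdDidNotMatchRequest { expected: u16, actual: u16 },
}

fn main() {
    let err = RpcError::ResponseIdDidNotMatchRequest { expected: 7, actual: 9 };
    assert_eq!(
        err.to_string(),
        "Response did not match the request ID (expected 7 actual 9)"
    );
}
```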
4 changes: 4 additions & 0 deletions comms/src/protocol/rpc/message.rs
@@ -242,6 +242,10 @@ impl proto::rpc::RpcResponse {
pub fn flags(&self) -> RpcMessageFlags {
RpcMessageFlags::from_bits_truncate(self.flags as u8)
}

pub fn is_fin(&self) -> bool {
self.flags as u8 & RpcMessageFlags::FIN.bits() != 0
}
}

impl fmt::Display for proto::rpc::RpcResponse {
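is_fin truncates the wire-level u32 flags field to a u8 before testing the FIN bit, consistent with from_bits_truncate in flags(). A standalone sketch of the same test; the 0x01 bit position for FIN is an assumption here (the real value lives in RpcMessageFlags):

```rust
// Assumed bit for FIN; illustrative only.
const FIN: u8 = 0x01;

fn is_fin(flags: u32) -> bool {
    // Truncate to u8 first, as the diff does, then test the bit.
    flags as u8 & FIN != 0
}

fn main() {
    assert!(is_fin(0x01));
    assert!(!is_fin(0x02));
    // Bits above the low byte are dropped by the truncation.
    assert!(is_fin(0x0101));
}
```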
2 changes: 1 addition & 1 deletion comms/src/protocol/rpc/server/mock.rs
@@ -80,7 +80,7 @@ impl RpcRequestMock {
}

pub fn request_with_context<T>(&self, node_id: NodeId, msg: T) -> Request<T> {
let context = RequestContext::new(node_id, Box::new(self.comms_provider.clone()));
let context = RequestContext::new(0, node_id, Box::new(self.comms_provider.clone()));
Request::with_context(context, 0.into(), msg)
}
