feat: trigger mempool sync on lag (#4730)
Description
---
Adds the ability to re-trigger a mempool sync if a large re-org or block sync occurred.

Motivation and Context
---
As of #4706 the mempool properly handles re-orgs and block syncs.
But while the base_node is not at the "correct" tip, its mempool will reject transactions that appear invalid at that tip. To get those transactions back, we need to trigger a mempool sync.
This adds a new config setting that controls how many blocks the base_node may add during a re-org or block sync before a mempool sync is started.
The assumption is that in most cases the number of blocks removed will be less than or equal to the number of blocks added, so we only need to look at the blocks added.
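A minimal sketch of the trigger condition (simplified from the `handle_block_event` change in `sync_protocol/mod.rs` below; the struct and function names here are illustrative, not the actual code):

```rust
/// Illustrative stand-in for the relevant field of `MempoolServiceConfig`.
struct MempoolServiceConfig {
    block_sync_trigger: usize,
}

/// Returns true when a re-org or block sync added enough blocks that the
/// mempool should re-sync with its peers.
fn should_trigger_mempool_sync(config: &MempoolServiceConfig, blocks_added: u64) -> bool {
    blocks_added >= config.block_sync_trigger as u64
}

fn main() {
    let config = MempoolServiceConfig { block_sync_trigger: 5 };
    assert!(!should_trigger_mempool_sync(&config, 3)); // small re-org: keep the current mempool
    assert!(should_trigger_mempool_sync(&config, 8)); // large catch-up: re-trigger a mempool sync
}
```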

Open Questions
---
Do we just sync to one peer, or do we sync to `initial_sync_num_peers` peers?
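For reference, the handler added in `sync_protocol/mod.rs` (see the diff below) currently requests up to `initial_sync_num_peers` random connections. A minimal sketch of that selection, assuming the usual `tari_comms` re-exports and with error handling trimmed:

```rust
use tari_comms::{
    connectivity::{ConnectivityRequester, ConnectivitySelection},
    PeerConnection,
};

/// Selects up to `initial_sync_num_peers` random peer connections to sync with.
/// Returns an empty list if no peers are available or the request fails.
async fn select_sync_peers(
    connectivity: &mut ConnectivityRequester,
    initial_sync_num_peers: usize,
) -> Vec<PeerConnection> {
    connectivity
        .select_connections(ConnectivitySelection::random_nodes(
            initial_sync_num_peers,
            vec![], // no peers excluded
        ))
        .await
        .unwrap_or_default()
}
```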

Prerequisite
---
Requires: #4706
SWvheerden authored Sep 27, 2022
1 parent c659275 commit 1e22a03
Showing 7 changed files with 113 additions and 34 deletions.
3 changes: 2 additions & 1 deletion .gitignore
@@ -60,4 +60,5 @@ buildtools/Output/

clients/base_node_grpc_client/package-lock.json
clients/validator_node_grpc_client/package-lock.json
clients/wallet_grpc_client/package-lock.json
clients/wallet_grpc_client/package-lock.json
pie/
3 changes: 3 additions & 0 deletions base_layer/core/src/mempool/config.rs
@@ -50,13 +50,16 @@ pub struct MempoolServiceConfig {
pub initial_sync_num_peers: usize,
/// The maximum number of transactions to sync in a single sync session Default: 10_000
pub initial_sync_max_transactions: usize,
/// The maximum number of blocks that can be added via sync or re-org before a mempool sync is triggered
pub block_sync_trigger: usize,
}

impl Default for MempoolServiceConfig {
fn default() -> Self {
Self {
initial_sync_num_peers: 2,
initial_sync_max_transactions: 10_000,
block_sync_trigger: 5,
}
}
}
8 changes: 4 additions & 4 deletions base_layer/core/src/mempool/sync_protocol/initializer.rs
@@ -32,7 +32,7 @@ use tari_service_framework::{async_trait, ServiceInitializationError, ServiceIni
use tokio::{sync::mpsc, time::sleep};

use crate::{
base_node::StateMachineHandle,
base_node::{comms_interface::LocalNodeCommsInterface, StateMachineHandle},
mempool::{
sync_protocol::{MempoolSyncProtocol, MEMPOOL_SYNC_PROTOCOL},
Mempool,
@@ -83,8 +83,7 @@ impl ServiceInitializer for MempoolSyncInitializer {
log_mdc::extend(mdc.clone());
let state_machine = handles.expect_handle::<StateMachineHandle>();
let connectivity = handles.expect_handle::<ConnectivityRequester>();
// Ensure that we get an subscription ASAP so that we don't miss any connectivity events
let connectivity_event_subscription = connectivity.get_event_subscription();
let base_node = handles.expect_handle::<LocalNodeCommsInterface>();

let mut status_watch = state_machine.get_status_info_watch();
if !status_watch.borrow().bootstrapped {
@@ -103,8 +102,9 @@ impl ServiceInitializer for MempoolSyncInitializer {
}
log_mdc::extend(mdc.clone());
}
let base_node_events = base_node.get_block_event_stream();

MempoolSyncProtocol::new(config, notif_rx, connectivity_event_subscription, mempool)
MempoolSyncProtocol::new(config, notif_rx, mempool, connectivity, base_node_events)
.run()
.await;
});
69 changes: 64 additions & 5 deletions base_layer/core/src/mempool/sync_protocol/mod.rs
@@ -79,7 +79,7 @@ pub use initializer::MempoolSyncInitializer;
use log::*;
use prost::Message;
use tari_comms::{
connectivity::{ConnectivityEvent, ConnectivityEventRx},
connectivity::{ConnectivityEvent, ConnectivityRequester, ConnectivitySelection},
framing,
framing::CanonicalFraming,
message::MessageExt,
@@ -97,6 +97,8 @@ use tokio::{
};

use crate::{
base_node::comms_interface::{BlockEvent, BlockEventReceiver},
chain_storage::BlockAddResult,
mempool::{metrics, proto, Mempool, MempoolServiceConfig},
proto as shared_proto,
transactions::transaction_components::Transaction,
@@ -116,10 +118,11 @@ pub static MEMPOOL_SYNC_PROTOCOL: Bytes = Bytes::from_static(b"t/mempool-sync/1"
pub struct MempoolSyncProtocol<TSubstream> {
config: MempoolServiceConfig,
protocol_notifier: ProtocolNotificationRx<TSubstream>,
connectivity_events: ConnectivityEventRx,
mempool: Mempool,
num_synched: Arc<AtomicUsize>,
permits: Arc<Semaphore>,
connectivity: ConnectivityRequester,
block_event_stream: BlockEventReceiver,
}

impl<TSubstream> MempoolSyncProtocol<TSubstream>
@@ -128,25 +131,31 @@ where TSubstream: AsyncRead + AsyncWrite + Unpin + Send + Sync + 'static
pub fn new(
config: MempoolServiceConfig,
protocol_notifier: ProtocolNotificationRx<TSubstream>,
connectivity_events: ConnectivityEventRx,
mempool: Mempool,
connectivity: ConnectivityRequester,
block_event_stream: BlockEventReceiver,
) -> Self {
Self {
config,
protocol_notifier,
connectivity_events,
mempool,
num_synched: Arc::new(AtomicUsize::new(0)),
permits: Arc::new(Semaphore::new(1)),
connectivity,
block_event_stream,
}
}

pub async fn run(mut self) {
info!(target: LOG_TARGET, "Mempool protocol handler has started");

let mut connectivity_events = self.connectivity.get_event_subscription();
loop {
tokio::select! {
Ok(event) = self.connectivity_events.recv() => {
Ok(block_event) = self.block_event_stream.recv() => {
self.handle_block_event(&block_event).await;
},
Ok(event) = connectivity_events.recv() => {
self.handle_connectivity_event(event).await;
},

@@ -174,6 +183,56 @@ where TSubstream: AsyncRead + AsyncWrite + Unpin + Send + Sync + 'static
}
}

async fn handle_block_event(&mut self, block_event: &BlockEvent) {
use BlockEvent::{BlockSyncComplete, ValidBlockAdded};
match block_event {
ValidBlockAdded(_, BlockAddResult::ChainReorg { added, removed: _ }) => {
if added.len() < self.config.block_sync_trigger {
return;
}
},
BlockSyncComplete(tip, starting_sync_height) => {
let added = tip.height() - starting_sync_height;
if added < self.config.block_sync_trigger as u64 {
return;
}
},
_ => {
return;
},
}
// We want to sync with at least `initial_sync_num_peers`, so we reset `num_synched` to 0 so that syncing
// can run up to `initial_sync_num_peers` again. This is best effort: it will attempt to sync with at
// least `initial_sync_num_peers` peers.
self.num_synched.store(0, Ordering::Release);
let connections = match self
.connectivity
.select_connections(ConnectivitySelection::random_nodes(
self.config.initial_sync_num_peers,
vec![],
))
.await
{
Ok(v) => {
if v.is_empty() {
error!(target: LOG_TARGET, "Mempool sync could not get any peers to sync to");
return;
};
v
},
Err(e) => {
error!(
target: LOG_TARGET,
"Mempool sync could not get a peer to sync to: {}", e
);
return;
},
};
for connection in connections {
self.spawn_initiator_protocol(connection).await;
}
}

fn is_synched(&self) -> bool {
self.num_synched.load(Ordering::SeqCst) >= self.config.initial_sync_num_peers
}
51 changes: 27 additions & 24 deletions base_layer/core/src/mempool/sync_protocol/test.rs
@@ -25,13 +25,16 @@ use std::{fmt, io, iter::repeat_with, sync::Arc};
use futures::{Sink, SinkExt, Stream, StreamExt};
use tari_common::configuration::Network;
use tari_comms::{
connectivity::{ConnectivityEvent, ConnectivityEventTx},
connectivity::ConnectivityEvent,
framing,
memsocket::MemorySocket,
message::MessageExt,
peer_manager::PeerFeatures,
protocol::{ProtocolEvent, ProtocolNotification, ProtocolNotificationTx},
test_utils::{mocks::create_peer_connection_mock_pair, node_identity::build_node_identity},
test_utils::{
mocks::{create_connectivity_mock, create_peer_connection_mock_pair, ConnectivityManagerMockState},
node_identity::build_node_identity,
},
Bytes,
BytesMut,
};
@@ -80,38 +83,45 @@ async fn setup(
num_txns: usize,
) -> (
ProtocolNotificationTx<MemorySocket>,
ConnectivityEventTx,
ConnectivityManagerMockState,
Mempool,
Vec<Transaction>,
) {
let (protocol_notif_tx, protocol_notif_rx) = mpsc::channel(1);
let (connectivity_events_tx, connectivity_events_rx) = broadcast::channel(10);
let (mempool, transactions) = new_mempool_with_transactions(num_txns).await;
let (connectivity, connectivity_manager_mock) = create_connectivity_mock();
let connectivity_manager_mock_state = connectivity_manager_mock.spawn();
let (block_event_sender, _) = broadcast::channel(1);
let block_receiver = block_event_sender.subscribe();
let protocol = MempoolSyncProtocol::new(
Default::default(),
protocol_notif_rx,
connectivity_events_rx,
mempool.clone(),
connectivity,
block_receiver,
);

task::spawn(protocol.run());

(protocol_notif_tx, connectivity_events_tx, mempool, transactions)
connectivity_manager_mock_state.wait_until_event_receivers_ready().await;
(
protocol_notif_tx,
connectivity_manager_mock_state,
mempool,
transactions,
)
}

#[tokio::test]
async fn empty_set() {
let (_, connectivity_events_tx, mempool1, _) = setup(0).await;
let (_, connectivity_manager_state, mempool1, _) = setup(0).await;

let node1 = build_node_identity(PeerFeatures::COMMUNICATION_NODE);
let node2 = build_node_identity(PeerFeatures::COMMUNICATION_NODE);
let (_node1_conn, node1_mock, node2_conn, _) =
create_peer_connection_mock_pair(node1.to_peer(), node2.to_peer()).await;

// This node connected to a peer, so it should open the substream
connectivity_events_tx
.send(ConnectivityEvent::PeerConnected(node2_conn))
.unwrap();
connectivity_manager_state.publish_event(ConnectivityEvent::PeerConnected(node2_conn));

let substream = node1_mock.next_incoming_substream().await.unwrap();
let framed = framing::canonical(substream, MAX_FRAME_SIZE);
@@ -131,17 +141,15 @@ async fn empty_set() {

#[tokio::test]
async fn synchronise() {
let (_, connectivity_events_tx, mempool1, transactions1) = setup(5).await;
let (_, connectivity_manager_state, mempool1, transactions1) = setup(5).await;

let node1 = build_node_identity(PeerFeatures::COMMUNICATION_NODE);
let node2 = build_node_identity(PeerFeatures::COMMUNICATION_NODE);
let (_node1_conn, node1_mock, node2_conn, _) =
create_peer_connection_mock_pair(node1.to_peer(), node2.to_peer()).await;

// This node connected to a peer, so it should open the substream
connectivity_events_tx
.send(ConnectivityEvent::PeerConnected(node2_conn))
.unwrap();
connectivity_manager_state.publish_event(ConnectivityEvent::PeerConnected(node2_conn));

let substream = node1_mock.next_incoming_substream().await.unwrap();
let framed = framing::canonical(substream, MAX_FRAME_SIZE);
@@ -165,17 +173,14 @@ async fn synchronise() {

#[tokio::test]
async fn duplicate_set() {
let (_, connectivity_events_tx, mempool1, transactions1) = setup(2).await;

let (_, connectivity_manager_state, mempool1, transactions1) = setup(2).await;
let node1 = build_node_identity(PeerFeatures::COMMUNICATION_NODE);
let node2 = build_node_identity(PeerFeatures::COMMUNICATION_NODE);
let (_node1_conn, node1_mock, node2_conn, _) =
create_peer_connection_mock_pair(node1.to_peer(), node2.to_peer()).await;

// This node connected to a peer, so it should open the substream
connectivity_events_tx
.send(ConnectivityEvent::PeerConnected(node2_conn))
.unwrap();
connectivity_manager_state.publish_event(ConnectivityEvent::PeerConnected(node2_conn));

let substream = node1_mock.next_incoming_substream().await.unwrap();
let framed = framing::canonical(substream, MAX_FRAME_SIZE);
@@ -269,17 +274,15 @@ async fn initiator_messages() {

#[tokio::test]
async fn responder_messages() {
let (_, connectivity_events_tx, _, transactions1) = setup(1).await;
let (_, connectivity_manager_state, _, transactions1) = setup(1).await;

let node1 = build_node_identity(PeerFeatures::COMMUNICATION_NODE);
let node2 = build_node_identity(PeerFeatures::COMMUNICATION_NODE);
let (_node1_conn, node1_mock, node2_conn, _) =
create_peer_connection_mock_pair(node1.to_peer(), node2.to_peer()).await;

// This node connected to a peer, so it should open the substream
connectivity_events_tx
.send(ConnectivityEvent::PeerConnected(node2_conn))
.unwrap();
connectivity_manager_state.publish_event(ConnectivityEvent::PeerConnected(node2_conn));

let substream = node1_mock.next_incoming_substream().await.unwrap();
let mut framed = framing::canonical(substream, MAX_FRAME_SIZE);
2 changes: 2 additions & 0 deletions common/config/presets/c_base_node.toml
@@ -110,6 +110,8 @@ track_reorgs = true
#service.initial_sync_num_peers = 2
# The maximum number of transactions to sync in a single sync session Default: 10_000
#service.initial_sync_max_transactions = 10_000
# The maximum number of blocks that can be added via sync or re-org before a mempool sync is triggered
#block_sync_trigger = 5

[base_node.state_machine]
# The initial max sync latency. If a peer fails to stream a header/block within this deadline another sync peer will be
11 changes: 11 additions & 0 deletions comms/core/src/test_utils/mocks/connectivity_manager.rs
@@ -75,6 +75,17 @@ impl ConnectivityManagerMockState {
}
}

pub async fn wait_until_event_receivers_ready(&self) {
let mut timeout = 0;
while self.event_tx.receiver_count() == 0 {
time::sleep(Duration::from_millis(10)).await;
timeout += 10;
if timeout > 5000 {
panic!("Event receiver not ready after 5 secs");
}
}
}

async fn add_call(&self, call_str: String) {
self.with_state(|state| state.calls.push(call_str)).await
}
