Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: trigger mempool sync on lag #4730

Merged
merged 3 commits into from
Sep 27, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -60,4 +60,5 @@ buildtools/Output/

clients/base_node_grpc_client/package-lock.json
clients/validator_node_grpc_client/package-lock.json
clients/wallet_grpc_client/package-lock.json
clients/wallet_grpc_client/package-lock.json
pie/
3 changes: 3 additions & 0 deletions base_layer/core/src/mempool/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,13 +50,16 @@ pub struct MempoolServiceConfig {
pub initial_sync_num_peers: usize,
/// The maximum number of transactions to sync in a single sync session Default: 10_000
pub initial_sync_max_transactions: usize,
/// The maximum number of blocks added via sync or re-org to triggering a sync
pub block_sync_trigger: usize,
}

impl Default for MempoolServiceConfig {
fn default() -> Self {
Self {
initial_sync_num_peers: 2,
initial_sync_max_transactions: 10_000,
block_sync_trigger: 5,
}
}
}
Expand Down
8 changes: 4 additions & 4 deletions base_layer/core/src/mempool/sync_protocol/initializer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use tari_service_framework::{async_trait, ServiceInitializationError, ServiceIni
use tokio::{sync::mpsc, time::sleep};

use crate::{
base_node::StateMachineHandle,
base_node::{comms_interface::LocalNodeCommsInterface, StateMachineHandle},
mempool::{
sync_protocol::{MempoolSyncProtocol, MEMPOOL_SYNC_PROTOCOL},
Mempool,
Expand Down Expand Up @@ -83,8 +83,7 @@ impl ServiceInitializer for MempoolSyncInitializer {
log_mdc::extend(mdc.clone());
let state_machine = handles.expect_handle::<StateMachineHandle>();
let connectivity = handles.expect_handle::<ConnectivityRequester>();
// Ensure that we get an subscription ASAP so that we don't miss any connectivity events
let connectivity_event_subscription = connectivity.get_event_subscription();
let base_node = handles.expect_handle::<LocalNodeCommsInterface>();

let mut status_watch = state_machine.get_status_info_watch();
if !status_watch.borrow().bootstrapped {
Expand All @@ -103,8 +102,9 @@ impl ServiceInitializer for MempoolSyncInitializer {
}
log_mdc::extend(mdc.clone());
}
let base_node_events = base_node.get_block_event_stream();

MempoolSyncProtocol::new(config, notif_rx, connectivity_event_subscription, mempool)
MempoolSyncProtocol::new(config, notif_rx, mempool, connectivity, base_node_events)
.run()
.await;
});
Expand Down
69 changes: 64 additions & 5 deletions base_layer/core/src/mempool/sync_protocol/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ pub use initializer::MempoolSyncInitializer;
use log::*;
use prost::Message;
use tari_comms::{
connectivity::{ConnectivityEvent, ConnectivityEventRx},
connectivity::{ConnectivityEvent, ConnectivityRequester, ConnectivitySelection},
framing,
framing::CanonicalFraming,
message::MessageExt,
Expand All @@ -97,6 +97,8 @@ use tokio::{
};

use crate::{
base_node::comms_interface::{BlockEvent, BlockEventReceiver},
chain_storage::BlockAddResult,
mempool::{metrics, proto, Mempool, MempoolServiceConfig},
proto as shared_proto,
transactions::transaction_components::Transaction,
Expand All @@ -116,10 +118,11 @@ pub static MEMPOOL_SYNC_PROTOCOL: Bytes = Bytes::from_static(b"t/mempool-sync/1"
pub struct MempoolSyncProtocol<TSubstream> {
config: MempoolServiceConfig,
protocol_notifier: ProtocolNotificationRx<TSubstream>,
connectivity_events: ConnectivityEventRx,
mempool: Mempool,
num_synched: Arc<AtomicUsize>,
permits: Arc<Semaphore>,
connectivity: ConnectivityRequester,
block_event_stream: BlockEventReceiver,
}

impl<TSubstream> MempoolSyncProtocol<TSubstream>
Expand All @@ -128,25 +131,31 @@ where TSubstream: AsyncRead + AsyncWrite + Unpin + Send + Sync + 'static
pub fn new(
config: MempoolServiceConfig,
protocol_notifier: ProtocolNotificationRx<TSubstream>,
connectivity_events: ConnectivityEventRx,
mempool: Mempool,
connectivity: ConnectivityRequester,
block_event_stream: BlockEventReceiver,
) -> Self {
Self {
config,
protocol_notifier,
connectivity_events,
mempool,
num_synched: Arc::new(AtomicUsize::new(0)),
permits: Arc::new(Semaphore::new(1)),
connectivity,
block_event_stream,
}
}

pub async fn run(mut self) {
info!(target: LOG_TARGET, "Mempool protocol handler has started");

let mut connectivity_events = self.connectivity.get_event_subscription();
loop {
tokio::select! {
Ok(event) = self.connectivity_events.recv() => {
Ok(block_event) = self.block_event_stream.recv() => {
self.handle_block_event(&block_event).await;
},
Ok(event) = connectivity_events.recv() => {
self.handle_connectivity_event(event).await;
},

Expand Down Expand Up @@ -174,6 +183,56 @@ where TSubstream: AsyncRead + AsyncWrite + Unpin + Send + Sync + 'static
}
}

async fn handle_block_event(&mut self, block_event: &BlockEvent) {
use BlockEvent::{BlockSyncComplete, ValidBlockAdded};
match block_event {
ValidBlockAdded(_, BlockAddResult::ChainReorg { added, removed: _ }) => {
if added.len() < self.config.block_sync_trigger {
return;
}
},
BlockSyncComplete(tip, starting_sync_height) => {
let added = tip.height() - starting_sync_height;
if added < self.config.block_sync_trigger as u64 {
return;
}
},
_ => {
return;
},
}
// we want to at least sync initial_sync_num_peers, so we reset the num_synced to 0, so it can run till
// initial_sync_num_peers again. This is made to run as a best effort in that it will at least run the
// initial_sync_num_peers
self.num_synched.store(0, Ordering::Release);
let connections = match self
.connectivity
.select_connections(ConnectivitySelection::random_nodes(
self.config.initial_sync_num_peers,
vec![],
))
.await
{
Ok(v) => {
if v.is_empty() {
error!(target: LOG_TARGET, "Mempool sync could not get any peers to sync to");
return;
};
v
},
Err(e) => {
error!(
target: LOG_TARGET,
"Mempool sync could not get a peer to sync to: {}", e
);
return;
},
};
for connection in connections {
self.spawn_initiator_protocol(connection).await;
}
}

fn is_synched(&self) -> bool {
self.num_synched.load(Ordering::SeqCst) >= self.config.initial_sync_num_peers
}
Expand Down
51 changes: 27 additions & 24 deletions base_layer/core/src/mempool/sync_protocol/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,16 @@ use std::{fmt, io, iter::repeat_with, sync::Arc};
use futures::{Sink, SinkExt, Stream, StreamExt};
use tari_common::configuration::Network;
use tari_comms::{
connectivity::{ConnectivityEvent, ConnectivityEventTx},
connectivity::ConnectivityEvent,
framing,
memsocket::MemorySocket,
message::MessageExt,
peer_manager::PeerFeatures,
protocol::{ProtocolEvent, ProtocolNotification, ProtocolNotificationTx},
test_utils::{mocks::create_peer_connection_mock_pair, node_identity::build_node_identity},
test_utils::{
mocks::{create_connectivity_mock, create_peer_connection_mock_pair, ConnectivityManagerMockState},
node_identity::build_node_identity,
},
Bytes,
BytesMut,
};
Expand Down Expand Up @@ -80,38 +83,45 @@ async fn setup(
num_txns: usize,
) -> (
ProtocolNotificationTx<MemorySocket>,
ConnectivityEventTx,
ConnectivityManagerMockState,
Mempool,
Vec<Transaction>,
) {
let (protocol_notif_tx, protocol_notif_rx) = mpsc::channel(1);
let (connectivity_events_tx, connectivity_events_rx) = broadcast::channel(10);
let (mempool, transactions) = new_mempool_with_transactions(num_txns).await;
let (connectivity, connectivity_manager_mock) = create_connectivity_mock();
let connectivity_manager_mock_state = connectivity_manager_mock.spawn();
let (block_event_sender, _) = broadcast::channel(1);
let block_receiver = block_event_sender.subscribe();
let protocol = MempoolSyncProtocol::new(
Default::default(),
protocol_notif_rx,
connectivity_events_rx,
mempool.clone(),
connectivity,
block_receiver,
);

task::spawn(protocol.run());

(protocol_notif_tx, connectivity_events_tx, mempool, transactions)
connectivity_manager_mock_state.wait_until_event_receivers_ready().await;
(
protocol_notif_tx,
connectivity_manager_mock_state,
mempool,
transactions,
)
}

#[tokio::test]
async fn empty_set() {
let (_, connectivity_events_tx, mempool1, _) = setup(0).await;
let (_, connectivity_manager_state, mempool1, _) = setup(0).await;

let node1 = build_node_identity(PeerFeatures::COMMUNICATION_NODE);
let node2 = build_node_identity(PeerFeatures::COMMUNICATION_NODE);
let (_node1_conn, node1_mock, node2_conn, _) =
create_peer_connection_mock_pair(node1.to_peer(), node2.to_peer()).await;

// This node connected to a peer, so it should open the substream
connectivity_events_tx
.send(ConnectivityEvent::PeerConnected(node2_conn))
.unwrap();
connectivity_manager_state.publish_event(ConnectivityEvent::PeerConnected(node2_conn));

let substream = node1_mock.next_incoming_substream().await.unwrap();
let framed = framing::canonical(substream, MAX_FRAME_SIZE);
Expand All @@ -131,17 +141,15 @@ async fn empty_set() {

#[tokio::test]
async fn synchronise() {
let (_, connectivity_events_tx, mempool1, transactions1) = setup(5).await;
let (_, connectivity_manager_state, mempool1, transactions1) = setup(5).await;

let node1 = build_node_identity(PeerFeatures::COMMUNICATION_NODE);
let node2 = build_node_identity(PeerFeatures::COMMUNICATION_NODE);
let (_node1_conn, node1_mock, node2_conn, _) =
create_peer_connection_mock_pair(node1.to_peer(), node2.to_peer()).await;

// This node connected to a peer, so it should open the substream
connectivity_events_tx
.send(ConnectivityEvent::PeerConnected(node2_conn))
.unwrap();
connectivity_manager_state.publish_event(ConnectivityEvent::PeerConnected(node2_conn));

let substream = node1_mock.next_incoming_substream().await.unwrap();
let framed = framing::canonical(substream, MAX_FRAME_SIZE);
Expand All @@ -165,17 +173,14 @@ async fn synchronise() {

#[tokio::test]
async fn duplicate_set() {
let (_, connectivity_events_tx, mempool1, transactions1) = setup(2).await;

let (_, connectivity_manager_state, mempool1, transactions1) = setup(2).await;
let node1 = build_node_identity(PeerFeatures::COMMUNICATION_NODE);
let node2 = build_node_identity(PeerFeatures::COMMUNICATION_NODE);
let (_node1_conn, node1_mock, node2_conn, _) =
create_peer_connection_mock_pair(node1.to_peer(), node2.to_peer()).await;

// This node connected to a peer, so it should open the substream
connectivity_events_tx
.send(ConnectivityEvent::PeerConnected(node2_conn))
.unwrap();
connectivity_manager_state.publish_event(ConnectivityEvent::PeerConnected(node2_conn));

let substream = node1_mock.next_incoming_substream().await.unwrap();
let framed = framing::canonical(substream, MAX_FRAME_SIZE);
Expand Down Expand Up @@ -269,17 +274,15 @@ async fn initiator_messages() {

#[tokio::test]
async fn responder_messages() {
let (_, connectivity_events_tx, _, transactions1) = setup(1).await;
let (_, connectivity_manager_state, _, transactions1) = setup(1).await;

let node1 = build_node_identity(PeerFeatures::COMMUNICATION_NODE);
let node2 = build_node_identity(PeerFeatures::COMMUNICATION_NODE);
let (_node1_conn, node1_mock, node2_conn, _) =
create_peer_connection_mock_pair(node1.to_peer(), node2.to_peer()).await;

// This node connected to a peer, so it should open the substream
connectivity_events_tx
.send(ConnectivityEvent::PeerConnected(node2_conn))
.unwrap();
connectivity_manager_state.publish_event(ConnectivityEvent::PeerConnected(node2_conn));

let substream = node1_mock.next_incoming_substream().await.unwrap();
let mut framed = framing::canonical(substream, MAX_FRAME_SIZE);
Expand Down
2 changes: 2 additions & 0 deletions common/config/presets/c_base_node.toml
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,8 @@ track_reorgs = true
#service.initial_sync_num_peers = 2
# The maximum number of transactions to sync in a single sync session Default: 10_000
#service.initial_sync_max_transactions = 10_000
# The maximum number of blocks added via sync or re-org to triggering a sync
#block_sync_trigger = 5

[base_node.state_machine]
# The initial max sync latency. If a peer fails to stream a header/block within this deadline another sync peer will be
Expand Down
11 changes: 11 additions & 0 deletions comms/core/src/test_utils/mocks/connectivity_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,17 @@ impl ConnectivityManagerMockState {
}
}

pub async fn wait_until_event_receivers_ready(&self) {
let mut timeout = 0;
while self.event_tx.receiver_count() == 0 {
time::sleep(Duration::from_millis(10)).await;
timeout += 10;
if timeout > 5000 {
panic!("Event receiver not ready after 5 secs");
}
}
}

async fn add_call(&self, call_str: String) {
self.with_state(|state| state.calls.push(call_str)).await
}
Expand Down