Skip to content

Commit

Permalink
trigger mempool sync on lag
Browse files Browse the repository at this point in the history
Co-authored-by: Stan Bondi <[email protected]>
  • Loading branch information
SWvheerden and sdbondi committed Sep 26, 2022
1 parent f55762e commit fd016f3
Show file tree
Hide file tree
Showing 5 changed files with 87 additions and 12 deletions.
3 changes: 3 additions & 0 deletions base_layer/core/src/mempool/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,13 +50,16 @@ pub struct MempoolServiceConfig {
pub initial_sync_num_peers: usize,
/// The maximum number of transactions to sync in a single sync session. Default: 10_000
pub initial_sync_max_transactions: usize,
/// The maximum number of blocks that can be added via sync or re-org before a mempool sync is triggered. Default: 5
pub block_sync_trigger: usize,
}

impl Default for MempoolServiceConfig {
fn default() -> Self {
Self {
initial_sync_num_peers: 2,
initial_sync_max_transactions: 10_000,
block_sync_trigger: 5,
}
}
}
Expand Down
10 changes: 5 additions & 5 deletions base_layer/core/src/mempool/sync_protocol/initializer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,9 @@ use tari_service_framework::{async_trait, ServiceInitializationError, ServiceIni
use tokio::{sync::mpsc, time::sleep};

use crate::{
base_node::StateMachineHandle,
base_node::{comms_interface::LocalNodeCommsInterface, StateMachineHandle},
mempool::{
sync_protocol::{MempoolSyncProtocol, MEMPOOL_SYNC_PROTOCOL},
sync_protocol::{MempoolSyncProtocol, MEMPOOL_SYNC_PROTOCOL},
Mempool,
MempoolServiceConfig,
},
Expand Down Expand Up @@ -83,8 +83,7 @@ impl ServiceInitializer for MempoolSyncInitializer {
log_mdc::extend(mdc.clone());
let state_machine = handles.expect_handle::<StateMachineHandle>();
let connectivity = handles.expect_handle::<ConnectivityRequester>();
// Ensure that we get an subscription ASAP so that we don't miss any connectivity events
let connectivity_event_subscription = connectivity.get_event_subscription();
let base_node = handles.expect_handle::<LocalNodeCommsInterface>();

let mut status_watch = state_machine.get_status_info_watch();
if !status_watch.borrow().bootstrapped {
Expand All @@ -103,8 +102,9 @@ impl ServiceInitializer for MempoolSyncInitializer {
}
log_mdc::extend(mdc.clone());
}
let base_node_events = base_node.get_block_event_stream();

MempoolSyncProtocol::new(config, notif_rx, connectivity_event_subscription, mempool)
MempoolSyncProtocol::new(config, notif_rx, mempool, connectivity, base_node_events)
.run()
.await;
});
Expand Down
72 changes: 67 additions & 5 deletions base_layer/core/src/mempool/sync_protocol/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ pub use initializer::MempoolSyncInitializer;
use log::*;
use prost::Message;
use tari_comms::{
connectivity::{ConnectivityEvent, ConnectivityEventRx},
connectivity::{ConnectivityEvent, ConnectivityEventRx, ConnectivityRequester, ConnectivitySelection},
framing,
framing::CanonicalFraming,
message::MessageExt,
Expand All @@ -97,6 +97,8 @@ use tokio::{
};

use crate::{
base_node::comms_interface::{BlockEvent, BlockEventReceiver},
chain_storage::BlockAddResult,
mempool::{metrics, proto, Mempool, MempoolServiceConfig},
proto as shared_proto,
transactions::transaction_components::Transaction,
Expand All @@ -113,13 +115,19 @@ const LOG_TARGET: &str = "c::mempool::sync_protocol";

pub static MEMPOOL_SYNC_PROTOCOL: Bytes = Bytes::from_static(b"t/mempool-sync/1");

/// Groups the two event streams the mempool sync protocol listens on.
// NOTE(review): this struct is not referenced anywhere in the visible diff —
// `MempoolSyncProtocol::new` takes the streams individually. Confirm it is
// used elsewhere or remove it.
pub struct MempoolSyncStreams {
    // Block events (new blocks, re-orgs, sync completion) from the base node.
    pub block_event_stream: BlockEventReceiver,
    // Peer connectivity events from the comms layer.
    pub connectivity_events: ConnectivityEventRx,
}

/// Protocol handler that keeps the local mempool in sync with peers,
/// reacting to new peer connections, inbound protocol notifications and
/// block events (see `run` and `handle_block_event`).
pub struct MempoolSyncProtocol<TSubstream> {
    config: MempoolServiceConfig,
    protocol_notifier: ProtocolNotificationRx<TSubstream>,
    // NOTE(review): `run()` obtains a fresh subscription via
    // `self.connectivity.get_event_subscription()`, so this stored receiver
    // appears redundant (it may be a leftover removed line of the diff) —
    // confirm and remove if unused.
    connectivity_events: ConnectivityEventRx,
    mempool: Mempool,
    // Count of peers we have completed an initial sync with; compared
    // against `config.initial_sync_num_peers`.
    num_synched: Arc<AtomicUsize>,
    // Single permit: held while a sync session is in progress.
    permits: Arc<Semaphore>,
    connectivity: ConnectivityRequester,
    block_event_stream: BlockEventReceiver
}

impl<TSubstream> MempoolSyncProtocol<TSubstream>
Expand All @@ -128,25 +136,30 @@ where TSubstream: AsyncRead + AsyncWrite + Unpin + Send + Sync + 'static
/// Creates a new mempool sync protocol handler.
///
/// Starts with zero peers synced and a single sync permit, so only one
/// sync session can run at a time.
// NOTE(review): both `connectivity_events` and `connectivity` are accepted,
// yet `run()` subscribes again through `connectivity` — the explicit
// receiver parameter may be redundant; confirm before removing.
pub fn new(
    config: MempoolServiceConfig,
    protocol_notifier: ProtocolNotificationRx<TSubstream>,
    connectivity_events: ConnectivityEventRx,
    mempool: Mempool,
    connectivity: ConnectivityRequester,
    block_event_stream: BlockEventReceiver,
) -> Self {
    let num_synched = Arc::new(AtomicUsize::new(0));
    let permits = Arc::new(Semaphore::new(1));
    Self {
        config,
        protocol_notifier,
        connectivity_events,
        mempool,
        num_synched,
        permits,
        connectivity,
        block_event_stream,
    }
}

pub async fn run(mut self) {
info!(target: LOG_TARGET, "Mempool protocol handler has started");

let mut connectivity_events = self.connectivity.get_event_subscription();
loop {
tokio::select! {
Ok(event) = self.connectivity_events.recv() => {
Ok(block_event) = self.block_event_stream.recv() => {
self.handle_block_event(&block_event).await;
},
Ok(event) = connectivity_events.recv() => {
self.handle_connectivity_event(event).await;
},

Expand Down Expand Up @@ -174,6 +187,55 @@ where TSubstream: AsyncRead + AsyncWrite + Unpin + Send + Sync + 'static
}
}

async fn handle_block_event(&mut self, block_event: &BlockEvent) {
use BlockEvent::{BlockSyncComplete, ValidBlockAdded};
if self.permits.available_permits() < 1 {
// Sync is already in progress, so we should not bother trying to sync.
return;
}
match block_event {
ValidBlockAdded(_, BlockAddResult::ChainReorg { added, removed: _ }) => {
if added.len() < self.config.block_sync_trigger {
return;
}
},
BlockSyncComplete(tip, starting_sync_height) => {
let added = tip.height() - starting_sync_height;
if added < self.config.block_sync_trigger as u64 {
return;
}
},
_ => {
return;
},
}
// we need to make sure the service can start a sync
if self.num_synched.load(Ordering::Acquire) >= self.config.initial_sync_num_peers {
self.num_synched.fetch_sub(1, Ordering::Release);
}
let connection = match self
.connectivity
.select_connections(ConnectivitySelection::random_nodes(1, vec![]))
.await
{
Ok(mut v) => {
if v.is_empty() {
error!(target: LOG_TARGET, "Mempool sync could not get a peer to sync to");
return;
};
v.pop().unwrap()
},
Err(e) => {
error!(
target: LOG_TARGET,
"Mempool sync could not get a peer to sync to: {}", e
);
return;
},
};
self.spawn_initiator_protocol(connection).await;
}

/// Returns true once `num_synched` has reached the configured
/// `initial_sync_num_peers` quota.
// NOTE(review): this uses Ordering::SeqCst while `handle_block_event` uses
// Acquire/Release on the same counter — consider unifying the orderings.
fn is_synched(&self) -> bool {
    self.num_synched.load(Ordering::SeqCst) >= self.config.initial_sync_num_peers
}
Expand Down
12 changes: 10 additions & 2 deletions base_layer/core/src/mempool/sync_protocol/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,10 @@ use tari_comms::{
message::MessageExt,
peer_manager::PeerFeatures,
protocol::{ProtocolEvent, ProtocolNotification, ProtocolNotificationTx},
test_utils::{mocks::create_peer_connection_mock_pair, node_identity::build_node_identity},
test_utils::{
mocks::{create_connectivity_mock, create_peer_connection_mock_pair},
node_identity::build_node_identity,
},
Bytes,
BytesMut,
};
Expand Down Expand Up @@ -87,14 +90,19 @@ async fn setup(
let (protocol_notif_tx, protocol_notif_rx) = mpsc::channel(1);
let (connectivity_events_tx, connectivity_events_rx) = broadcast::channel(10);
let (mempool, transactions) = new_mempool_with_transactions(num_txns).await;
let (connectivity, _) = create_connectivity_mock();
let (block_event_sender, _) = broadcast::channel(1);
let block_receiver = block_event_sender.subscribe();

let protocol = MempoolSyncProtocol::new(
Default::default(),
protocol_notif_rx,
connectivity_events_rx,
mempool.clone(),
connectivity,
);

task::spawn(protocol.run());
task::spawn(protocol.run(block_receiver));

(protocol_notif_tx, connectivity_events_tx, mempool, transactions)
}
Expand Down
2 changes: 2 additions & 0 deletions common/config/presets/c_base_node.toml
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,8 @@ track_reorgs = true
#service.initial_sync_num_peers = 2
# The maximum number of transactions to sync in a single sync session. Default: 10_000
#service.initial_sync_max_transactions = 10_000
# The maximum number of blocks that can be added via sync or re-org before a mempool sync is triggered. Default: 5
#service.block_sync_trigger = 5

[base_node.state_machine]
# The initial max sync latency. If a peer fails to stream a header/block within this deadline another sync peer will be
Expand Down

0 comments on commit fd016f3

Please sign in to comment.