Skip to content

Commit

Permalink
fix tests
Browse files Browse the repository at this point in the history
  • Loading branch information
SWvheerden committed Sep 27, 2022
1 parent 875db1c commit de7245d
Show file tree
Hide file tree
Showing 4 changed files with 50 additions and 41 deletions.
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -60,4 +60,5 @@ buildtools/Output/

clients/base_node_grpc_client/package-lock.json
clients/validator_node_grpc_client/package-lock.json
clients/wallet_grpc_client/package-lock.json
clients/wallet_grpc_client/package-lock.json
pie/
29 changes: 15 additions & 14 deletions base_layer/core/src/mempool/sync_protocol/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -185,10 +185,6 @@ where TSubstream: AsyncRead + AsyncWrite + Unpin + Send + Sync + 'static

async fn handle_block_event(&mut self, block_event: &BlockEvent) {
use BlockEvent::{BlockSyncComplete, ValidBlockAdded};
if self.permits.available_permits() < 1 {
// Sync is already in progress, so we should not bother trying to sync.
return;
}
match block_event {
ValidBlockAdded(_, BlockAddResult::ChainReorg { added, removed: _ }) => {
if added.len() < self.config.block_sync_trigger {
Expand All @@ -205,21 +201,24 @@ where TSubstream: AsyncRead + AsyncWrite + Unpin + Send + Sync + 'static
return;
},
}
// we need to make sure the service can start a sync
if self.num_synched.load(Ordering::Acquire) >= self.config.initial_sync_num_peers {
self.num_synched.fetch_sub(1, Ordering::Release);
}
let connection = match self
// we want to at least sync initial_sync_num_peers, so we reset the num_synced to 0, so it can run till
// initial_sync_num_peers again. This is made to run as a best effort in that it will at least run the
// initial_sync_num_peers
self.num_synched.store(0, Ordering::Release);
let connections = match self
.connectivity
.select_connections(ConnectivitySelection::random_nodes(1, vec![]))
.select_connections(ConnectivitySelection::random_nodes(
self.config.initial_sync_num_peers,
vec![],
))
.await
{
Ok(mut v) => {
Ok(v) => {
if v.is_empty() {
error!(target: LOG_TARGET, "Mempool sync could not get a peer to sync to");
error!(target: LOG_TARGET, "Mempool sync could not get any peers to sync to");
return;
};
v.pop().unwrap()
v
},
Err(e) => {
error!(
Expand All @@ -229,7 +228,9 @@ where TSubstream: AsyncRead + AsyncWrite + Unpin + Send + Sync + 'static
return;
},
};
self.spawn_initiator_protocol(connection).await;
for connection in connections {
self.spawn_initiator_protocol(connection).await;
}
}

fn is_synched(&self) -> bool {
Expand Down
48 changes: 22 additions & 26 deletions base_layer/core/src/mempool/sync_protocol/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,14 @@ use std::{fmt, io, iter::repeat_with, sync::Arc};
use futures::{Sink, SinkExt, Stream, StreamExt};
use tari_common::configuration::Network;
use tari_comms::{
connectivity::{ConnectivityEvent, ConnectivityEventTx},
connectivity::ConnectivityEvent,
framing,
memsocket::MemorySocket,
message::MessageExt,
peer_manager::PeerFeatures,
protocol::{ProtocolEvent, ProtocolNotification, ProtocolNotificationTx},
test_utils::{
mocks::{create_connectivity_mock, create_peer_connection_mock_pair},
mocks::{create_connectivity_mock, create_peer_connection_mock_pair, ConnectivityManagerMockState},
node_identity::build_node_identity,
},
Bytes,
Expand Down Expand Up @@ -83,42 +83,45 @@ async fn setup(
num_txns: usize,
) -> (
ProtocolNotificationTx<MemorySocket>,
ConnectivityEventTx,
ConnectivityManagerMockState,
Mempool,
Vec<Transaction>,
) {
let (protocol_notif_tx, protocol_notif_rx) = mpsc::channel(1);
let (connectivity_events_tx, _) = broadcast::channel(10);
let (mempool, transactions) = new_mempool_with_transactions(num_txns).await;
let (connectivity, _) = create_connectivity_mock();
let (connectivity, connectivity_manager_mock) = create_connectivity_mock();
let connectivity_manager_mock_state = connectivity_manager_mock.spawn();
let (block_event_sender, _) = broadcast::channel(1);
let block_receiver = block_event_sender.subscribe();

let protocol = MempoolSyncProtocol::new(
Default::default(),
protocol_notif_rx,
mempool.clone(),
connectivity,block_receiver
connectivity,
block_receiver,
);

task::spawn(protocol.run());

(protocol_notif_tx, connectivity_events_tx, mempool, transactions)
connectivity_manager_mock_state.wait_until_event_receivers_ready().await;
(
protocol_notif_tx,
connectivity_manager_mock_state,
mempool,
transactions,
)
}

#[tokio::test]
async fn empty_set() {
let (_, connectivity_events_tx, mempool1, _) = setup(0).await;
let (_, connectivity_manager_state, mempool1, _) = setup(0).await;

let node1 = build_node_identity(PeerFeatures::COMMUNICATION_NODE);
let node2 = build_node_identity(PeerFeatures::COMMUNICATION_NODE);
let (_node1_conn, node1_mock, node2_conn, _) =
create_peer_connection_mock_pair(node1.to_peer(), node2.to_peer()).await;

// This node connected to a peer, so it should open the substream
connectivity_events_tx
.send(ConnectivityEvent::PeerConnected(node2_conn))
.unwrap();
connectivity_manager_state.publish_event(ConnectivityEvent::PeerConnected(node2_conn));

let substream = node1_mock.next_incoming_substream().await.unwrap();
let framed = framing::canonical(substream, MAX_FRAME_SIZE);
Expand All @@ -138,17 +141,15 @@ async fn empty_set() {

#[tokio::test]
async fn synchronise() {
let (_, connectivity_events_tx, mempool1, transactions1) = setup(5).await;
let (_, connectivity_manager_state, mempool1, transactions1) = setup(5).await;

let node1 = build_node_identity(PeerFeatures::COMMUNICATION_NODE);
let node2 = build_node_identity(PeerFeatures::COMMUNICATION_NODE);
let (_node1_conn, node1_mock, node2_conn, _) =
create_peer_connection_mock_pair(node1.to_peer(), node2.to_peer()).await;

// This node connected to a peer, so it should open the substream
connectivity_events_tx
.send(ConnectivityEvent::PeerConnected(node2_conn))
.unwrap();
connectivity_manager_state.publish_event(ConnectivityEvent::PeerConnected(node2_conn));

let substream = node1_mock.next_incoming_substream().await.unwrap();
let framed = framing::canonical(substream, MAX_FRAME_SIZE);
Expand All @@ -172,17 +173,14 @@ async fn synchronise() {

#[tokio::test]
async fn duplicate_set() {
let (_, connectivity_events_tx, mempool1, transactions1) = setup(2).await;

let (_, connectivity_manager_state, mempool1, transactions1) = setup(2).await;
let node1 = build_node_identity(PeerFeatures::COMMUNICATION_NODE);
let node2 = build_node_identity(PeerFeatures::COMMUNICATION_NODE);
let (_node1_conn, node1_mock, node2_conn, _) =
create_peer_connection_mock_pair(node1.to_peer(), node2.to_peer()).await;

// This node connected to a peer, so it should open the substream
connectivity_events_tx
.send(ConnectivityEvent::PeerConnected(node2_conn))
.unwrap();
connectivity_manager_state.publish_event(ConnectivityEvent::PeerConnected(node2_conn));

let substream = node1_mock.next_incoming_substream().await.unwrap();
let framed = framing::canonical(substream, MAX_FRAME_SIZE);
Expand Down Expand Up @@ -276,17 +274,15 @@ async fn initiator_messages() {

#[tokio::test]
async fn responder_messages() {
let (_, connectivity_events_tx, _, transactions1) = setup(1).await;
let (_, connectivity_manager_state, _, transactions1) = setup(1).await;

let node1 = build_node_identity(PeerFeatures::COMMUNICATION_NODE);
let node2 = build_node_identity(PeerFeatures::COMMUNICATION_NODE);
let (_node1_conn, node1_mock, node2_conn, _) =
create_peer_connection_mock_pair(node1.to_peer(), node2.to_peer()).await;

// This node connected to a peer, so it should open the substream
connectivity_events_tx
.send(ConnectivityEvent::PeerConnected(node2_conn))
.unwrap();
connectivity_manager_state.publish_event(ConnectivityEvent::PeerConnected(node2_conn));

let substream = node1_mock.next_incoming_substream().await.unwrap();
let mut framed = framing::canonical(substream, MAX_FRAME_SIZE);
Expand Down
11 changes: 11 additions & 0 deletions comms/core/src/test_utils/mocks/connectivity_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,17 @@ impl ConnectivityManagerMockState {
}
}

pub async fn wait_until_event_receivers_ready(&self) {
let mut timeout = 0;
while self.event_tx.receiver_count() == 0 {
time::sleep(Duration::from_millis(10)).await;
timeout += 10;
if timeout > 5000 {
panic!("Event receiver not ready after 5 secs");
}
}
}

async fn add_call(&self, call_str: String) {
self.with_state(|state| state.calls.push(call_str)).await
}
Expand Down

0 comments on commit de7245d

Please sign in to comment.