Skip to content

Commit

Permalink
Add new tests bootstrap (#4509)
Browse files Browse the repository at this point in the history
  • Loading branch information
AurelienFT authored Nov 3, 2023
1 parent 84d6bac commit 4fec46a
Show file tree
Hide file tree
Showing 4 changed files with 302 additions and 12 deletions.
2 changes: 1 addition & 1 deletion massa-bootstrap/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -701,7 +701,7 @@ fn step_timeout_duration(bs_deadline: &Instant, step_timeout: &Duration) -> Opti
Some(std::cmp::min(remaining, *step_timeout))
}
#[allow(clippy::too_many_arguments)]
fn manage_bootstrap(
pub(crate) fn manage_bootstrap(
bootstrap_config: &BootstrapConfig,
server: &mut BootstrapServerBinder,
final_state: Arc<RwLock<FinalState>>,
Expand Down
294 changes: 293 additions & 1 deletion massa-bootstrap/src/tests/binders.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,16 @@
use crate::messages::{BootstrapClientMessage, BootstrapServerMessage};
use crate::server::manage_bootstrap;
use crate::settings::{BootstrapClientConfig, BootstrapSrvBindCfg};

use crate::{
bindings::{BootstrapClientBinder, BootstrapServerBinder},
tests::tools::get_bootstrap_config,
};
use crate::{BootstrapConfig, BootstrapError};
use massa_consensus_exports::MockConsensusController;
use massa_db_exports::{MassaDBConfig, MassaDBController};
use massa_db_worker::MassaDB;
use massa_final_state::FinalStateConfig;
use massa_models::config::{
BOOTSTRAP_RANDOMNESS_SIZE_BYTES, CONSENSUS_BOOTSTRAP_PART_SIZE, ENDORSEMENT_COUNT,
MAX_ADVERTISE_LENGTH, MAX_BOOTSTRAP_BLOCKS, MAX_BOOTSTRAP_ERROR_LENGTH,
Expand All @@ -20,19 +25,25 @@ use massa_models::config::{
use massa_models::node::NodeId;
use massa_models::version::Version;

use massa_pos_exports::{MockSelectorControllerWrapper, PoSFinalState};
use massa_protocol_exports::{MockProtocolController, BootstrapPeers};
use massa_signature::{KeyPair, PublicKey};
use massa_time::MassaTime;

use parking_lot::RwLock;
use rand::Rng;
use serial_test::serial;
use tempfile::TempDir;

use std::io::Write;
use std::net::TcpStream;
use std::path::PathBuf;
use std::str::FromStr;
use std::sync::mpsc::channel;
use std::sync::Arc;
use std::time::{Duration, Instant};

use super::tools::parametric_test;
use super::tools::{get_random_final_state_bootstrap, parametric_test};

lazy_static::lazy_static! {
pub static ref BOOTSTRAP_CONFIG_KEYPAIR: (BootstrapConfig, KeyPair) = {
Expand Down Expand Up @@ -359,6 +370,287 @@ fn test_partial_msg() {
client_thread.join().unwrap();
}

#[test]
fn test_staying_connected_without_message_trigger_read_timeout() {
let read_timeout = Duration::from_millis(1000);
let (mut bootstrap_config, server_keypair): (BootstrapConfig, KeyPair) = BOOTSTRAP_CONFIG_KEYPAIR.clone();
bootstrap_config.read_timeout = MassaTime::try_from(read_timeout).unwrap();
let server = std::net::TcpListener::bind("localhost:0").unwrap();
let addr = server.local_addr().unwrap();
let client = std::net::TcpStream::connect(addr).unwrap();
let mut client_clone = client.try_clone().unwrap();
let server = server.accept().unwrap();
let version = || Version::from_str("TEST.1.10").unwrap();
let timeout: Duration = Duration::from_millis(10000);
let consensus_controller = MockConsensusController::new();
let protocol_controller = MockProtocolController::new();
let selector_controller = MockSelectorControllerWrapper::new();

let mut server = BootstrapServerBinder::new(
server.0,
server_keypair.clone(),
BootstrapSrvBindCfg {
rate_limit: u64::MAX,
thread_count: THREAD_COUNT,
max_datastore_key_length: MAX_DATASTORE_KEY_LENGTH,
randomness_size_bytes: BOOTSTRAP_RANDOMNESS_SIZE_BYTES,
consensus_bootstrap_part_size: CONSENSUS_BOOTSTRAP_PART_SIZE,
write_error_timeout: MassaTime::from_millis(1000),
},
None,
);
let temp_dir = TempDir::new().unwrap();
let db = Arc::new(RwLock::new(Box::new(MassaDB::new(MassaDBConfig {
path: temp_dir.path().to_path_buf(),
max_history_length: 10,
max_final_state_elements_size: 100_000_000,
max_versioning_elements_size: 100_000_000,
thread_count: THREAD_COUNT,
}))
as Box<(dyn MassaDBController + 'static)>));
let rolls_path = PathBuf::from_str("../massa-node/base_config/initial_rolls.json").unwrap();
let final_state_config = FinalStateConfig::default();
let server_thread = std::thread::Builder::new()
.name("test_binders::server_thread".to_string())
.spawn({
move || {
manage_bootstrap(
&bootstrap_config,
&mut server,
Arc::new(RwLock::new(get_random_final_state_bootstrap(
PoSFinalState::new(
final_state_config.pos_config.clone(),
"",
&rolls_path,
Box::new(selector_controller.clone()),
db.clone(),
)
.unwrap(),
final_state_config,
db,
))),
version(),
Box::new(consensus_controller),
Box::new(protocol_controller),
Instant::now().checked_add(timeout).unwrap(),
)
}
})
.unwrap();

let client_thread = std::thread::Builder::new()
.name("test_binders::server_thread".to_string())
.spawn({
move || {
std::thread::sleep(read_timeout.checked_add(Duration::from_millis(100)).unwrap());
client_clone
.write_all(b"AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA")
.unwrap()
}
})
.unwrap();

let res = server_thread.join().unwrap();
match res {
Err(BootstrapError::TimedOut(_)) => (),
_ => panic!("The server should have been interrupted"),
}
client_thread.join().unwrap();
}

#[test]
fn test_staying_connected_pass_handshake_but_deadline_after() {
let read_timeout = Duration::from_millis(1000);
let (mut bootstrap_config, server_keypair): (BootstrapConfig, KeyPair) = BOOTSTRAP_CONFIG_KEYPAIR.clone();
bootstrap_config.read_timeout = MassaTime::try_from(read_timeout).unwrap();
let server = std::net::TcpListener::bind("localhost:0").unwrap();
let addr = server.local_addr().unwrap();
let client = std::net::TcpStream::connect(addr).unwrap();
let mut client_clone = client.try_clone().unwrap();
let server = server.accept().unwrap();
let version = || Version::from_str("TEST.1.10").unwrap();
let timeout: Duration = Duration::from_millis(10);
let consensus_controller = MockConsensusController::new();
let protocol_controller = MockProtocolController::new();
let selector_controller = MockSelectorControllerWrapper::new();

let mut server = BootstrapServerBinder::new(
server.0,
server_keypair.clone(),
BootstrapSrvBindCfg {
rate_limit: u64::MAX,
thread_count: THREAD_COUNT,
max_datastore_key_length: MAX_DATASTORE_KEY_LENGTH,
randomness_size_bytes: BOOTSTRAP_RANDOMNESS_SIZE_BYTES,
consensus_bootstrap_part_size: CONSENSUS_BOOTSTRAP_PART_SIZE,
write_error_timeout: MassaTime::from_millis(1000),
},
None,
);
let mut client = BootstrapClientBinder::test_default(
client,
bootstrap_config.bootstrap_list[0].1.get_public_key(),
);
let temp_dir = TempDir::new().unwrap();
let db = Arc::new(RwLock::new(Box::new(MassaDB::new(MassaDBConfig {
path: temp_dir.path().to_path_buf(),
max_history_length: 10,
max_final_state_elements_size: 100_000_000,
max_versioning_elements_size: 100_000_000,
thread_count: THREAD_COUNT,
}))
as Box<(dyn MassaDBController + 'static)>));
let rolls_path = PathBuf::from_str("../massa-node/base_config/initial_rolls.json").unwrap();
let final_state_config = FinalStateConfig::default();
let server_thread = std::thread::Builder::new()
.name("test_binders::server_thread".to_string())
.spawn({
move || {
manage_bootstrap(
&bootstrap_config,
&mut server,
Arc::new(RwLock::new(get_random_final_state_bootstrap(
PoSFinalState::new(
final_state_config.pos_config.clone(),
"",
&rolls_path,
Box::new(selector_controller.clone()),
db.clone(),
)
.unwrap(),
final_state_config,
db,
))),
version(),
Box::new(consensus_controller),
Box::new(protocol_controller),
Instant::now().checked_add(timeout).unwrap(),
)
}
})
.unwrap();

let client_thread = std::thread::Builder::new()
.name("test_binders::server_thread".to_string())
.spawn({
move || {
client.handshake(version()).unwrap();
std::thread::sleep(timeout.checked_add(Duration::from_millis(100)).unwrap());
client_clone
.write_all(b"AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA")
.unwrap()
}
})
.unwrap();

let res = server_thread.join().unwrap();
match res {
Err(BootstrapError::Interupted(_)) => (),
_ => panic!("The server should have been interrupted"),
}
client_thread.join().unwrap();
}

#[test]
fn test_staying_connected_pass_handshake_but_deadline_during_data_exchange() {
let read_timeout = Duration::from_millis(500);
let (mut bootstrap_config, server_keypair): (BootstrapConfig, KeyPair) = BOOTSTRAP_CONFIG_KEYPAIR.clone();
bootstrap_config.read_timeout = MassaTime::try_from(read_timeout).unwrap();
let server = std::net::TcpListener::bind("localhost:0").unwrap();
let addr = server.local_addr().unwrap();
let client = std::net::TcpStream::connect(addr).unwrap();
let server = server.accept().unwrap();
let version = || Version::from_str("TEST.1.10").unwrap();
let timeout: Duration = Duration::from_millis(800);
let consensus_controller = MockConsensusController::new();
let mut protocol_controller = MockProtocolController::new();
protocol_controller.expect_get_bootstrap_peers().returning(|| {
Ok(BootstrapPeers(vec![]))
});
let selector_controller = MockSelectorControllerWrapper::new();

let mut server = BootstrapServerBinder::new(
server.0,
server_keypair.clone(),
BootstrapSrvBindCfg {
rate_limit: u64::MAX,
thread_count: THREAD_COUNT,
max_datastore_key_length: MAX_DATASTORE_KEY_LENGTH,
randomness_size_bytes: BOOTSTRAP_RANDOMNESS_SIZE_BYTES,
consensus_bootstrap_part_size: CONSENSUS_BOOTSTRAP_PART_SIZE,
write_error_timeout: MassaTime::from_millis(1000),
},
None,
);
let mut client = BootstrapClientBinder::test_default(
client,
bootstrap_config.bootstrap_list[0].1.get_public_key(),
);
let temp_dir = TempDir::new().unwrap();
let db = Arc::new(RwLock::new(Box::new(MassaDB::new(MassaDBConfig {
path: temp_dir.path().to_path_buf(),
max_history_length: 10,
max_final_state_elements_size: 100_000_000,
max_versioning_elements_size: 100_000_000,
thread_count: THREAD_COUNT,
}))
as Box<(dyn MassaDBController + 'static)>));
let rolls_path = PathBuf::from_str("../massa-node/base_config/initial_rolls.json").unwrap();
let final_state_config = FinalStateConfig::default();
let server_thread = std::thread::Builder::new()
.name("test_binders::server_thread".to_string())
.spawn({
move || {
manage_bootstrap(
&bootstrap_config,
&mut server,
Arc::new(RwLock::new(get_random_final_state_bootstrap(
PoSFinalState::new(
final_state_config.pos_config.clone(),
"",
&rolls_path,
Box::new(selector_controller.clone()),
db.clone(),
)
.unwrap(),
final_state_config,
db,
))),
version(),
Box::new(consensus_controller),
Box::new(protocol_controller),
Instant::now().checked_add(timeout).unwrap(),
)
}
})
.unwrap();

let client_thread = std::thread::Builder::new()
.name("test_binders::server_thread".to_string())
.spawn({
move || {
client.handshake(version()).unwrap();
// Pass the bootstrap time sent by server
client.next_timeout(Some(read_timeout)).unwrap();
for _ in 0..10 {
let _ = client.send_timeout(&BootstrapClientMessage::AskBootstrapPeers, Some(Duration::from_millis(1000)));
let _ = client.next_timeout(Some(read_timeout));
std::thread::sleep(timeout.div_f32(5.0));
}
}
})
.unwrap();

let res = server_thread.join().unwrap();
// if someone doesn't send anything in a data exchange phase the server consider this exchange as finished and so finish without problems
match res {
Ok(()) => (),
_ => panic!("The server should stop without problems"),
}
client_thread.join().unwrap();
}


// serial test for time-taken sensitive tests: reduces parallelism noise
#[test]
#[serial]
Expand Down
8 changes: 2 additions & 6 deletions massa-bootstrap/src/tests/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,7 @@ fn test_serialize_bootstrap_server_message() {
parametric_test(
Duration::from_secs(30),
config,
vec![
4418156015316874292,
],
vec![4418156015316874292],
|config, rng| {
let msg = BootstrapServerMessage::generate(rng);
let mut bytes = Vec::new();
Expand Down Expand Up @@ -82,9 +80,7 @@ fn test_serialize_bootstrap_client_message() {
parametric_test(
Duration::from_secs(30),
(),
vec![
12770601678208144436,
],
vec![12770601678208144436],
|_, rng| {
let msg = BootstrapClientMessage::generate(rng);
let mut bytes = Vec::new();
Expand Down
10 changes: 6 additions & 4 deletions massa-bootstrap/src/tests/tools.rs
Original file line number Diff line number Diff line change
Expand Up @@ -770,9 +770,11 @@ impl BootstrapServerMessage {
}
2 => {
let slot = gen_random_slot(rng);
let state_part = gen_random_stream_batch(MAX_BOOTSTRAP_FINAL_STATE_PARTS_SIZE, slot, rng);
let state_part =
gen_random_stream_batch(MAX_BOOTSTRAP_FINAL_STATE_PARTS_SIZE, slot, rng);
let slot = gen_random_slot(rng);
let versioning_part = gen_random_stream_batch(MAX_BOOTSTRAP_VERSIONING_ELEMENTS_SIZE, slot, rng);
let versioning_part =
gen_random_stream_batch(MAX_BOOTSTRAP_VERSIONING_ELEMENTS_SIZE, slot, rng);
let mut final_blocks = vec![];
let block_nb = rng.gen_range(5..100); //MAX_BOOTSTRAP_BLOCKS);
for _ in 0..block_nb {
Expand Down Expand Up @@ -1223,7 +1225,7 @@ impl BootstrapClientMessage {
} else {
StreamingStep::Started
};

let last_consensus_step = if last_slot.is_some() {
let nb = rng.gen_range(0..100);
let mut data = PreHashSet::with_capacity(nb);
Expand All @@ -1234,7 +1236,7 @@ impl BootstrapClientMessage {
} else {
StreamingStep::Started
};

let send_last_start_period = if last_slot.is_none() {
true
} else {
Expand Down

0 comments on commit 4fec46a

Please sign in to comment.