Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add new tests bootstrap #4509

Merged
merged 1 commit into from
Nov 3, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion massa-bootstrap/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -701,7 +701,7 @@ fn step_timeout_duration(bs_deadline: &Instant, step_timeout: &Duration) -> Opti
Some(std::cmp::min(remaining, *step_timeout))
}
#[allow(clippy::too_many_arguments)]
fn manage_bootstrap(
pub(crate) fn manage_bootstrap(
bootstrap_config: &BootstrapConfig,
server: &mut BootstrapServerBinder,
final_state: Arc<RwLock<FinalState>>,
Expand Down
294 changes: 293 additions & 1 deletion massa-bootstrap/src/tests/binders.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,16 @@
use crate::messages::{BootstrapClientMessage, BootstrapServerMessage};
use crate::server::manage_bootstrap;
use crate::settings::{BootstrapClientConfig, BootstrapSrvBindCfg};

use crate::{
bindings::{BootstrapClientBinder, BootstrapServerBinder},
tests::tools::get_bootstrap_config,
};
use crate::{BootstrapConfig, BootstrapError};
use massa_consensus_exports::MockConsensusController;
use massa_db_exports::{MassaDBConfig, MassaDBController};
use massa_db_worker::MassaDB;
use massa_final_state::FinalStateConfig;
use massa_models::config::{
BOOTSTRAP_RANDOMNESS_SIZE_BYTES, CONSENSUS_BOOTSTRAP_PART_SIZE, ENDORSEMENT_COUNT,
MAX_ADVERTISE_LENGTH, MAX_BOOTSTRAP_BLOCKS, MAX_BOOTSTRAP_ERROR_LENGTH,
Expand All @@ -20,19 +25,25 @@ use massa_models::config::{
use massa_models::node::NodeId;
use massa_models::version::Version;

use massa_pos_exports::{MockSelectorControllerWrapper, PoSFinalState};
use massa_protocol_exports::{MockProtocolController, BootstrapPeers};
use massa_signature::{KeyPair, PublicKey};
use massa_time::MassaTime;

use parking_lot::RwLock;
use rand::Rng;
use serial_test::serial;
use tempfile::TempDir;

use std::io::Write;
use std::net::TcpStream;
use std::path::PathBuf;
use std::str::FromStr;
use std::sync::mpsc::channel;
use std::sync::Arc;
use std::time::{Duration, Instant};

use super::tools::parametric_test;
use super::tools::{get_random_final_state_bootstrap, parametric_test};

lazy_static::lazy_static! {
pub static ref BOOTSTRAP_CONFIG_KEYPAIR: (BootstrapConfig, KeyPair) = {
Expand Down Expand Up @@ -359,6 +370,287 @@ fn test_partial_msg() {
client_thread.join().unwrap();
}

#[test]
fn test_staying_connected_without_message_trigger_read_timeout() {
let read_timeout = Duration::from_millis(1000);
let (mut bootstrap_config, server_keypair): (BootstrapConfig, KeyPair) = BOOTSTRAP_CONFIG_KEYPAIR.clone();
bootstrap_config.read_timeout = MassaTime::try_from(read_timeout).unwrap();
let server = std::net::TcpListener::bind("localhost:0").unwrap();
let addr = server.local_addr().unwrap();
let client = std::net::TcpStream::connect(addr).unwrap();
let mut client_clone = client.try_clone().unwrap();
let server = server.accept().unwrap();
let version = || Version::from_str("TEST.1.10").unwrap();
let timeout: Duration = Duration::from_millis(10000);
let consensus_controller = MockConsensusController::new();
let protocol_controller = MockProtocolController::new();
let selector_controller = MockSelectorControllerWrapper::new();

let mut server = BootstrapServerBinder::new(
server.0,
server_keypair.clone(),
BootstrapSrvBindCfg {
rate_limit: u64::MAX,
thread_count: THREAD_COUNT,
max_datastore_key_length: MAX_DATASTORE_KEY_LENGTH,
randomness_size_bytes: BOOTSTRAP_RANDOMNESS_SIZE_BYTES,
consensus_bootstrap_part_size: CONSENSUS_BOOTSTRAP_PART_SIZE,
write_error_timeout: MassaTime::from_millis(1000),
},
None,
);
let temp_dir = TempDir::new().unwrap();
let db = Arc::new(RwLock::new(Box::new(MassaDB::new(MassaDBConfig {
path: temp_dir.path().to_path_buf(),
max_history_length: 10,
max_final_state_elements_size: 100_000_000,
max_versioning_elements_size: 100_000_000,
thread_count: THREAD_COUNT,
}))
as Box<(dyn MassaDBController + 'static)>));
let rolls_path = PathBuf::from_str("../massa-node/base_config/initial_rolls.json").unwrap();
let final_state_config = FinalStateConfig::default();
let server_thread = std::thread::Builder::new()
.name("test_binders::server_thread".to_string())
.spawn({
move || {
manage_bootstrap(
&bootstrap_config,
&mut server,
Arc::new(RwLock::new(get_random_final_state_bootstrap(
PoSFinalState::new(
final_state_config.pos_config.clone(),
"",
&rolls_path,
Box::new(selector_controller.clone()),
db.clone(),
)
.unwrap(),
final_state_config,
db,
))),
version(),
Box::new(consensus_controller),
Box::new(protocol_controller),
Instant::now().checked_add(timeout).unwrap(),
)
}
})
.unwrap();

let client_thread = std::thread::Builder::new()
.name("test_binders::server_thread".to_string())
.spawn({
move || {
std::thread::sleep(read_timeout.checked_add(Duration::from_millis(100)).unwrap());
client_clone
.write_all(b"AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA")
.unwrap()
}
})
.unwrap();

let res = server_thread.join().unwrap();
match res {
Err(BootstrapError::TimedOut(_)) => (),
_ => panic!("The server should have been interrupted"),
}
client_thread.join().unwrap();
}

#[test]
fn test_staying_connected_pass_handshake_but_deadline_after() {
let read_timeout = Duration::from_millis(1000);
let (mut bootstrap_config, server_keypair): (BootstrapConfig, KeyPair) = BOOTSTRAP_CONFIG_KEYPAIR.clone();
bootstrap_config.read_timeout = MassaTime::try_from(read_timeout).unwrap();
let server = std::net::TcpListener::bind("localhost:0").unwrap();
let addr = server.local_addr().unwrap();
let client = std::net::TcpStream::connect(addr).unwrap();
let mut client_clone = client.try_clone().unwrap();
let server = server.accept().unwrap();
let version = || Version::from_str("TEST.1.10").unwrap();
let timeout: Duration = Duration::from_millis(10);
let consensus_controller = MockConsensusController::new();
let protocol_controller = MockProtocolController::new();
let selector_controller = MockSelectorControllerWrapper::new();

let mut server = BootstrapServerBinder::new(
server.0,
server_keypair.clone(),
BootstrapSrvBindCfg {
rate_limit: u64::MAX,
thread_count: THREAD_COUNT,
max_datastore_key_length: MAX_DATASTORE_KEY_LENGTH,
randomness_size_bytes: BOOTSTRAP_RANDOMNESS_SIZE_BYTES,
consensus_bootstrap_part_size: CONSENSUS_BOOTSTRAP_PART_SIZE,
write_error_timeout: MassaTime::from_millis(1000),
},
None,
);
let mut client = BootstrapClientBinder::test_default(
client,
bootstrap_config.bootstrap_list[0].1.get_public_key(),
);
let temp_dir = TempDir::new().unwrap();
let db = Arc::new(RwLock::new(Box::new(MassaDB::new(MassaDBConfig {
path: temp_dir.path().to_path_buf(),
max_history_length: 10,
max_final_state_elements_size: 100_000_000,
max_versioning_elements_size: 100_000_000,
thread_count: THREAD_COUNT,
}))
as Box<(dyn MassaDBController + 'static)>));
let rolls_path = PathBuf::from_str("../massa-node/base_config/initial_rolls.json").unwrap();
let final_state_config = FinalStateConfig::default();
let server_thread = std::thread::Builder::new()
.name("test_binders::server_thread".to_string())
.spawn({
move || {
manage_bootstrap(
&bootstrap_config,
&mut server,
Arc::new(RwLock::new(get_random_final_state_bootstrap(
PoSFinalState::new(
final_state_config.pos_config.clone(),
"",
&rolls_path,
Box::new(selector_controller.clone()),
db.clone(),
)
.unwrap(),
final_state_config,
db,
))),
version(),
Box::new(consensus_controller),
Box::new(protocol_controller),
Instant::now().checked_add(timeout).unwrap(),
)
}
})
.unwrap();

let client_thread = std::thread::Builder::new()
.name("test_binders::server_thread".to_string())
.spawn({
move || {
client.handshake(version()).unwrap();
std::thread::sleep(timeout.checked_add(Duration::from_millis(100)).unwrap());
client_clone
.write_all(b"AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA")
.unwrap()
}
})
.unwrap();

let res = server_thread.join().unwrap();
match res {
Err(BootstrapError::Interupted(_)) => (),
_ => panic!("The server should have been interrupted"),
}
client_thread.join().unwrap();
}

#[test]
fn test_staying_connected_pass_handshake_but_deadline_during_data_exchange() {
let read_timeout = Duration::from_millis(500);
let (mut bootstrap_config, server_keypair): (BootstrapConfig, KeyPair) = BOOTSTRAP_CONFIG_KEYPAIR.clone();
bootstrap_config.read_timeout = MassaTime::try_from(read_timeout).unwrap();
let server = std::net::TcpListener::bind("localhost:0").unwrap();
let addr = server.local_addr().unwrap();
let client = std::net::TcpStream::connect(addr).unwrap();
let server = server.accept().unwrap();
let version = || Version::from_str("TEST.1.10").unwrap();
let timeout: Duration = Duration::from_millis(800);
let consensus_controller = MockConsensusController::new();
let mut protocol_controller = MockProtocolController::new();
protocol_controller.expect_get_bootstrap_peers().returning(|| {
Ok(BootstrapPeers(vec![]))
});
let selector_controller = MockSelectorControllerWrapper::new();

let mut server = BootstrapServerBinder::new(
server.0,
server_keypair.clone(),
BootstrapSrvBindCfg {
rate_limit: u64::MAX,
thread_count: THREAD_COUNT,
max_datastore_key_length: MAX_DATASTORE_KEY_LENGTH,
randomness_size_bytes: BOOTSTRAP_RANDOMNESS_SIZE_BYTES,
consensus_bootstrap_part_size: CONSENSUS_BOOTSTRAP_PART_SIZE,
write_error_timeout: MassaTime::from_millis(1000),
},
None,
);
let mut client = BootstrapClientBinder::test_default(
client,
bootstrap_config.bootstrap_list[0].1.get_public_key(),
);
let temp_dir = TempDir::new().unwrap();
let db = Arc::new(RwLock::new(Box::new(MassaDB::new(MassaDBConfig {
path: temp_dir.path().to_path_buf(),
max_history_length: 10,
max_final_state_elements_size: 100_000_000,
max_versioning_elements_size: 100_000_000,
thread_count: THREAD_COUNT,
}))
as Box<(dyn MassaDBController + 'static)>));
let rolls_path = PathBuf::from_str("../massa-node/base_config/initial_rolls.json").unwrap();
let final_state_config = FinalStateConfig::default();
let server_thread = std::thread::Builder::new()
.name("test_binders::server_thread".to_string())
.spawn({
move || {
manage_bootstrap(
&bootstrap_config,
&mut server,
Arc::new(RwLock::new(get_random_final_state_bootstrap(
PoSFinalState::new(
final_state_config.pos_config.clone(),
"",
&rolls_path,
Box::new(selector_controller.clone()),
db.clone(),
)
.unwrap(),
final_state_config,
db,
))),
version(),
Box::new(consensus_controller),
Box::new(protocol_controller),
Instant::now().checked_add(timeout).unwrap(),
)
}
})
.unwrap();

let client_thread = std::thread::Builder::new()
.name("test_binders::server_thread".to_string())
.spawn({
move || {
client.handshake(version()).unwrap();
// Pass the bootstrap time sent by server
client.next_timeout(Some(read_timeout)).unwrap();
for _ in 0..10 {
let _ = client.send_timeout(&BootstrapClientMessage::AskBootstrapPeers, Some(Duration::from_millis(1000)));
let _ = client.next_timeout(Some(read_timeout));
std::thread::sleep(timeout.div_f32(5.0));
}
}
})
.unwrap();

let res = server_thread.join().unwrap();
// if someone doesn't send anything in a data exchange phase the server consider this exchange as finished and so finish without problems
match res {
Ok(()) => (),
_ => panic!("The server should stop without problems"),
}
client_thread.join().unwrap();
}


// serial test for time-taken sensitive tests: reduces parallelism noise
#[test]
#[serial]
Expand Down
8 changes: 2 additions & 6 deletions massa-bootstrap/src/tests/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,7 @@ fn test_serialize_bootstrap_server_message() {
parametric_test(
Duration::from_secs(30),
config,
vec![
4418156015316874292,
],
vec![4418156015316874292],
|config, rng| {
let msg = BootstrapServerMessage::generate(rng);
let mut bytes = Vec::new();
Expand Down Expand Up @@ -82,9 +80,7 @@ fn test_serialize_bootstrap_client_message() {
parametric_test(
Duration::from_secs(30),
(),
vec![
12770601678208144436,
],
vec![12770601678208144436],
|_, rng| {
let msg = BootstrapClientMessage::generate(rng);
let mut bytes = Vec::new();
Expand Down
10 changes: 6 additions & 4 deletions massa-bootstrap/src/tests/tools.rs
Original file line number Diff line number Diff line change
Expand Up @@ -770,9 +770,11 @@ impl BootstrapServerMessage {
}
2 => {
let slot = gen_random_slot(rng);
let state_part = gen_random_stream_batch(MAX_BOOTSTRAP_FINAL_STATE_PARTS_SIZE, slot, rng);
let state_part =
gen_random_stream_batch(MAX_BOOTSTRAP_FINAL_STATE_PARTS_SIZE, slot, rng);
let slot = gen_random_slot(rng);
let versioning_part = gen_random_stream_batch(MAX_BOOTSTRAP_VERSIONING_ELEMENTS_SIZE, slot, rng);
let versioning_part =
gen_random_stream_batch(MAX_BOOTSTRAP_VERSIONING_ELEMENTS_SIZE, slot, rng);
let mut final_blocks = vec![];
let block_nb = rng.gen_range(5..100); //MAX_BOOTSTRAP_BLOCKS);
for _ in 0..block_nb {
Expand Down Expand Up @@ -1223,7 +1225,7 @@ impl BootstrapClientMessage {
} else {
StreamingStep::Started
};

let last_consensus_step = if last_slot.is_some() {
let nb = rng.gen_range(0..100);
let mut data = PreHashSet::with_capacity(nb);
Expand All @@ -1234,7 +1236,7 @@ impl BootstrapClientMessage {
} else {
StreamingStep::Started
};

let send_last_start_period = if last_slot.is_none() {
true
} else {
Expand Down