Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[mkt tst] adapt test cyclic broadcasts for uncertainty #1237

Merged
merged 7 commits into from
Apr 16, 2021
1 change: 1 addition & 0 deletions core/market/src/testing/mock_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ impl MarketsNetwork {
/// It will be used to create directories and GSB binding points,
/// to avoid potential name clashes.
pub async fn new(test_name: Option<&str>) -> Self {
std::env::set_var("GSB_URL", "wrong url");
let _ = env_logger::builder().try_init();
// level 1 is this function.
// level 2 is <core::future::from_generator::GenFuture<T> as
Expand Down
81 changes: 39 additions & 42 deletions core/market/tests/test_cyclic_broadcasts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ async fn test_startup_offers_sharing() {
// Change expected time of sending broadcasts.
let mut config = Config::default();
config.discovery.mean_cyclic_bcast_interval = Duration::from_millis(100);
config.discovery.max_bcasted_offers = 50;
config.discovery.max_bcasted_offers = 7;

let network = MarketsNetwork::new(None)
.await
Expand All @@ -40,7 +40,7 @@ async fn test_startup_offers_sharing() {
// are sent immediately, after subscription is made.
let mut subscriptions = vec![];

for _n in (0u8..30).into_iter() {
for _n in 0..3 {
subscriptions.push(
market1
.subscribe_offer(&client::sample_offer(), &id1)
Expand All @@ -49,7 +49,7 @@ async fn test_startup_offers_sharing() {
)
}

for _n in (0..20).into_iter() {
for _n in 0..2 {
subscriptions.push(
market2
.subscribe_offer(&client::sample_offer(), &id2)
Expand All @@ -60,9 +60,8 @@ async fn test_startup_offers_sharing() {

let network = network.add_market_instance("Node-3").await;

// We expect, that after this time we, should get at least one broadcast
// from each Node.
tokio::time::delay_for(Duration::from_millis(800)).await;
// After 300ms we should get at least two broadcasts from each Node.
tokio::time::delay_for(Duration::from_millis(320)).await;

let market3 = network.get_market("Node-3");

Expand All @@ -87,8 +86,8 @@ async fn test_unsubscribes_cyclic_broadcasts() {
let mut config = Config::default();
config.discovery.mean_cyclic_bcast_interval = Duration::from_millis(100);
config.discovery.mean_cyclic_unsubscribes_interval = Duration::from_millis(100);
config.discovery.max_bcasted_offers = 50;
config.discovery.max_bcasted_unsubscribes = 50;
config.discovery.max_bcasted_offers = 7;
config.discovery.max_bcasted_unsubscribes = 7;

let network = MarketsNetwork::new(None)
.await
Expand All @@ -111,7 +110,7 @@ async fn test_unsubscribes_cyclic_broadcasts() {
let mut subscriptions1 = vec![];
let mut subscriptions2 = vec![];

for _n in (0..30).into_iter() {
for _n in 0..3 {
subscriptions1.push(
market1
.subscribe_offer(&client::sample_offer(), &id1)
Expand All @@ -120,7 +119,7 @@ async fn test_unsubscribes_cyclic_broadcasts() {
)
}

for _n in (0..20).into_iter() {
for _n in 0..2 {
subscriptions2.push(
market2
.subscribe_offer(&client::sample_offer(), &id2)
Expand All @@ -129,9 +128,9 @@ async fn test_unsubscribes_cyclic_broadcasts() {
)
}

// We expect, that after this time all nodes will have the same
// knowledge about Offers.
tokio::time::delay_for(Duration::from_millis(600)).await;
// After 300ms we should get at least two broadcasts from each Node, and so
// all nodes will have the same knowledge about Offers.
tokio::time::delay_for(Duration::from_millis(320)).await;
for subscription in subscriptions1.iter().chain(subscriptions2.iter()) {
market1.get_offer(&subscription).await.unwrap();
market2.get_offer(&subscription).await.unwrap();
Expand All @@ -142,42 +141,39 @@ async fn test_unsubscribes_cyclic_broadcasts() {
network.break_networking_for("Node-3").unwrap();

// Unsubscribe random Offers.
// First 10 elements of vectors will NOT be unsubscribed.
// Only the first elements of the vectors will NOT be unsubscribed.
subscriptions1.shuffle(&mut rand::thread_rng());
subscriptions2.shuffle(&mut rand::thread_rng());

for subscription in subscriptions1[10..].iter() {
for subscription in subscriptions1[1..].iter() {
market1.unsubscribe_offer(subscription, &id1).await.unwrap();
}
for subscription in subscriptions2[10..].iter() {
for subscription in subscriptions2[1..].iter() {
market2.unsubscribe_offer(subscription, &id2).await.unwrap();
}

// Sanity check. We should have all Offers subscribe at this moment,
// since our network didn't work.
// Sanity check. We should have all Offers subscribed at this moment,
// since Node-3 network didn't work.
for subscription in subscriptions1.iter().chain(subscriptions2.iter()) {
market3.get_offer(subscription).await.unwrap();
}

// Reenable networking for Node-3. We should get only cyclic broadcast.
// Re-enable networking for Node-3. We should get only cyclic broadcast.
// Immediate broadcast should be already lost.
tokio::time::delay_for(Duration::from_millis(100)).await;
network.enable_networking_for("Node-3").unwrap();
tokio::time::delay_for(Duration::from_millis(800)).await;
tokio::time::delay_for(Duration::from_millis(320)).await;

// Check if all expected Offers were unsubscribed.
for subscription in subscriptions1[10..]
.iter()
.chain(subscriptions2[10..].iter())
{
for subscription in subscriptions1[1..].iter().chain(subscriptions2[1..].iter()) {
let expected_error = QueryOfferError::Unsubscribed(subscription.clone());
assert_err_eq!(expected_error, market3.get_offer(&subscription).await)
}

// Check Offers, that shouldn't be unsubscribed.
for subscription in subscriptions1[0..10]
for subscription in subscriptions1[0..1]
.iter()
.chain(subscriptions2[0..10].iter())
.chain(subscriptions2[0..1].iter())
{
market3.get_offer(&subscription).await.unwrap();
}
Expand Down Expand Up @@ -231,7 +227,7 @@ async fn test_sharing_someones_else_offers() {
// Change expected time of sending broadcasts.
let mut config = Config::default();
config.discovery.mean_cyclic_bcast_interval = Duration::from_millis(100);
config.discovery.max_bcasted_offers = 50;
config.discovery.max_bcasted_offers = 7;

let network = MarketsNetwork::new(None)
.await
Expand All @@ -252,7 +248,7 @@ async fn test_sharing_someones_else_offers() {
// are sent immediately, after subscription is made.
let mut subscriptions = vec![];

for _n in (0u8..30).into_iter() {
for _n in 0..3 {
subscriptions.push(
market1
.subscribe_offer(&client::sample_offer(), &id1)
Expand All @@ -261,7 +257,7 @@ async fn test_sharing_someones_else_offers() {
)
}

for _n in (0..20).into_iter() {
for _n in 0..2 {
subscriptions.push(
market2
.subscribe_offer(&client::sample_offer(), &id2)
Expand All @@ -271,7 +267,7 @@ async fn test_sharing_someones_else_offers() {
}

// Wait until Node-1 and Node-2 will share their Offers.
tokio::time::delay_for(Duration::from_millis(800)).await;
tokio::time::delay_for(Duration::from_millis(200)).await;

// Sanity check. Node-2 should have all Offers; also from Node-1.
for subscription in subscriptions.iter() {
Expand All @@ -284,16 +280,16 @@ async fn test_sharing_someones_else_offers() {
let network = network.add_market_instance("Node-3").await;
let market3 = network.get_market("Node-3");

// We expect, that after this time we, should get at least one broadcast.
tokio::time::delay_for(Duration::from_millis(400)).await;
// After 300ms we should get at least two broadcasts.
tokio::time::delay_for(Duration::from_millis(320)).await;

// Make sure Node-3 has all offers from both: Node-1 and Node-2.
for subscription in subscriptions.into_iter() {
market3.get_offer(&subscription).await.unwrap();
}
}

/// Nodes send in cyclic broadcasts not only own Offers unsubscribes, but Offers
/// Nodes send in cyclic broadcasts not only own Offers unsubscribes, but unsubscribes
/// from other Nodes either.
#[cfg_attr(not(feature = "test-suite"), ignore)]
#[serial_test::serial]
Expand All @@ -304,8 +300,8 @@ async fn test_sharing_someones_else_unsubscribes() {
let mut config = Config::default();
config.discovery.mean_cyclic_bcast_interval = Duration::from_millis(100);
config.discovery.mean_cyclic_unsubscribes_interval = Duration::from_millis(100);
config.discovery.max_bcasted_offers = 50;
config.discovery.max_bcasted_unsubscribes = 50;
config.discovery.max_bcasted_offers = 7;
config.discovery.max_bcasted_unsubscribes = 7;

let network = MarketsNetwork::new(None)
.await
Expand All @@ -327,7 +323,7 @@ async fn test_sharing_someones_else_unsubscribes() {

let mut subscriptions = vec![];

for _n in (0..30).into_iter() {
for _n in 0..3 {
subscriptions.push(
market1
.subscribe_offer(&client::sample_offer(), &id1)
Expand All @@ -336,7 +332,7 @@ async fn test_sharing_someones_else_unsubscribes() {
)
}

for _n in (0..20).into_iter() {
for _n in 0..2 {
subscriptions.push(
market2
.subscribe_offer(&client::sample_offer(), &id2)
Expand All @@ -346,7 +342,8 @@ async fn test_sharing_someones_else_unsubscribes() {
}

// Wait until Nodes will share their Offers.
tokio::time::delay_for(Duration::from_millis(200)).await;
// After 300ms we should get at least two broadcasts from each Node.
tokio::time::delay_for(Duration::from_millis(320)).await;

// Sanity check. Node-3 should have all Offers from market1.
for subscription in subscriptions.iter() {
Expand All @@ -356,7 +353,7 @@ async fn test_sharing_someones_else_unsubscribes() {
// Break networking for Node-3, so he won't see any unsubscribes.
network.break_networking_for("Node-3").unwrap();

for subscription in subscriptions[30..].iter() {
for subscription in subscriptions[3..].iter() {
market2.unsubscribe_offer(subscription, &id2).await.unwrap();
}

Expand All @@ -367,16 +364,16 @@ async fn test_sharing_someones_else_unsubscribes() {
network.enable_networking_for("Node-3").unwrap();

// We expect that all unsubscribed will be shared with Node-3 after this delay.
tokio::time::delay_for(Duration::from_millis(1200)).await;
for subscription in subscriptions[30..].into_iter() {
tokio::time::delay_for(Duration::from_millis(320)).await;
for subscription in subscriptions[3..].into_iter() {
let expected_error = QueryOfferError::Unsubscribed(subscription.clone());
assert_err_eq!(expected_error, market1.get_offer(&subscription).await);
assert_err_eq!(expected_error, market2.get_offer(&subscription).await);
assert_err_eq!(expected_error, market3.get_offer(&subscription).await);
}

// All other Offers should remain untouched.
for subscription in subscriptions[0..30].into_iter() {
for subscription in subscriptions[0..3].into_iter() {
market1.get_offer(&subscription).await.unwrap();
market2.get_offer(&subscription).await.unwrap();
market3.get_offer(&subscription).await.unwrap();
Expand Down