From 77dcd21bd1215f015a3afb4ed2aa1818dd420031 Mon Sep 17 00:00:00 2001 From: Piotr Chromiec Date: Thu, 15 Apr 2021 22:35:34 +0200 Subject: [PATCH 1/5] [mkt tst] reduce number of Offers to fix test cyclic broadcasts --- core/market/tests/test_cyclic_broadcasts.rs | 78 ++++++++++----------- 1 file changed, 37 insertions(+), 41 deletions(-) diff --git a/core/market/tests/test_cyclic_broadcasts.rs b/core/market/tests/test_cyclic_broadcasts.rs index d98c277bca..8c213d35a2 100644 --- a/core/market/tests/test_cyclic_broadcasts.rs +++ b/core/market/tests/test_cyclic_broadcasts.rs @@ -19,7 +19,7 @@ async fn test_startup_offers_sharing() { // Change expected time of sending broadcasts. let mut config = Config::default(); config.discovery.mean_cyclic_bcast_interval = Duration::from_millis(100); - config.discovery.max_bcasted_offers = 50; + config.discovery.max_bcasted_offers = 7; let network = MarketsNetwork::new(None) .await @@ -40,7 +40,7 @@ async fn test_startup_offers_sharing() { // are sent immediately, after subscription is made. let mut subscriptions = vec![]; - for _n in (0u8..30).into_iter() { + for _n in 0..3 { subscriptions.push( market1 .subscribe_offer(&client::sample_offer(), &id1) @@ -49,7 +49,7 @@ async fn test_startup_offers_sharing() { ) } - for _n in (0..20).into_iter() { + for _n in 0..2 { subscriptions.push( market2 .subscribe_offer(&client::sample_offer(), &id2) @@ -60,9 +60,8 @@ async fn test_startup_offers_sharing() { let network = network.add_market_instance("Node-3").await; - // We expect, that after this time we, should get at least one broadcast - // from each Node. - tokio::time::delay_for(Duration::from_millis(800)).await; + // After 300ms we should get at least two broadcasts from each Node. + tokio::time::delay_for(Duration::from_millis(320)).await; let market3 = network.get_market("Node-3"); @@ -87,8 +86,8 @@ async fn test_unsubscribes_cyclic_broadcasts() { let mut config = Config::default(); config.discovery.mean_cyclic_bcast_interval = Duration::from_millis(100); config.discovery.mean_cyclic_unsubscribes_interval = Duration::from_millis(100); - config.discovery.max_bcasted_offers = 50; - config.discovery.max_bcasted_unsubscribes = 50; + config.discovery.max_bcasted_offers = 7; + config.discovery.max_bcasted_unsubscribes = 7; let network = MarketsNetwork::new(None) .await @@ -111,7 +110,7 @@ async fn test_unsubscribes_cyclic_broadcasts() { let mut subscriptions1 = vec![]; let mut subscriptions2 = vec![]; - for _n in (0..30).into_iter() { + for _n in 0..3 { subscriptions1.push( market1 .subscribe_offer(&client::sample_offer(), &id1) @@ -120,7 +119,7 @@ async fn test_unsubscribes_cyclic_broadcasts() { ) } - for _n in (0..20).into_iter() { + for _n in 0..2 { subscriptions2.push( market2 .subscribe_offer(&client::sample_offer(), &id2) @@ -129,9 +128,9 @@ async fn test_unsubscribes_cyclic_broadcasts() { ) } - // We expect, that after this time all nodes will have the same - // knowledge about Offers. - tokio::time::delay_for(Duration::from_millis(600)).await; + // After 300ms we should get at least two broadcasts from each Node, and so + // all nodes will have the same knowledge about Offers. + tokio::time::delay_for(Duration::from_millis(320)).await; for subscription in subscriptions1.iter().chain(subscriptions2.iter()) { market1.get_offer(&subscription).await.unwrap(); market2.get_offer(&subscription).await.unwrap(); @@ -142,42 +141,39 @@ async fn test_unsubscribes_cyclic_broadcasts() { network.break_networking_for("Node-3").unwrap(); // Unsubscribe random Offers. - // First 10 elements of vectors will NOT be unsubscribed. + // Only the first elements of the vectors will NOT be unsubscribed. subscriptions1.shuffle(&mut rand::thread_rng()); subscriptions2.shuffle(&mut rand::thread_rng()); - for subscription in subscriptions1[10..].iter() { + for subscription in subscriptions1[1..].iter() { market1.unsubscribe_offer(subscription, &id1).await.unwrap(); } - for subscription in subscriptions2[10..].iter() { + for subscription in subscriptions2[1..].iter() { market2.unsubscribe_offer(subscription, &id2).await.unwrap(); } - // Sanity check. We should have all Offers subscribe at this moment, - // since our network didn't work. + // Sanity check. We should have all Offers subscribed at this moment, + // since Node-3 network didn't work. for subscription in subscriptions1.iter().chain(subscriptions2.iter()) { market3.get_offer(subscription).await.unwrap(); } - // Reenable networking for Node-3. We should get only cyclic broadcast. + // Re-enable networking for Node-3. We should get only cyclic broadcast. // Immediate broadcast should be already lost. tokio::time::delay_for(Duration::from_millis(100)).await; network.enable_networking_for("Node-3").unwrap(); - tokio::time::delay_for(Duration::from_millis(800)).await; + tokio::time::delay_for(Duration::from_millis(320)).await; // Check if all expected Offers were unsubscribed. - for subscription in subscriptions1[10..] - .iter() - .chain(subscriptions2[10..].iter()) - { + for subscription in subscriptions1[1..].iter().chain(subscriptions2[1..].iter()) { let expected_error = QueryOfferError::Unsubscribed(subscription.clone()); assert_err_eq!(expected_error, market3.get_offer(&subscription).await) } // Check Offers, that shouldn't be unsubscribed. - for subscription in subscriptions1[0..10] + for subscription in subscriptions1[0..1] .iter() - .chain(subscriptions2[0..10].iter()) + .chain(subscriptions2[0..1].iter()) { market3.get_offer(&subscription).await.unwrap(); } @@ -231,7 +227,7 @@ async fn test_sharing_someones_else_offers() { // Change expected time of sending broadcasts. let mut config = Config::default(); config.discovery.mean_cyclic_bcast_interval = Duration::from_millis(100); - config.discovery.max_bcasted_offers = 50; + config.discovery.max_bcasted_offers = 7; let network = MarketsNetwork::new(None) .await @@ -252,7 +248,7 @@ async fn test_sharing_someones_else_offers() { // are sent immediately, after subscription is made. let mut subscriptions = vec![]; - for _n in (0u8..30).into_iter() { + for _n in 0..3 { subscriptions.push( market1 .subscribe_offer(&client::sample_offer(), &id1) @@ -261,7 +257,7 @@ async fn test_sharing_someones_else_offers() { ) } - for _n in (0..20).into_iter() { + for _n in 0..2 { subscriptions.push( market2 .subscribe_offer(&client::sample_offer(), &id2) @@ -271,7 +267,7 @@ async fn test_sharing_someones_else_offers() { } // Wait until Node-1 and Node-2 will share their Offers. - tokio::time::delay_for(Duration::from_millis(800)).await; + tokio::time::delay_for(Duration::from_millis(200)).await; // Sanity check. Node-2 should have all Offers; also from Node-1. for subscription in subscriptions.iter() { @@ -284,8 +280,8 @@ async fn test_sharing_someones_else_offers() { let network = network.add_market_instance("Node-3").await; let market3 = network.get_market("Node-3"); - // We expect, that after this time we, should get at least one broadcast. - tokio::time::delay_for(Duration::from_millis(400)).await; + // After 300ms we should get at least two broadcasts. + tokio::time::delay_for(Duration::from_millis(320)).await; // Make sure Node-3 has all offers from both: Node-1 and Node-2. for subscription in subscriptions.into_iter() { @@ -293,7 +289,7 @@ async fn test_sharing_someones_else_offers() { } } -/// Nodes send in cyclic broadcasts not only own Offers unsubscribes, but Offers +/// Nodes send in cyclic broadcasts not only own Offers unsubscribes, but unsubscribes /// from other Nodes either. #[cfg_attr(not(feature = "test-suite"), ignore)] #[serial_test::serial] @@ -304,8 +300,8 @@ async fn test_sharing_someones_else_unsubscribes() { let mut config = Config::default(); config.discovery.mean_cyclic_bcast_interval = Duration::from_millis(100); config.discovery.mean_cyclic_unsubscribes_interval = Duration::from_millis(100); - config.discovery.max_bcasted_offers = 50; - config.discovery.max_bcasted_unsubscribes = 50; + config.discovery.max_bcasted_offers = 7; + config.discovery.max_bcasted_unsubscribes = 7; let network = MarketsNetwork::new(None) .await @@ -327,7 +323,7 @@ async fn test_sharing_someones_else_unsubscribes() { let mut subscriptions = vec![]; - for _n in (0..30).into_iter() { + for _n in 0..3 { subscriptions.push( market1 .subscribe_offer(&client::sample_offer(), &id1) @@ -336,7 +332,7 @@ async fn test_sharing_someones_else_unsubscribes() { ) } - for _n in (0..20).into_iter() { + for _n in 0..2 { subscriptions.push( market2 .subscribe_offer(&client::sample_offer(), &id2) @@ -356,7 +352,7 @@ async fn test_sharing_someones_else_unsubscribes() { // Break networking for Node-3, so he won't see any unsubscribes. network.break_networking_for("Node-3").unwrap(); - for subscription in subscriptions[30..].iter() { + for subscription in subscriptions[3..].iter() { market2.unsubscribe_offer(subscription, &id2).await.unwrap(); } @@ -367,8 +363,8 @@ async fn test_sharing_someones_else_unsubscribes() { network.enable_networking_for("Node-3").unwrap(); // We expect that all unsubscribed will be shared with Node-3 after this delay. - tokio::time::delay_for(Duration::from_millis(1200)).await; - for subscription in subscriptions[30..].into_iter() { + tokio::time::delay_for(Duration::from_millis(320)).await; + for subscription in subscriptions[3..].into_iter() { let expected_error = QueryOfferError::Unsubscribed(subscription.clone()); assert_err_eq!(expected_error, market1.get_offer(&subscription).await); assert_err_eq!(expected_error, market2.get_offer(&subscription).await); @@ -376,7 +372,7 @@ async fn test_sharing_someones_else_unsubscribes() { } // All other Offers should remain untouched. - for subscription in subscriptions[0..30].into_iter() { + for subscription in subscriptions[0..3].into_iter() { market1.get_offer(&subscription).await.unwrap(); market2.get_offer(&subscription).await.unwrap(); market3.get_offer(&subscription).await.unwrap(); From 1a1239a3982a720b1a471dd093307e4922519bea Mon Sep 17 00:00:00 2001 From: Piotr Chromiec Date: Fri, 16 Apr 2021 00:43:36 +0200 Subject: [PATCH 2/5] [mkt test] DIRTY HACK: use wrong GSB_URL value to avoid remote router usage and reconnections --- core/market/src/testing/mock_node.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/core/market/src/testing/mock_node.rs b/core/market/src/testing/mock_node.rs index 5213bec522..44f38d512a 100644 --- a/core/market/src/testing/mock_node.rs +++ b/core/market/src/testing/mock_node.rs @@ -110,6 +110,7 @@ impl MarketsNetwork { /// It will be used to create directories and GSB binding points, /// to avoid potential name clashes. pub async fn new(test_name: Option<&str>) -> Self { + std::env::set_var("GSB_URL", "wrong url"); let _ = env_logger::builder().try_init(); // level 1 is this function. // level 2 is as From 465610398303de7ff663b3200f908765faae4b93 Mon Sep 17 00:00:00 2001 From: Piotr Chromiec Date: Fri, 16 Apr 2021 10:27:43 +0200 Subject: [PATCH 3/5] [mkt tst] increase delay for broadcasts propagation --- core/market/tests/test_cyclic_broadcasts.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/core/market/tests/test_cyclic_broadcasts.rs b/core/market/tests/test_cyclic_broadcasts.rs index 8c213d35a2..3687ffe798 100644 --- a/core/market/tests/test_cyclic_broadcasts.rs +++ b/core/market/tests/test_cyclic_broadcasts.rs @@ -342,7 +342,8 @@ async fn test_sharing_someones_else_unsubscribes() { } // Wait until Nodes will share their Offers. - tokio::time::delay_for(Duration::from_millis(200)).await; + // After 300ms we should get at least two broadcasts from each Node. + tokio::time::delay_for(Duration::from_millis(320)).await; // Sanity check. Node-3 should have all Offers from market1. for subscription in subscriptions.iter() { From 8a84922ed51770175a580c5063453e2b39d8ace6 Mon Sep 17 00:00:00 2001 From: Piotr Chromiec Date: Fri, 16 Apr 2021 14:34:39 +0200 Subject: [PATCH 4/5] [mkt tst] comment for DIRTY HACK --- core/market/src/testing/mock_node.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/core/market/src/testing/mock_node.rs b/core/market/src/testing/mock_node.rs index 44f38d512a..af58b7de7e 100644 --- a/core/market/src/testing/mock_node.rs +++ b/core/market/src/testing/mock_node.rs @@ -110,6 +110,10 @@ impl MarketsNetwork { /// It will be used to create directories and GSB binding points, /// to avoid potential name clashes. pub async fn new(test_name: Option<&str>) -> Self { + // DIRTY HACK. We need it because GSB v0.4.4 causes market test suite to hang + // using 100% CPU trying to reconnect in the infinite loop. + // It can be removed when https://github.com/golemfactory/ya-service-bus/issues/22 + // is addressed std::env::set_var("GSB_URL", "wrong url"); let _ = env_logger::builder().try_init(); // level 1 is this function. From 7b162283ffd6adbde596e796d0d4dc45e2872cbc Mon Sep 17 00:00:00 2001 From: Piotr Chromiec Date: Fri, 16 Apr 2021 18:09:40 +0200 Subject: [PATCH 5/5] [mkt tst] next level of bcast testing ! --- core/market/tests/test_cyclic_broadcasts.rs | 238 +++++++++++--------- 1 file changed, 132 insertions(+), 106 deletions(-) diff --git a/core/market/tests/test_cyclic_broadcasts.rs b/core/market/tests/test_cyclic_broadcasts.rs index 3687ffe798..524c7b2293 100644 --- a/core/market/tests/test_cyclic_broadcasts.rs +++ b/core/market/tests/test_cyclic_broadcasts.rs @@ -3,10 +3,10 @@ use std::sync::Arc; use std::time::Duration; use ya_market::assert_err_eq; -use ya_market::testing::mock_offer::client; -use ya_market::testing::Config; -use ya_market::testing::QueryOfferError; -use ya_market::testing::{MarketServiceExt, MarketsNetwork}; +use ya_market::testing::{ + mock_offer::client, Config, MarketServiceExt, MarketsNetwork, QueryOfferError, SubscriptionId, +}; +use ya_market::MarketService; /// Initialize two markets and add Offers. /// Third market that will be instantiated after these two, should @@ -29,10 +29,10 @@ async fn test_startup_offers_sharing() { .add_market_instance("Node-2") .await; - let market1 = network.get_market("Node-1"); + let mkt1 = network.get_market("Node-1"); let id1 = network.get_default_id("Node-1"); - let market2 = network.get_market("Node-2"); + let mkt2 = network.get_market("Node-2"); let id2 = network.get_default_id("Node-2"); // Create some offers before we instantiate 3rd node. @@ -42,8 +42,7 @@ async fn test_startup_offers_sharing() { for _n in 0..3 { subscriptions.push( - market1 - .subscribe_offer(&client::sample_offer(), &id1) + mkt1.subscribe_offer(&client::sample_offer(), &id1) .await .unwrap(), ) @@ -51,8 +50,7 @@ async fn test_startup_offers_sharing() { for _n in 0..2 { subscriptions.push( - market2 - .subscribe_offer(&client::sample_offer(), &id2) + mkt2.subscribe_offer(&client::sample_offer(), &id2) .await .unwrap(), ) @@ -60,15 +58,10 @@ async fn test_startup_offers_sharing() { let network = network.add_market_instance("Node-3").await; - // After 300ms we should get at least two broadcasts from each Node. - tokio::time::delay_for(Duration::from_millis(320)).await; - - let market3 = network.get_market("Node-3"); + let mkt3 = network.get_market("Node-3"); // Make sure we got all offers that, were created. - for subscription in subscriptions.into_iter() { - market3.get_offer(&subscription).await.unwrap(); - } + assert_offers_broadcasted(&[&mkt1, &mkt2, &mkt3], subscriptions.iter()).await; } /// Unsubscribes are sent immediately after Offer is unsubscribed and @@ -86,8 +79,8 @@ async fn test_unsubscribes_cyclic_broadcasts() { let mut config = Config::default(); config.discovery.mean_cyclic_bcast_interval = Duration::from_millis(100); config.discovery.mean_cyclic_unsubscribes_interval = Duration::from_millis(100); - config.discovery.max_bcasted_offers = 7; - config.discovery.max_bcasted_unsubscribes = 7; + config.discovery.max_bcasted_offers = 70; + config.discovery.max_bcasted_unsubscribes = 70; let network = MarketsNetwork::new(None) .await @@ -99,43 +92,38 @@ async fn test_unsubscribes_cyclic_broadcasts() { .add_market_instance("Node-3") .await; - let market1 = network.get_market("Node-1"); + let mkt1 = network.get_market("Node-1"); let id1 = network.get_default_id("Node-1"); - let market2 = network.get_market("Node-2"); + let mkt2 = network.get_market("Node-2"); let id2 = network.get_default_id("Node-2"); - let market3 = network.get_market("Node-3"); + let mkt3 = network.get_market("Node-3"); let mut subscriptions1 = vec![]; let mut subscriptions2 = vec![]; - for _n in 0..3 { + for _n in 0..30 { subscriptions1.push( - market1 - .subscribe_offer(&client::sample_offer(), &id1) + mkt1.subscribe_offer(&client::sample_offer(), &id1) .await .unwrap(), ) } - for _n in 0..2 { + for _n in 0..20 { subscriptions2.push( - market2 - .subscribe_offer(&client::sample_offer(), &id2) + mkt2.subscribe_offer(&client::sample_offer(), &id2) .await .unwrap(), ) } - // After 300ms we should get at least two broadcasts from each Node, and so - // all nodes will have the same knowledge about Offers. - tokio::time::delay_for(Duration::from_millis(320)).await; - for subscription in subscriptions1.iter().chain(subscriptions2.iter()) { - market1.get_offer(&subscription).await.unwrap(); - market2.get_offer(&subscription).await.unwrap(); - market3.get_offer(&subscription).await.unwrap(); - } + assert_offers_broadcasted( + &[&mkt1, &mkt2, &mkt3], + subscriptions1.iter().chain(subscriptions2.iter()), + ) + .await; // Break networking, so unsubscribe broadcasts won't come to Node-3. network.break_networking_for("Node-3").unwrap(); @@ -145,38 +133,39 @@ async fn test_unsubscribes_cyclic_broadcasts() { subscriptions1.shuffle(&mut rand::thread_rng()); subscriptions2.shuffle(&mut rand::thread_rng()); - for subscription in subscriptions1[1..].iter() { - market1.unsubscribe_offer(subscription, &id1).await.unwrap(); + for subscription in subscriptions1[10..].iter() { + mkt1.unsubscribe_offer(subscription, &id1).await.unwrap(); } - for subscription in subscriptions2[1..].iter() { - market2.unsubscribe_offer(subscription, &id2).await.unwrap(); + for subscription in subscriptions2[10..].iter() { + mkt2.unsubscribe_offer(subscription, &id2).await.unwrap(); } // Sanity check. We should have all Offers subscribed at this moment, // since Node-3 network didn't work. - for subscription in subscriptions1.iter().chain(subscriptions2.iter()) { - market3.get_offer(subscription).await.unwrap(); - } + assert_offers_broadcasted(&[&mkt3], subscriptions1.iter().chain(subscriptions2.iter())).await; // Re-enable networking for Node-3. We should get only cyclic broadcast. // Immediate broadcast should be already lost. - tokio::time::delay_for(Duration::from_millis(100)).await; network.enable_networking_for("Node-3").unwrap(); tokio::time::delay_for(Duration::from_millis(320)).await; // Check if all expected Offers were unsubscribed. - for subscription in subscriptions1[1..].iter().chain(subscriptions2[1..].iter()) { - let expected_error = QueryOfferError::Unsubscribed(subscription.clone()); - assert_err_eq!(expected_error, market3.get_offer(&subscription).await) - } + assert_unsunbscribes_broadcasted( + &[&mkt1, &mkt2, &mkt3], + subscriptions1[10..] + .iter() + .chain(subscriptions2[10..].iter()), + ) + .await; // Check Offers, that shouldn't be unsubscribed. - for subscription in subscriptions1[0..1] - .iter() - .chain(subscriptions2[0..1].iter()) - { - market3.get_offer(&subscription).await.unwrap(); - } + assert_offers_broadcasted( + &[&mkt1, &mkt2, &mkt3], + subscriptions1[0..10] + .iter() + .chain(subscriptions2[0..10].iter()), + ) + .await; } /// Subscribing and unsubscribing should work despite network errors. @@ -193,28 +182,27 @@ async fn test_network_error_while_subscribing() { .add_market_instance("Node-2") .await; - let market1 = network.get_market("Node-1"); + let mkt1 = network.get_market("Node-1"); let id1 = network.get_default_id("Node-1"); network.break_networking_for("Node-1").unwrap(); // It's not an error. Should pass. - let subscription_id = market1 + let subscription_id = mkt1 .subscribe_offer(&client::sample_offer(), &id1) .await .unwrap(); - market1 - .unsubscribe_offer(&subscription_id, &id1) + mkt1.unsubscribe_offer(&subscription_id, &id1) .await .unwrap(); let expected_error = QueryOfferError::Unsubscribed(subscription_id.clone()); - assert_err_eq!(expected_error, market1.get_offer(&subscription_id).await); + assert_err_eq!(expected_error, mkt1.get_offer(&subscription_id).await); let expected_error = QueryOfferError::NotFound(subscription_id.clone()); - let market2 = network.get_market("Node-2"); - assert_err_eq!(expected_error, market2.get_offer(&subscription_id).await); + let mkt2 = network.get_market("Node-2"); + assert_err_eq!(expected_error, mkt2.get_offer(&subscription_id).await); } /// Nodes send in cyclic broadcasts not only own Offers, but Offers @@ -237,10 +225,10 @@ async fn test_sharing_someones_else_offers() { .add_market_instance("Node-2") .await; - let market1 = network.get_market("Node-1"); + let mkt1 = network.get_market("Node-1"); let id1 = network.get_default_id("Node-1"); - let market2 = network.get_market("Node-2"); + let mkt2 = network.get_market("Node-2"); let id2 = network.get_default_id("Node-2"); // Create some offers before we instantiate 3rd node. @@ -250,8 +238,7 @@ async fn test_sharing_someones_else_offers() { for _n in 0..3 { subscriptions.push( - market1 - .subscribe_offer(&client::sample_offer(), &id1) + mkt1.subscribe_offer(&client::sample_offer(), &id1) .await .unwrap(), ) @@ -259,34 +246,22 @@ async fn test_sharing_someones_else_offers() { for _n in 0..2 { subscriptions.push( - market2 - .subscribe_offer(&client::sample_offer(), &id2) + mkt2.subscribe_offer(&client::sample_offer(), &id2) .await .unwrap(), ) } - // Wait until Node-1 and Node-2 will share their Offers. - tokio::time::delay_for(Duration::from_millis(200)).await; - - // Sanity check. Node-2 should have all Offers; also from Node-1. - for subscription in subscriptions.iter() { - assert!(market2.get_offer(subscription).await.is_ok()); - } + // Sanity check. Both nodes should have all Offers. + assert_offers_broadcasted(&[&mkt1, &mkt2], subscriptions.iter()).await; // Break networking for Node-1. Only Node-2 will be able to send Offers. network.break_networking_for("Node-1").unwrap(); - let network = network.add_market_instance("Node-3").await; - let market3 = network.get_market("Node-3"); - - // After 300ms we should get at least two broadcasts. - tokio::time::delay_for(Duration::from_millis(320)).await; + let mkt3 = network.get_market("Node-3"); // Make sure Node-3 has all offers from both: Node-1 and Node-2. - for subscription in subscriptions.into_iter() { - market3.get_offer(&subscription).await.unwrap(); - } + assert_offers_broadcasted(&[&mkt1, &mkt2, &mkt3], subscriptions.iter()).await; } /// Nodes send in cyclic broadcasts not only own Offers unsubscribes, but unsubscribes @@ -313,20 +288,19 @@ async fn test_sharing_someones_else_unsubscribes() { .add_market_instance("Node-3") .await; - let market1 = network.get_market("Node-1"); + let mkt1 = network.get_market("Node-1"); let id1 = network.get_default_id("Node-1"); - let market2 = network.get_market("Node-2"); + let mkt2 = network.get_market("Node-2"); let id2 = network.get_default_id("Node-2"); - let market3 = network.get_market("Node-3"); + let mkt3 = network.get_market("Node-3"); let mut subscriptions = vec![]; for _n in 0..3 { subscriptions.push( - market1 - .subscribe_offer(&client::sample_offer(), &id1) + mkt1.subscribe_offer(&client::sample_offer(), &id1) .await .unwrap(), ) @@ -334,8 +308,7 @@ async fn test_sharing_someones_else_unsubscribes() { for _n in 0..2 { subscriptions.push( - market2 - .subscribe_offer(&client::sample_offer(), &id2) + mkt2.subscribe_offer(&client::sample_offer(), &id2) .await .unwrap(), ) @@ -345,37 +318,90 @@ async fn test_sharing_someones_else_unsubscribes() { // After 300ms we should get at least two broadcasts from each Node. tokio::time::delay_for(Duration::from_millis(320)).await; - // Sanity check. Node-3 should have all Offers from market1. - for subscription in subscriptions.iter() { - assert!(market2.get_offer(subscription).await.is_ok()); - } + // Sanity check. Make sure all nodes have all offers. + assert_offers_broadcasted(&[&mkt1, &mkt2, &mkt3], subscriptions.iter()).await; // Break networking for Node-3, so he won't see any unsubscribes. network.break_networking_for("Node-3").unwrap(); for subscription in subscriptions[3..].iter() { - market2.unsubscribe_offer(subscription, &id2).await.unwrap(); + mkt2.unsubscribe_offer(subscription, &id2).await.unwrap(); } - tokio::time::delay_for(Duration::from_millis(50)).await; + // Check if all expected Offers were unsubscribed. + assert_unsunbscribes_broadcasted(&[&mkt1, &mkt2], subscriptions[3..].iter()).await; + // Sanity check. Node-3 should still see all Offers (not unsubscribed). + assert_offers_broadcasted(&[&mkt3], subscriptions.iter()).await; // Disconnect Node-2. Only Node-1 can propagate unsubscribes to Node-3. network.break_networking_for("Node-2").unwrap(); network.enable_networking_for("Node-3").unwrap(); // We expect that all unsubscribed will be shared with Node-3 after this delay. - tokio::time::delay_for(Duration::from_millis(320)).await; - for subscription in subscriptions[3..].into_iter() { - let expected_error = QueryOfferError::Unsubscribed(subscription.clone()); - assert_err_eq!(expected_error, market1.get_offer(&subscription).await); - assert_err_eq!(expected_error, market2.get_offer(&subscription).await); - assert_err_eq!(expected_error, market3.get_offer(&subscription).await); - } + assert_unsunbscribes_broadcasted(&[&mkt1, &mkt2, &mkt3], subscriptions[3..].iter()).await; // All other Offers should remain untouched. - for subscription in subscriptions[0..3].into_iter() { - market1.get_offer(&subscription).await.unwrap(); - market2.get_offer(&subscription).await.unwrap(); - market3.get_offer(&subscription).await.unwrap(); + assert_offers_broadcasted(&[&mkt1, &mkt2, &mkt3], subscriptions[0..3].iter()).await; +} + +/// Assure that all given nodes have the same knowledge about given Subscriptions (Offers). +/// Wait if needed at most 1,5s ( = 10 x 150ms). +async fn assert_offers_broadcasted<'a, S>(mkts: &[&MarketService], subscriptions: S) +where + S: IntoIterator, + ::IntoIter: Clone, +{ + let subscriptions = subscriptions.into_iter(); + let mut all_broadcasted = false; + 'retry: for _i in 0..10 { + for subscription in subscriptions.clone() { + for mkt in mkts { + if mkt.get_offer(&subscription).await.is_err() { + // Every 150ms we should get at least one broadcast from each Node. + // After a few tries all nodes should have the same knowledge about Offers. + tokio::time::delay_for(Duration::from_millis(150)).await; + continue 'retry; + } + } + } + all_broadcasted = true; + break; + } + assert!( + all_broadcasted, + "At least one of the offers was not propagated to all nodes" + ); +} + +/// Assure that all given nodes have the same knowledge about given Subscriptions (Offers). +/// Wait if needed at most 1,5s ( = 10 x 150ms). +async fn assert_unsunbscribes_broadcasted<'a, S>(mkts: &[&MarketService], subscriptions: S) +where + S: IntoIterator, + ::IntoIter: Clone, +{ + let subscriptions = subscriptions.into_iter(); + let mut all_broadcasted = false; + 'retry: for _i in 0..10 { + for subscription in subscriptions.clone() { + for mkt in mkts { + let expect_error = QueryOfferError::Unsubscribed(subscription.clone()).to_string(); + match mkt.get_offer(&subscription).await { + Err(e) => assert_eq!(e.to_string(), expect_error), + Ok(_) => { + // Every 150ms we should get at least one broadcast from each Node. + // After a few tries all nodes should have the same knowledge about Offers. + tokio::time::delay_for(Duration::from_millis(150)).await; + continue 'retry; + } + } + } + } + all_broadcasted = true; + break; } + assert!( + all_broadcasted, + "At least one of the offer unsubscribes was not propagated to all nodes" + ); }