From f27430cde326f31efaf3df67a59c1447e465e3fc Mon Sep 17 00:00:00 2001 From: Bruce Guenter Date: Wed, 2 Sep 2020 17:11:05 -0600 Subject: [PATCH 1/9] Move all test parameters into TestParams Signed-off-by: Bruce Guenter --- src/sinks/util/auto_concurrency/tests.rs | 177 ++++++++++------------- 1 file changed, 78 insertions(+), 99 deletions(-) diff --git a/src/sinks/util/auto_concurrency/tests.rs b/src/sinks/util/auto_concurrency/tests.rs index b582de2cdcc47..b6adf46998cc8 100644 --- a/src/sinks/util/auto_concurrency/tests.rs +++ b/src/sinks/util/auto_concurrency/tests.rs @@ -35,11 +35,17 @@ use std::time::{Duration, Instant}; use tokio::time::{delay_for, delay_until}; use tower::Service; -#[derive(Copy, Clone, Debug, Default, Serialize)] +#[derive(Copy, Clone, Debug, Derivative, Serialize)] +#[derivative(Default)] struct TestParams { + // The number of requests to issue. + requests: usize, + + // The time interval between requests. + interval: Option, + // The delay is the base time every request takes return. - #[serde(default)] - delay: Duration, + delay: f64, // The jitter is the amount of per-request response time randomness, // as a fraction of `delay`. The average response time will be @@ -61,6 +67,10 @@ struct TestParams { // The number of outstanding requests at which requests will be dropped. #[serde(default)] concurrency_drop: usize, + + #[serde(default)] + #[derivative(Default(value = "InFlightLimit::Auto"))] + in_flight_limit: InFlightLimit, } #[derive(Debug, Default, Serialize)] @@ -163,9 +173,11 @@ impl Service> for TestSink { let in_flight = stats.in_flight.level(); let params = self.params; - let delay = params.delay.mul_f64( - 1.0 + (in_flight - 1) as f64 * params.concurrency_scale - + thread_rng().sample(Exp1) * params.jitter, + let delay = Duration::from_secs_f64( + params.delay + * (1.0 + + (in_flight - 1) as f64 * params.concurrency_scale + + thread_rng().sample(Exp1) * params.jitter), ); let delay = delay_until((now + delay).into()); @@ -224,21 +236,12 @@ struct TestData { cstats: ControllerStatistics, } -async fn run_test(lines: usize, interval: Option, params: TestParams) -> TestData { - run_test4(lines, interval, params, InFlightLimit::Auto).await -} - -async fn run_test4( - lines: usize, - interval: Option, - params: TestParams, - in_flight_limit: InFlightLimit, -) -> TestData { +async fn run_test(params: TestParams) -> TestData { let _ = metrics::init(); let test_config = TestConfig { request: TowerRequestConfig { - in_flight_limit, + in_flight_limit: params.in_flight_limit, rate_limit_num: Some(9999), timeout_secs: Some(1), ..Default::default() @@ -251,7 +254,8 @@ async fn run_test4( let cstats = Arc::clone(&test_config.controller_stats); let mut config = config::Config::builder(); - let generator = GeneratorConfig::repeat(vec!["line 1".into()], lines, interval); + let generator = + GeneratorConfig::repeat(vec!["line 1".into()], params.requests, params.interval); config.add_source("in", generator); config.add_sink("out", &["in"], test_config); @@ -260,10 +264,9 @@ async fn run_test4( let controller = get_controller().unwrap(); // Give time for the generator to start and queue all its data. 
- let delay = interval.unwrap_or(0.0) * (lines as f64) + 1.0; + let delay = params.interval.unwrap_or(0.0) * (params.requests as f64) + 1.0; delay_for(Duration::from_secs_f64(delay)).await; topology.stop().compat().await.unwrap(); - //shutdown_on_idle(rt); let stats = Arc::try_unwrap(stats) .expect("Failed to unwrap stats Arc") @@ -292,7 +295,7 @@ async fn run_test4( matches!(metrics.get("auto_concurrency_averaged_rtt").unwrap().value, MetricValue::Distribution { .. }) ); - if in_flight_limit == InFlightLimit::Auto { + if params.in_flight_limit == InFlightLimit::Auto { assert!( matches!(metrics.get("auto_concurrency_limit").unwrap().value, MetricValue::Distribution { .. }) @@ -309,16 +312,13 @@ async fn run_test4( #[tokio::test] async fn fixed_concurrency() { // Simulate a very jittery link, but with a fixed concurrency - let results = run_test4( - 200, - None, - TestParams { - delay: Duration::from_millis(100), - jitter: 0.5, - ..Default::default() - }, - InFlightLimit::Fixed(10), - ) + let results = run_test(TestParams { + requests: 200, + delay: 0.100, + jitter: 0.5, + in_flight_limit: InFlightLimit::Fixed(10), + ..Default::default() + }) .await; let in_flight = results.stats.in_flight.stats().unwrap(); @@ -336,14 +336,11 @@ async fn fixed_concurrency() { #[tokio::test] async fn constant_link() { - let results = run_test( - 500, - None, - TestParams { - delay: Duration::from_millis(100), - ..Default::default() - }, - ) + let results = run_test(TestParams { + requests: 500, + delay: 0.100, + ..Default::default() + }) .await; // With a constant response time link and enough responses, the @@ -388,15 +385,12 @@ async fn constant_link() { #[tokio::test] async fn defers_at_high_concurrency() { - let results = run_test( - 500, - None, - TestParams { - delay: Duration::from_millis(100), - concurrency_defer: 5, - ..Default::default() - }, - ) + let results = run_test(TestParams { + requests: 500, + delay: 0.100, + concurrency_defer: 5, + ..Default::default() + }) .await; // With a constant time link that gives deferrals over a certain @@ -431,15 +425,12 @@ async fn defers_at_high_concurrency() { #[tokio::test] async fn drops_at_high_concurrency() { - let results = run_test( - 500, - None, - TestParams { - delay: Duration::from_millis(100), - concurrency_drop: 5, - ..Default::default() - }, - ) + let results = run_test(TestParams { + requests: 500, + delay: 0.100, + concurrency_drop: 5, + ..Default::default() + }) .await; // Since our internal framework doesn't track the "dropped" @@ -470,15 +461,12 @@ async fn drops_at_high_concurrency() { #[tokio::test] async fn slow_link() { - let results = run_test( - 200, - None, - TestParams { - delay: Duration::from_millis(100), - concurrency_scale: 1.0, - ..Default::default() - }, - ) + let results = run_test(TestParams { + requests: 200, + delay: 0.100, + concurrency_scale: 1.0, + ..Default::default() + }) .await; // With a link that slows down heavily as concurrency increases, the @@ -507,14 +495,12 @@ async fn slow_link() { #[tokio::test] async fn slow_send_1() { - let results = run_test( - 100, - Some(0.100), - TestParams { - delay: Duration::from_millis(50), - ..Default::default() - }, - ) + let results = run_test(TestParams { + requests: 100, + interval: Some(0.100), + delay: 0.050, + ..Default::default() + }) .await; // With a generator running slower than the link can process, the @@ -541,14 +527,12 @@ async fn slow_send_1() { #[tokio::test] async fn slow_send_2() { - let results = run_test( - 100, - Some(0.050), - TestParams { - delay: 
Duration::from_millis(50), - ..Default::default() - }, - ) + let results = run_test(TestParams { + requests: 100, + interval: Some(0.050), + delay: 0.050, + ..Default::default() + }) .await; // With a generator running at the same speed as the link RTT, the @@ -575,14 +559,12 @@ async fn slow_send_2() { #[tokio::test] async fn medium_send() { - let results = run_test( - 500, - Some(0.025), - TestParams { - delay: Duration::from_millis(100), - ..Default::default() - }, - ) + let results = run_test(TestParams { + requests: 500, + interval: Some(0.025), + delay: 0.100, + ..Default::default() + }) .await; let in_flight = results.stats.in_flight.stats().unwrap(); @@ -608,15 +590,12 @@ async fn medium_send() { #[tokio::test] async fn jittery_link_small() { - let results = run_test( - 500, - None, - TestParams { - delay: Duration::from_millis(100), - jitter: 0.1, - ..Default::default() - }, - ) + let results = run_test(TestParams { + requests: 500, + delay: 0.100, + jitter: 0.1, + ..Default::default() + }) .await; // Jitter can cause concurrency management to vary widely, though it From 6fadd9995c280b0c4fd8709e5b04dd0fce6cceed Mon Sep 17 00:00:00 2001 From: Bruce Guenter Date: Wed, 2 Sep 2020 20:16:39 -0600 Subject: [PATCH 2/9] Move auto concurrency test case data into TOML files Signed-off-by: Bruce Guenter --- src/sinks/util/auto_concurrency/tests.rs | 455 ++++++------------ tests/data/auto-concurrency-template.toml | 35 ++ .../data/auto-concurrency/constant-link.toml | 28 ++ .../defers-at-high-concurrency.toml | 36 ++ .../drops-at-high-concurrency.toml | 32 ++ .../auto-concurrency/fixed-concurrency.toml | 20 + .../auto-concurrency/jittery-link-small.toml | 25 + tests/data/auto-concurrency/medium-send.toml | 28 ++ tests/data/auto-concurrency/slow-link.toml | 30 ++ tests/data/auto-concurrency/slow-send-1.toml | 29 ++ tests/data/auto-concurrency/slow-send-2.toml | 29 ++ 11 files changed, 444 insertions(+), 303 deletions(-) create mode 100644 tests/data/auto-concurrency-template.toml create mode 100644 tests/data/auto-concurrency/constant-link.toml create mode 100644 tests/data/auto-concurrency/defers-at-high-concurrency.toml create mode 100644 tests/data/auto-concurrency/drops-at-high-concurrency.toml create mode 100644 tests/data/auto-concurrency/fixed-concurrency.toml create mode 100644 tests/data/auto-concurrency/jittery-link-small.toml create mode 100644 tests/data/auto-concurrency/medium-send.toml create mode 100644 tests/data/auto-concurrency/slow-link.toml create mode 100644 tests/data/auto-concurrency/slow-send-1.toml create mode 100644 tests/data/auto-concurrency/slow-send-2.toml diff --git a/src/sinks/util/auto_concurrency/tests.rs b/src/sinks/util/auto_concurrency/tests.rs index b6adf46998cc8..e2c2790ac3d57 100644 --- a/src/sinks/util/auto_concurrency/tests.rs +++ b/src/sinks/util/auto_concurrency/tests.rs @@ -3,7 +3,6 @@ #![cfg(all(test, not(target_os = "macos"), feature = "sources-generator"))] use super::controller::ControllerStatistics; -use super::MAX_CONCURRENCY; use crate::{ assert_within, config::{self, DataType, SinkConfig, SinkContext}, @@ -17,7 +16,10 @@ use crate::{ Healthcheck, RouterSink, }, sources::generator::GeneratorConfig, - test_util::{start_topology, stats::LevelTimeHistogram}, + test_util::{ + start_topology, + stats::{HistogramStats, LevelTimeHistogram, WeightedSumStats}, + }, }; use core::task::Context; use futures::{ @@ -26,17 +28,21 @@ use futures::{ }; use futures01::{future, Sink}; use rand::{distributions::Exp1, prelude::*}; -use serde::Serialize; +use 
serde::{Deserialize, Serialize}; use snafu::Snafu; -use std::collections::HashMap; -use std::sync::{Arc, Mutex}; -use std::task::Poll; -use std::time::{Duration, Instant}; +use std::{ + collections::HashMap, + fs::{read_dir, File}, + io::Read, + path::PathBuf, + sync::{Arc, Mutex}, + task::Poll, + time::{Duration, Instant}, +}; use tokio::time::{delay_for, delay_until}; use tower::Service; -#[derive(Copy, Clone, Debug, Derivative, Serialize)] -#[derivative(Default)] +#[derive(Copy, Clone, Debug, Default, Deserialize, Serialize)] struct TestParams { // The number of requests to issue. requests: usize, @@ -68,11 +74,14 @@ struct TestParams { #[serde(default)] concurrency_drop: usize, - #[serde(default)] - #[derivative(Default(value = "InFlightLimit::Auto"))] + #[serde(default = "default_in_flight_limit")] in_flight_limit: InFlightLimit, } +fn default_in_flight_limit() -> InFlightLimit { + InFlightLimit::Auto +} + #[derive(Debug, Default, Serialize)] struct TestConfig { request: TowerRequestConfig, @@ -231,12 +240,12 @@ struct Statistics { } #[derive(Debug)] -struct TestData { +struct TestResults { stats: Statistics, cstats: ControllerStatistics, } -async fn run_test(params: TestParams) -> TestData { +async fn run_test(params: TestParams) -> TestResults { let _ = metrics::init(); let test_config = TestConfig { @@ -306,312 +315,152 @@ async fn run_test(params: TestParams) -> TestData { MetricValue::Distribution { .. }) ); - TestData { stats, cstats } -} - -#[tokio::test] -async fn fixed_concurrency() { - // Simulate a very jittery link, but with a fixed concurrency - let results = run_test(TestParams { - requests: 200, - delay: 0.100, - jitter: 0.5, - in_flight_limit: InFlightLimit::Fixed(10), - ..Default::default() - }) - .await; - - let in_flight = results.stats.in_flight.stats().unwrap(); - assert_eq!(in_flight.max, 10, "{:#?}", results); - assert_eq!(in_flight.mode, 10, "{:#?}", results); - - // Even with jitter, the concurrency limit should never vary - let concurrency_limit = results.cstats.concurrency_limit.stats().unwrap(); - assert_eq!(concurrency_limit.min, 10, "{:#?}", results); - assert_eq!(concurrency_limit.max, 10, "{:#?}", results); - let in_flight = results.cstats.in_flight.stats().unwrap(); - assert_eq!(in_flight.max, 10, "{:#?}", results); - assert_eq!(in_flight.mode, 10, "{:#?}", results); + TestResults { stats, cstats } } -#[tokio::test] -async fn constant_link() { - let results = run_test(TestParams { - requests: 500, - delay: 0.100, - ..Default::default() - }) - .await; - - // With a constant response time link and enough responses, the - // limiter will ramp up towards the maximum concurrency. 
- let in_flight = results.stats.in_flight.stats().unwrap(); - assert_within!(in_flight.max, 10, MAX_CONCURRENCY, "{:#?}", results); - assert_within!( - in_flight.mean, - 6.0, - MAX_CONCURRENCY as f64, - "{:#?}", - results - ); +#[derive(Clone, Copy, Debug, Deserialize)] +struct Range(f64, f64); + +impl Range { + fn assert_usize(&self, value: usize, name1: &str, name2: &str, results: &TestResults) { + assert_within!( + value, + self.0 as usize, + self.1 as usize, + "Value: {} {}\n{:#?}", + name1, + name2, + results + ); + } - let observed_rtt = results.cstats.observed_rtt.stats().unwrap(); - assert_within!(observed_rtt.min, 0.090, 0.120, "{:#?}", results); - assert_within!(observed_rtt.max, 0.090, 0.130, "{:#?}", results); - assert_within!(observed_rtt.mean, 0.090, 0.120, "{:#?}", results); - let averaged_rtt = results.cstats.averaged_rtt.stats().unwrap(); - assert_within!(averaged_rtt.min, 0.090, 0.120, "{:#?}", results); - assert_within!(averaged_rtt.max, 0.090, 0.130, "{:#?}", results); - assert_within!(averaged_rtt.mean, 0.090, 0.120, "{:#?}", results); - let concurrency_limit = results.cstats.concurrency_limit.stats().unwrap(); - assert_within!(concurrency_limit.max, 9, MAX_CONCURRENCY, "{:#?}", results); - assert_within!( - concurrency_limit.mean, - 5.0, - MAX_CONCURRENCY as f64, - "{:#?}", - results - ); - let c_in_flight = results.cstats.in_flight.stats().unwrap(); - assert_within!(c_in_flight.max, 9, MAX_CONCURRENCY, "{:#?}", results); - assert_within!( - c_in_flight.mean, - 6.5, - MAX_CONCURRENCY as f64, - "{:#?}", - results - ); + fn assert_f64(&self, value: f64, name1: &str, name2: &str, results: &TestResults) { + assert_within!( + value, + self.0, + self.1, + "Value: {} {}\n{:#?}", + name1, + name2, + results + ); + } } -#[tokio::test] -async fn defers_at_high_concurrency() { - let results = run_test(TestParams { - requests: 500, - delay: 0.100, - concurrency_defer: 5, - ..Default::default() - }) - .await; - - // With a constant time link that gives deferrals over a certain - // concurrency, the limiter will ramp up to that concurrency and - // then drop down repeatedly. Note that, due to the timing of the - // adjustment, this may actually occasionally go over the error - // limit above, but it will be rare. - let in_flight = results.stats.in_flight.stats().unwrap(); - assert_within!(in_flight.max, 4, 6, "{:#?}", results); - // Since the concurrency will drop down by half each time, the - // average will be below this maximum. 
- assert_within!(in_flight.mode, 2, 4, "{:#?}", results); - assert_within!(in_flight.mean, 2.0, 4.0, "{:#?}", results); - - let observed_rtt = results.cstats.observed_rtt.stats().unwrap(); - assert_within!(observed_rtt.min, 0.090, 0.120, "{:#?}", results); - assert_within!(observed_rtt.max, 0.090, 0.130, "{:#?}", results); - assert_within!(observed_rtt.mean, 0.090, 0.120, "{:#?}", results); - let averaged_rtt = results.cstats.averaged_rtt.stats().unwrap(); - assert_within!(averaged_rtt.min, 0.090, 0.120, "{:#?}", results); - assert_within!(averaged_rtt.max, 0.090, 0.130, "{:#?}", results); - assert_within!(averaged_rtt.mean, 0.090, 0.120, "{:#?}", results); - let concurrency_limit = results.cstats.concurrency_limit.stats().unwrap(); - assert_within!(concurrency_limit.max, 5, 6, "{:#?}", results); - assert_within!(concurrency_limit.mode, 2, 5, "{:#?}", results); - assert_within!(concurrency_limit.mean, 2.0, 4.9, "{:#?}", results); - let c_in_flight = results.cstats.in_flight.stats().unwrap(); - assert_within!(c_in_flight.max, 5, 6, "{:#?}", results); - assert_within!(c_in_flight.mode, 2, 4, "{:#?}", results); - assert_within!(c_in_flight.mean, 2.0, 4.0, "{:#?}", results); +#[derive(Clone, Copy, Debug, Deserialize)] +struct ResultTest { + min: Option, + max: Option, + mode: Option, + mean: Option, } -#[tokio::test] -async fn drops_at_high_concurrency() { - let results = run_test(TestParams { - requests: 500, - delay: 0.100, - concurrency_drop: 5, - ..Default::default() - }) - .await; - - // Since our internal framework doesn't track the "dropped" - // requests, the values won't be representative of the actual number - // of requests in flight (tracked below in the internal stats). - let in_flight = results.stats.in_flight.stats().unwrap(); - assert_within!(in_flight.max, 4, 5, "{:#?}", results); - assert_within!(in_flight.mode, 3, 4, "{:#?}", results); - assert_within!(in_flight.mean, 1.5, 3.5, "{:#?}", results); - - let observed_rtt = results.cstats.observed_rtt.stats().unwrap(); - assert_within!(observed_rtt.min, 0.090, 0.125, "{:#?}", results); - assert_within!(observed_rtt.max, 0.090, 0.125, "{:#?}", results); - assert_within!(observed_rtt.mean, 0.090, 0.125, "{:#?}", results); - let averaged_rtt = results.cstats.averaged_rtt.stats().unwrap(); - assert_within!(averaged_rtt.min, 0.090, 0.125, "{:#?}", results); - assert_within!(averaged_rtt.max, 0.090, 0.125, "{:#?}", results); - assert_within!(averaged_rtt.mean, 0.090, 0.125, "{:#?}", results); - let concurrency_limit = results.cstats.concurrency_limit.stats().unwrap(); - assert_within!(concurrency_limit.max, 8, 15, "{:#?}", results); - assert_within!(concurrency_limit.mode, 5, 10, "{:#?}", results); - assert_within!(concurrency_limit.mean, 5.0, 10.0, "{:#?}", results); - let c_in_flight = results.cstats.in_flight.stats().unwrap(); - assert_within!(c_in_flight.max, 8, 15, "{:#?}", results); - assert_within!(c_in_flight.mode, 5, 10, "{:#?}", results); - assert_within!(c_in_flight.mean, 5.0, 10.0, "{:#?}", results); +impl ResultTest { + fn compare_histogram(&self, stat: HistogramStats, name: &str, results: &TestResults) { + if let Some(range) = self.min { + range.assert_usize(stat.min, name, "min", results); + } + if let Some(range) = self.max { + range.assert_usize(stat.max, name, "max", results); + } + if let Some(range) = self.mean { + range.assert_f64(stat.mean, name, "mean", results); + } + if let Some(range) = self.mode { + range.assert_usize(stat.mode, name, "mode", results); + } + } + + fn compare_weighted_sum(&self, stat: 
WeightedSumStats, name: &str, results: &TestResults) { + if let Some(range) = self.min { + range.assert_f64(stat.min, name, "min", results); + } + if let Some(range) = self.max { + range.assert_f64(stat.max, name, "max", results); + } + if let Some(range) = self.mean { + range.assert_f64(stat.mean, name, "mean", results); + } + } } -#[tokio::test] -async fn slow_link() { - let results = run_test(TestParams { - requests: 200, - delay: 0.100, - concurrency_scale: 1.0, - ..Default::default() - }) - .await; - - // With a link that slows down heavily as concurrency increases, the - // limiter will keep the concurrency low (timing skews occasionally - // has it reaching 3, but usually just 2), - let in_flight = results.stats.in_flight.stats().unwrap(); - assert_within!(in_flight.max, 1, 3, "{:#?}", results); - // and it will spend most of its time between 1 and 2. - assert_within!(in_flight.mode, 1, 2, "{:#?}", results); - assert_within!(in_flight.mean, 1.0, 2.0, "{:#?}", results); - - let observed_rtt = results.cstats.observed_rtt.stats().unwrap(); - assert_within!(observed_rtt.min, 0.090, 0.120, "{:#?}", results); - assert_within!(observed_rtt.mean, 0.090, 0.310, "{:#?}", results); - let averaged_rtt = results.cstats.averaged_rtt.stats().unwrap(); - assert_within!(averaged_rtt.min, 0.090, 0.120, "{:#?}", results); - assert_within!(averaged_rtt.mean, 0.090, 0.310, "{:#?}", results); - let concurrency_limit = results.cstats.concurrency_limit.stats().unwrap(); - assert_within!(concurrency_limit.mode, 1, 3, "{:#?}", results); - assert_within!(concurrency_limit.mean, 1.0, 2.0, "{:#?}", results); - let c_in_flight = results.cstats.in_flight.stats().unwrap(); - assert_within!(c_in_flight.max, 1, 3, "{:#?}", results); - assert_within!(c_in_flight.mode, 1, 2, "{:#?}", results); - assert_within!(c_in_flight.mean, 1.0, 2.0, "{:#?}", results); +#[derive(Debug, Deserialize)] +struct ControllerResults { + observed_rtt: Option, + averaged_rtt: Option, + concurrency_limit: Option, + in_flight: Option, } -#[tokio::test] -async fn slow_send_1() { - let results = run_test(TestParams { - requests: 100, - interval: Some(0.100), - delay: 0.050, - ..Default::default() - }) - .await; - - // With a generator running slower than the link can process, the - // limiter will never raise the concurrency above 1. 
- let in_flight = results.stats.in_flight.stats().unwrap(); - assert_eq!(in_flight.max, 1, "{:#?}", results); - assert_eq!(in_flight.mode, 1, "{:#?}", results); - assert_within!(in_flight.mean, 0.5, 1.0, "{:#?}", results); - - let observed_rtt = results.cstats.observed_rtt.stats().unwrap(); - assert_within!(observed_rtt.min, 0.045, 0.060, "{:#?}", results); - assert_within!(observed_rtt.mean, 0.045, 0.060, "{:#?}", results); - let averaged_rtt = results.cstats.averaged_rtt.stats().unwrap(); - assert_within!(averaged_rtt.min, 0.045, 0.060, "{:#?}", results); - assert_within!(averaged_rtt.mean, 0.045, 0.060, "{:#?}", results); - let concurrency_limit = results.cstats.concurrency_limit.stats().unwrap(); - assert_eq!(concurrency_limit.mode, 1, "{:#?}", results); - assert_eq!(concurrency_limit.mean, 1.0, "{:#?}", results); - let c_in_flight = results.cstats.in_flight.stats().unwrap(); - assert_eq!(c_in_flight.max, 1, "{:#?}", results); - assert_eq!(c_in_flight.mode, 1, "{:#?}", results); - assert_within!(c_in_flight.mean, 0.5, 1.0, "{:#?}", results); +#[derive(Debug, Deserialize)] +struct StatsResults { + in_flight: Option, } -#[tokio::test] -async fn slow_send_2() { - let results = run_test(TestParams { - requests: 100, - interval: Some(0.050), - delay: 0.050, - ..Default::default() - }) - .await; - - // With a generator running at the same speed as the link RTT, the - // limiter will keep the limit around 1-2 depending on timing jitter. - let in_flight = results.stats.in_flight.stats().unwrap(); - assert_within!(in_flight.max, 1, 3, "{:#?}", results); - assert_within!(in_flight.mode, 1, 2, "{:#?}", results); - assert_within!(in_flight.mean, 0.5, 2.0, "{:#?}", results); - - let observed_rtt = results.cstats.observed_rtt.stats().unwrap(); - assert_within!(observed_rtt.min, 0.045, 0.060, "{:#?}", results); - assert_within!(observed_rtt.mean, 0.045, 0.110, "{:#?}", results); - let averaged_rtt = results.cstats.averaged_rtt.stats().unwrap(); - assert_within!(averaged_rtt.min, 0.045, 0.060, "{:#?}", results); - assert_within!(averaged_rtt.mean, 0.045, 0.110, "{:#?}", results); - let concurrency_limit = results.cstats.concurrency_limit.stats().unwrap(); - assert_within!(concurrency_limit.mode, 1, 2, "{:#?}", results); - assert_within!(concurrency_limit.mean, 1.0, 2.0, "{:#?}", results); - let c_in_flight = results.cstats.in_flight.stats().unwrap(); - assert_within!(c_in_flight.max, 1, 3, "{:#?}", results); - assert_within!(c_in_flight.mode, 1, 2, "{:#?}", results); - assert_within!(c_in_flight.mean, 1.0, 2.0, "{:#?}", results); +#[derive(Debug, Deserialize)] +struct TestInput { + params: TestParams, + stats: StatsResults, + controller: ControllerResults, } -#[tokio::test] -async fn medium_send() { - let results = run_test(TestParams { - requests: 500, - interval: Some(0.025), - delay: 0.100, - ..Default::default() - }) - .await; - - let in_flight = results.stats.in_flight.stats().unwrap(); - // With a generator running at four times the speed as the link RTT, - // the limiter will keep around 4-5 requests in flight depending on - // timing jitter. 
- assert_within!(in_flight.mode, 4, 5, "{:#?}", results); - assert_within!(in_flight.mean, 4.0, 6.0, "{:#?}", results); - - let observed_rtt = results.cstats.observed_rtt.stats().unwrap(); - assert_within!(observed_rtt.min, 0.090, 0.120, "{:#?}", results); - assert_within!(observed_rtt.mean, 0.090, 0.120, "{:#?}", results); - let averaged_rtt = results.cstats.averaged_rtt.stats().unwrap(); - assert_within!(averaged_rtt.min, 0.090, 0.120, "{:#?}", results); - assert_within!(averaged_rtt.mean, 0.090, 0.500, "{:#?}", results); - let concurrency_limit = results.cstats.concurrency_limit.stats().unwrap(); - assert_within!(concurrency_limit.max, 4, MAX_CONCURRENCY, "{:#?}", results); - let c_in_flight = results.cstats.in_flight.stats().unwrap(); - assert_within!(c_in_flight.max, 4, MAX_CONCURRENCY, "{:#?}", results); - assert_within!(c_in_flight.mode, 4, 5, "{:#?}", results); - assert_within!(c_in_flight.mean, 4.0, 5.0, "{:#?}", results); +async fn run_compare(file_path: PathBuf, input: TestInput) { + eprintln!("Running test in {:?}", file_path); + + let results = run_test(input.params).await; + + if let Some(test) = input.stats.in_flight { + let in_flight = results.stats.in_flight.stats().unwrap(); + test.compare_histogram(in_flight, "stats in_flight", &results); + } + + if let Some(test) = input.controller.in_flight { + let in_flight = results.cstats.in_flight.stats().unwrap(); + test.compare_histogram(in_flight, "controller in_flight", &results); + } + + if let Some(test) = input.controller.concurrency_limit { + let concurrency_limit = results.cstats.concurrency_limit.stats().unwrap(); + test.compare_histogram(concurrency_limit, "controller concurrency_limit", &results); + } + + if let Some(test) = input.controller.observed_rtt { + let observed_rtt = results.cstats.observed_rtt.stats().unwrap(); + test.compare_weighted_sum(observed_rtt, "controller observed_rtt", &results); + } + + if let Some(test) = input.controller.averaged_rtt { + let averaged_rtt = results.cstats.averaged_rtt.stats().unwrap(); + test.compare_weighted_sum(averaged_rtt, "controller averaged_rtt", &results); + } } #[tokio::test] -async fn jittery_link_small() { - let results = run_test(TestParams { - requests: 500, - delay: 0.100, - jitter: 0.1, - ..Default::default() - }) - .await; - - // Jitter can cause concurrency management to vary widely, though it - // will typically reach the maximum of 10 in flight. 
- let in_flight = results.stats.in_flight.stats().unwrap(); - assert_within!(in_flight.max, 15, 30, "{:#?}", results); - assert_within!(in_flight.mean, 4.0, 20.0, "{:#?}", results); - - let observed_rtt = results.cstats.observed_rtt.stats().unwrap(); - assert_within!(observed_rtt.mean, 0.090, 0.130, "{:#?}", results); - let averaged_rtt = results.cstats.averaged_rtt.stats().unwrap(); - assert_within!(averaged_rtt.mean, 0.090, 0.130, "{:#?}", results); - let concurrency_limit = results.cstats.concurrency_limit.stats().unwrap(); - assert_within!(concurrency_limit.max, 10, 30, "{:#?}", results); - assert_within!(concurrency_limit.mean, 6.0, 20.0, "{:#?}", results); - let c_in_flight = results.cstats.in_flight.stats().unwrap(); - assert_within!(c_in_flight.max, 15, 30, "{:#?}", results); - assert_within!(c_in_flight.mean, 6.0, 20.0, "{:#?}", results); +async fn all_tests() { + const PATH: &str = "tests/data/auto-concurrency"; + + // Read and parse everything first + let entries = read_dir(PATH) + .expect("Could not open data directory") + .map(|entry| entry.expect("Could not read data directory").path()) + .filter_map(|file_path| { + if (file_path.extension().map(|ext| ext == "toml")).unwrap_or(false) { + let mut data = String::new(); + File::open(&file_path) + .unwrap() + .read_to_string(&mut data) + .unwrap(); + let input: TestInput = toml::from_str(&data) + .unwrap_or_else(|error| panic!("Invalid TOML in {:?}: {:?}", file_path, error)); + Some((file_path, input)) + } else { + None + } + }) + .collect::>(); + + // Then run all the tests + for (file_path, input) in entries { + run_compare(file_path, input).await; + } } diff --git a/tests/data/auto-concurrency-template.toml b/tests/data/auto-concurrency-template.toml new file mode 100644 index 0000000000000..974f87d79d844 --- /dev/null +++ b/tests/data/auto-concurrency-template.toml @@ -0,0 +1,35 @@ +[params] +requests = 000 +delay = 0.050 +# Delete any of these that are not needed +interval = +jitter = 0 +concurrency_scale = 0 +concurrency_defer = 0 +in_flight_limit = "auto" +concurrency_drop = 0 + +[stats.in_flight] +max = [,] +mean = [,] +mode = [,] + +[controller.concurrency_limit] +max = [,] +mean = [,] +mode = [,] + +[controller.in_flight] +max = [,] +mean = [,] +mode = [,] + +[controller.observed_rtt] +min = [,] +max = [,] +mean = [,] + +[controller.averaged_rtt] +min = [,] +max = [,] +mean = [,] diff --git a/tests/data/auto-concurrency/constant-link.toml b/tests/data/auto-concurrency/constant-link.toml new file mode 100644 index 0000000000000..0eecf296c4c17 --- /dev/null +++ b/tests/data/auto-concurrency/constant-link.toml @@ -0,0 +1,28 @@ +# With a constant response time link and enough responses, the limiter +# will ramp up towards the maximum concurrency. 
+ +[params] +requests = 500 +delay = 0.100 + +[stats.in_flight] +max = [10, 200] +mean = [6.0, 200.0] + +[controller.observed_rtt] +min = [0.090, 0.125] +max = [0.090, 0.125] +mean = [0.090, 0.125] + +[controller.averaged_rtt] +min = [0.090, 0.125] +max = [0.090, 0.125] +mean = [0.090, 0.125] + +[controller.concurrency_limit] +max = [9, 200] +mean = [5.0, 200.0] + +[controller.in_flight] +max = [9, 200] +mean = [6.5, 200.0] diff --git a/tests/data/auto-concurrency/defers-at-high-concurrency.toml b/tests/data/auto-concurrency/defers-at-high-concurrency.toml new file mode 100644 index 0000000000000..3666f10afdb03 --- /dev/null +++ b/tests/data/auto-concurrency/defers-at-high-concurrency.toml @@ -0,0 +1,36 @@ +[params] +requests = 500 +delay = 0.100 +concurrency_defer = 5 + +# With a constant time link that gives deferrals over a certain +# concurrency, the limiter will ramp up to that concurrency and then +# drop down repeatedly. Note that, due to the timing of the adjustment, +# this may actually occasionally go over the error limit above, but it +# will be rare. +[stats.in_flight] +max = [4, 6] +# Since the concurrency will drop down by half each time, the average +# will be below this maximum. +mode = [2, 4] +mean = [2.5, 5.0] + +[controller.in_flight] +max = [5, 6] +mode = [2, 4] +mean = [2.5, 5.0] + +[controller.concurrency_limit] +max = [5, 6] +mode = [2, 5] +mean = [3.0, 6.0] + +[controller.observed_rtt] +min = [0.090, 0.125] +max = [0.090, 0.125] +mean = [0.090, 0.125] + +[controller.averaged_rtt] +min = [0.090, 0.125] +max = [0.090, 0.125] +mean = [0.090, 0.125] diff --git a/tests/data/auto-concurrency/drops-at-high-concurrency.toml b/tests/data/auto-concurrency/drops-at-high-concurrency.toml new file mode 100644 index 0000000000000..1bfdb89307b4e --- /dev/null +++ b/tests/data/auto-concurrency/drops-at-high-concurrency.toml @@ -0,0 +1,32 @@ +[params] +requests = 500 +delay = 0.100 +concurrency_drop = 5 + +[stats.in_flight] +max = [4, 5] +mean = [2.5, 5.0] +mode = [3, 5] + +# Since our internal framework doesn't track the dropped requests, the +# values won't be representative of the actual number of requests in +# flight. + +[controller.in_flight] +max = [8, 15] +mean = [5.0, 10.0] +mode = [3, 10] + +[controller.concurrency_limit] +mean = [3.5, 8.0] +mode = [4, 10] + +[controller.observed_rtt] +min = [0.090, 0.125] +max = [0.090, 0.125] +mean = [0.090, 0.125] + +[controller.averaged_rtt] +min = [0.090, 0.125] +max = [0.090, 0.125] +mean = [0.090, 0.125] diff --git a/tests/data/auto-concurrency/fixed-concurrency.toml b/tests/data/auto-concurrency/fixed-concurrency.toml new file mode 100644 index 0000000000000..30c704f5e11d7 --- /dev/null +++ b/tests/data/auto-concurrency/fixed-concurrency.toml @@ -0,0 +1,20 @@ +# Simulate a very jittery link, but with a fixed concurrency. Even with +# jitter, the concurrency limit should never vary. 
+[params] +requests = 200 +delay = 0.100 +jitter = 0.5 +in_flight_limit = 10 + +[stats.in_flight] +max = [10, 10] +mode = [10, 10] + +[controller.concurrency_limit] +min = [10, 10] +max = [10, 10] +mode = [10, 10] + +[controller.in_flight] +max = [10, 10] +mode = [10, 10] diff --git a/tests/data/auto-concurrency/jittery-link-small.toml b/tests/data/auto-concurrency/jittery-link-small.toml new file mode 100644 index 0000000000000..c1f604e1af100 --- /dev/null +++ b/tests/data/auto-concurrency/jittery-link-small.toml @@ -0,0 +1,25 @@ +[params] +requests = 500 +delay = 0.100 +jitter = 0.1 + +# Jitter can cause concurrency management to vary widely, though it +# will typically reach high values of requests in flight. + +[stats.in_flight] +max = [12, 30] +mean = [6.0, 20.0] + +[controller.in_flight] +max = [12, 30] +mean = [6.0, 20.0] + +[controller.concurrency_limit] +max = [10, 30] +mean = [6.0, 200.0] + +[controller.observed_rtt] +mean = [0.090, 0.130] + +[controller.averaged_rtt] +mean = [0.090, 0.130] diff --git a/tests/data/auto-concurrency/medium-send.toml b/tests/data/auto-concurrency/medium-send.toml new file mode 100644 index 0000000000000..718e6c0b91256 --- /dev/null +++ b/tests/data/auto-concurrency/medium-send.toml @@ -0,0 +1,28 @@ +[params] +requests = 500 +interval = 0.025 +delay = 0.100 + +# With a generator running at four times the speed as the link RTT, +# the limiter will keep around 4-5 requests in flight depending on +# timing jitter. + +[stats.in_flight] +mode = [4, 5] +mean = [4.0, 6.0] + +[controller.in_flight] +max = [4, 20] +mode = [4, 5] +mean = [4.0, 5.0] + +[controller.concurrency_limit] +max = [4, 20] + +[controller.observed_rtt] +min = [0.090, 0.125] +max = [0.090, 0.125] + +[controller.averaged_rtt] +min = [0.090, 0.125] +max = [0.090, 0.125] diff --git a/tests/data/auto-concurrency/slow-link.toml b/tests/data/auto-concurrency/slow-link.toml new file mode 100644 index 0000000000000..44044efa57772 --- /dev/null +++ b/tests/data/auto-concurrency/slow-link.toml @@ -0,0 +1,30 @@ +[params] +requests = 200 +delay = 0.100 +concurrency_scale = 1.0 + +# With a link that slows down heavily as concurrency increases, the +# limiter will keep the concurrency low (timing skews occasionally +# has it reaching 3, but usually just 2), +[stats.in_flight] +max = [1, 3] +# and it will spend most of its time between 1 and 2. +mode = [1, 2] +mean = [1.0, 2.0] + +[controller.in_flight] +max = [1, 3] +mode = [1, 2] +mean = [1.0, 2.0] + +[controller.concurrency_limit] +mode = [1, 3] +mean = [1.0, 2.0] + +[controller.observed_rtt] +min = [0.090, 0.125] +mean = [0.090, 0.310] + +[controller.averaged_rtt] +min = [0.090, 0.125] +mean = [0.090, 0.310] diff --git a/tests/data/auto-concurrency/slow-send-1.toml b/tests/data/auto-concurrency/slow-send-1.toml new file mode 100644 index 0000000000000..7758ac6deec4c --- /dev/null +++ b/tests/data/auto-concurrency/slow-send-1.toml @@ -0,0 +1,29 @@ +[params] +requests = 100 +interval = 0.100 +delay = 0.050 + +# With a generator running slower than the link can process, the +# limiter will never raise the concurrency above 1. 
+ +[stats.in_flight] +max = [1, 1] +mode = [1, 1] +mean = [0.5, 1.0] + +[controller.in_flight] +max = [1, 1] +mode = [1, 1] +mean = [0.5, 1.0] + +[controller.concurrency_limit] +mode = [1, 1] +mean = [1.0, 1.0] + +[controller.observed_rtt] +min = [0.045, 0.060] +mean = [0.045, 0.060] + +[controller.averaged_rtt] +min = [0.045, 0.060] +mean = [0.045, 0.060] diff --git a/tests/data/auto-concurrency/slow-send-2.toml b/tests/data/auto-concurrency/slow-send-2.toml new file mode 100644 index 0000000000000..20dc983690903 --- /dev/null +++ b/tests/data/auto-concurrency/slow-send-2.toml @@ -0,0 +1,29 @@ +# With a generator running at the same speed as the link RTT, the +# limiter will keep the limit around 1-2 depending on timing jitter. + +[params] +requests = 100 +interval = 0.050 +delay = 0.050 + +[stats.in_flight] +max = [1, 3] +mode = [1, 2] +mean = [0.5, 2.0] + +[controller.in_flight] +max = [1, 3] +mode = [1, 2] +mean = [1.0, 2.0] + +[controller.concurrency_limit] +mode = [1, 2] +mean = [1.0, 2.0] + +[controller.observed_rtt] +min = [0.045, 0.060] +mean = [0.045, 0.060] + +[controller.averaged_rtt] +min = [0.045, 0.060] +mean = [0.045, 0.060] From 3515dec154750c6ab6e7b79033596da94b5e3334 Mon Sep 17 00:00:00 2001 From: Bruce Guenter Date: Tue, 8 Sep 2020 11:30:23 -0600 Subject: [PATCH 3/9] Run all the tests in parallel Signed-off-by: Bruce Guenter --- src/sinks/util/auto_concurrency/tests.rs | 88 +++++++++++++++++------- 1 file changed, 64 insertions(+), 24 deletions(-) diff --git a/src/sinks/util/auto_concurrency/tests.rs b/src/sinks/util/auto_concurrency/tests.rs index e2c2790ac3d57..4e54a91f5d846 100644 --- a/src/sinks/util/auto_concurrency/tests.rs +++ b/src/sinks/util/auto_concurrency/tests.rs @@ -34,7 +34,7 @@ use std::{ collections::HashMap, fs::{read_dir, File}, io::Read, - path::PathBuf, + path::{Path, PathBuf}, sync::{Arc, Mutex}, task::Poll, time::{Duration, Instant}, @@ -322,24 +322,33 @@ async fn run_test(params: TestParams) -> TestResults { struct Range(f64, f64); impl Range { - fn assert_usize(&self, value: usize, name1: &str, name2: &str, results: &TestResults) { + fn assert_usize( + &self, + value: usize, + file: &Path, + name1: &str, + name2: &str, + results: &TestResults, + ) { assert_within!( value, self.0 as usize, self.1 as usize, - "Value: {} {}\n{:#?}", + "File: {:?} Value: {} {}\n{:#?}", + file, name1, name2, results ); } - fn assert_f64(&self, value: f64, name1: &str, name2: &str, results: &TestResults) { + fn assert_f64(&self, value: f64, file: &Path, name1: &str, name2: &str, results: &TestResults) { assert_within!( value, self.0, self.1, - "Value: {} {}\n{:#?}", + "File: {:?} Value: {} {}\n{:#?}", + file, name1, name2, results @@ -356,30 +365,42 @@ struct ResultTest { } impl ResultTest { - fn compare_histogram(&self, stat: HistogramStats, name: &str, results: &TestResults) { + fn compare_histogram( + &self, + stat: HistogramStats, + file: &Path, + name: &str, + results: &TestResults, + ) { if let Some(range) = self.min { - range.assert_usize(stat.min, name, "min", results); + range.assert_usize(stat.min, file, name, "min", results); } if let Some(range) = self.max { - range.assert_usize(stat.max, name, "max", results); + range.assert_usize(stat.max, file, name, "max", results); } if let Some(range) = self.mean { - range.assert_f64(stat.mean, name, "mean", results); + range.assert_f64(stat.mean, file, name, "mean", results); } if let Some(range) = self.mode { - range.assert_usize(stat.mode, name, "mode", results); + range.assert_usize(stat.mode, file, name, "mode", 
results); } } - fn compare_weighted_sum(&self, stat: WeightedSumStats, name: &str, results: &TestResults) { + fn compare_weighted_sum( + &self, + stat: WeightedSumStats, + file: &Path, + name: &str, + results: &TestResults, + ) { if let Some(range) = self.min { - range.assert_f64(stat.min, name, "min", results); + range.assert_f64(stat.min, file, name, "min", results); } if let Some(range) = self.max { - range.assert_f64(stat.max, name, "max", results); + range.assert_f64(stat.max, file, name, "max", results); } if let Some(range) = self.mean { - range.assert_f64(stat.mean, name, "mean", results); + range.assert_f64(stat.mean, file, name, "mean", results); } } } @@ -405,33 +426,50 @@ struct TestInput { } async fn run_compare(file_path: PathBuf, input: TestInput) { - eprintln!("Running test in {:?}", file_path); + eprintln!("Starting test in {:?}", file_path); let results = run_test(input.params).await; + eprintln!("Finished running {:?}", file_path); + if let Some(test) = input.stats.in_flight { let in_flight = results.stats.in_flight.stats().unwrap(); - test.compare_histogram(in_flight, "stats in_flight", &results); + test.compare_histogram(in_flight, &file_path, "stats in_flight", &results); } if let Some(test) = input.controller.in_flight { let in_flight = results.cstats.in_flight.stats().unwrap(); - test.compare_histogram(in_flight, "controller in_flight", &results); + test.compare_histogram(in_flight, &file_path, "controller in_flight", &results); } if let Some(test) = input.controller.concurrency_limit { let concurrency_limit = results.cstats.concurrency_limit.stats().unwrap(); - test.compare_histogram(concurrency_limit, "controller concurrency_limit", &results); + test.compare_histogram( + concurrency_limit, + &file_path, + "controller concurrency_limit", + &results, + ); } if let Some(test) = input.controller.observed_rtt { let observed_rtt = results.cstats.observed_rtt.stats().unwrap(); - test.compare_weighted_sum(observed_rtt, "controller observed_rtt", &results); + test.compare_weighted_sum( + observed_rtt, + &file_path, + "controller observed_rtt", + &results, + ); } if let Some(test) = input.controller.averaged_rtt { let averaged_rtt = results.cstats.averaged_rtt.stats().unwrap(); - test.compare_weighted_sum(averaged_rtt, "controller averaged_rtt", &results); + test.compare_weighted_sum( + averaged_rtt, + &file_path, + "controller averaged_rtt", + &results, + ); } } @@ -459,8 +497,10 @@ async fn all_tests() { }) .collect::>(); - // Then run all the tests - for (file_path, input) in entries { - run_compare(file_path, input).await; - } + // Then run all the tests at once + let tests = entries + .into_iter() + .map(|(file_path, input)| run_compare(file_path, input)) + .collect::>(); + futures::future::join_all(tests).await; } From 7d4ebd1a69fd1a5b498cce210397282aeb3e436c Mon Sep 17 00:00:00 2001 From: Bruce Guenter Date: Wed, 9 Sep 2020 11:43:13 -0600 Subject: [PATCH 4/9] Simplify passing around path_name Signed-off-by: Bruce Guenter --- src/sinks/util/auto_concurrency/tests.rs | 83 ++++++++---------------- 1 file changed, 27 insertions(+), 56 deletions(-) diff --git a/src/sinks/util/auto_concurrency/tests.rs b/src/sinks/util/auto_concurrency/tests.rs index 4e54a91f5d846..1d12d1aac127b 100644 --- a/src/sinks/util/auto_concurrency/tests.rs +++ b/src/sinks/util/auto_concurrency/tests.rs @@ -34,7 +34,7 @@ use std::{ collections::HashMap, fs::{read_dir, File}, io::Read, - path::{Path, PathBuf}, + path::PathBuf, sync::{Arc, Mutex}, task::Poll, time::{Duration, Instant}, @@ -241,11 
+241,12 @@ struct Statistics { #[derive(Debug)] struct TestResults { + file_path: PathBuf, stats: Statistics, cstats: ControllerStatistics, } -async fn run_test(params: TestParams) -> TestResults { +async fn run_test(params: TestParams, file_path: PathBuf) -> TestResults { let _ = metrics::init(); let test_config = TestConfig { @@ -315,40 +316,37 @@ async fn run_test(params: TestParams) -> TestResults { MetricValue::Distribution { .. }) ); - TestResults { stats, cstats } + TestResults { + file_path, + stats, + cstats, + } } #[derive(Clone, Copy, Debug, Deserialize)] struct Range(f64, f64); impl Range { - fn assert_usize( - &self, - value: usize, - file: &Path, - name1: &str, - name2: &str, - results: &TestResults, - ) { + fn assert_usize(&self, value: usize, name1: &str, name2: &str, results: &TestResults) { assert_within!( value, self.0 as usize, self.1 as usize, "File: {:?} Value: {} {}\n{:#?}", - file, + results.file_path, name1, name2, results ); } - fn assert_f64(&self, value: f64, file: &Path, name1: &str, name2: &str, results: &TestResults) { + fn assert_f64(&self, value: f64, name1: &str, name2: &str, results: &TestResults) { assert_within!( value, self.0, self.1, "File: {:?} Value: {} {}\n{:#?}", - file, + results.file_path, name1, name2, results @@ -365,42 +363,30 @@ struct ResultTest { } impl ResultTest { - fn compare_histogram( - &self, - stat: HistogramStats, - file: &Path, - name: &str, - results: &TestResults, - ) { + fn compare_histogram(&self, stat: HistogramStats, name: &str, results: &TestResults) { if let Some(range) = self.min { - range.assert_usize(stat.min, file, name, "min", results); + range.assert_usize(stat.min, name, "min", results); } if let Some(range) = self.max { - range.assert_usize(stat.max, file, name, "max", results); + range.assert_usize(stat.max, name, "max", results); } if let Some(range) = self.mean { - range.assert_f64(stat.mean, file, name, "mean", results); + range.assert_f64(stat.mean, name, "mean", results); } if let Some(range) = self.mode { - range.assert_usize(stat.mode, file, name, "mode", results); + range.assert_usize(stat.mode, name, "mode", results); } } - fn compare_weighted_sum( - &self, - stat: WeightedSumStats, - file: &Path, - name: &str, - results: &TestResults, - ) { + fn compare_weighted_sum(&self, stat: WeightedSumStats, name: &str, results: &TestResults) { if let Some(range) = self.min { - range.assert_f64(stat.min, file, name, "min", results); + range.assert_f64(stat.min, name, "min", results); } if let Some(range) = self.max { - range.assert_f64(stat.max, file, name, "max", results); + range.assert_f64(stat.max, name, "max", results); } if let Some(range) = self.mean { - range.assert_f64(stat.mean, file, name, "mean", results); + range.assert_f64(stat.mean, name, "mean", results); } } } @@ -428,48 +414,33 @@ struct TestInput { async fn run_compare(file_path: PathBuf, input: TestInput) { eprintln!("Starting test in {:?}", file_path); - let results = run_test(input.params).await; + let results = run_test(input.params, file_path.clone()).await; eprintln!("Finished running {:?}", file_path); if let Some(test) = input.stats.in_flight { let in_flight = results.stats.in_flight.stats().unwrap(); - test.compare_histogram(in_flight, &file_path, "stats in_flight", &results); + test.compare_histogram(in_flight, "stats in_flight", &results); } if let Some(test) = input.controller.in_flight { let in_flight = results.cstats.in_flight.stats().unwrap(); - test.compare_histogram(in_flight, &file_path, "controller in_flight", &results); + 
test.compare_histogram(in_flight, "controller in_flight", &results); } if let Some(test) = input.controller.concurrency_limit { let concurrency_limit = results.cstats.concurrency_limit.stats().unwrap(); - test.compare_histogram( - concurrency_limit, - &file_path, - "controller concurrency_limit", - &results, - ); + test.compare_histogram(concurrency_limit, "controller concurrency_limit", &results); } if let Some(test) = input.controller.observed_rtt { let observed_rtt = results.cstats.observed_rtt.stats().unwrap(); - test.compare_weighted_sum( - observed_rtt, - &file_path, - "controller observed_rtt", - &results, - ); + test.compare_weighted_sum(observed_rtt, "controller observed_rtt", &results); } if let Some(test) = input.controller.averaged_rtt { let averaged_rtt = results.cstats.averaged_rtt.stats().unwrap(); - test.compare_weighted_sum( - averaged_rtt, - &file_path, - "controller averaged_rtt", - &results, - ); + test.compare_weighted_sum(averaged_rtt, "controller averaged_rtt", &results); } } From 561901dd21041e2524a89eae22e2fc4cdd91c04e Mon Sep 17 00:00:00 2001 From: Bruce Guenter Date: Fri, 11 Sep 2020 09:58:26 -0600 Subject: [PATCH 5/9] Use the tokio manual time tracking to speed up tests By pausing and then manually advancing the clock, this provides for both very fast running tests and nearly perfect time measurements for highly repeatable test results. Signed-off-by: Bruce Guenter --- src/sinks/util/auto_concurrency/tests.rs | 39 +++++++++++-------- .../data/auto-concurrency/constant-link.toml | 33 ++++++++-------- .../defers-at-high-concurrency.toml | 22 +++++------ .../drops-at-high-concurrency.toml | 23 +++++------ .../auto-concurrency/fixed-concurrency.toml | 8 ++-- .../auto-concurrency/jittery-link-small.toml | 18 ++++----- tests/data/auto-concurrency/medium-send.toml | 17 ++++---- tests/data/auto-concurrency/slow-link.toml | 24 ++++++------ tests/data/auto-concurrency/slow-send-1.toml | 12 +++--- tests/data/auto-concurrency/slow-send-2.toml | 24 ++++++------ 10 files changed, 115 insertions(+), 105 deletions(-) diff --git a/src/sinks/util/auto_concurrency/tests.rs b/src/sinks/util/auto_concurrency/tests.rs index 1d12d1aac127b..f2c876b72d023 100644 --- a/src/sinks/util/auto_concurrency/tests.rs +++ b/src/sinks/util/auto_concurrency/tests.rs @@ -37,9 +37,8 @@ use std::{ path::PathBuf, sync::{Arc, Mutex}, task::Poll, - time::{Duration, Instant}, }; -use tokio::time::{delay_for, delay_until}; +use tokio::time::{self, delay_until, Duration, Instant}; use tower::Service; #[derive(Copy, Clone, Debug, Default, Deserialize, Serialize)] @@ -178,7 +177,7 @@ impl Service> for TestSink { fn call(&mut self, _request: Vec) -> Self::Future { let now = Instant::now(); let mut stats = self.stats.lock().expect("Poisoned stats lock"); - stats.in_flight.adjust(1, now); + stats.in_flight.adjust(1, now.into()); let in_flight = stats.in_flight.level(); let params = self.params; @@ -191,7 +190,7 @@ impl Service> for TestSink { let delay = delay_until((now + delay).into()); if params.concurrency_drop > 0 && in_flight >= params.concurrency_drop { - stats.in_flight.adjust(-1, now); + stats.in_flight.adjust(-1, now.into()); Box::pin(pending()) } else { let stats2 = Arc::clone(&self.stats); @@ -199,7 +198,7 @@ impl Service> for TestSink { delay.await; let mut stats = stats2.lock().expect("Poisoned stats lock"); let in_flight = stats.in_flight.level(); - stats.in_flight.adjust(-1, Instant::now()); + stats.in_flight.adjust(-1, Instant::now().into()); if params.concurrency_defer > 0 && in_flight >= 
params.concurrency_defer { Err(Error::Deferred) @@ -273,9 +272,15 @@ async fn run_test(params: TestParams, file_path: PathBuf) -> TestResults { let controller = get_controller().unwrap(); - // Give time for the generator to start and queue all its data. + // Give time for the generator to start and queue all its data, and + // all the requests to resolve to a response. let delay = params.interval.unwrap_or(0.0) * (params.requests as f64) + 1.0; - delay_for(Duration::from_secs_f64(delay)).await; + // This is crude and dumb, but it works, and the tests run fast and + // the results are highly repeatable. + let msecs = (delay * 1000.0) as usize; + for _ in 0..msecs { + time::advance(Duration::from_millis(1)).await; + } topology.stop().compat().await.unwrap(); let stats = Arc::try_unwrap(stats) @@ -412,12 +417,10 @@ struct TestInput { } async fn run_compare(file_path: PathBuf, input: TestInput) { - eprintln!("Starting test in {:?}", file_path); + eprintln!("Running test in {:?}", file_path); let results = run_test(input.params, file_path.clone()).await; - eprintln!("Finished running {:?}", file_path); - if let Some(test) = input.stats.in_flight { let in_flight = results.stats.in_flight.stats().unwrap(); test.compare_histogram(in_flight, "stats in_flight", &results); @@ -449,7 +452,7 @@ async fn all_tests() { const PATH: &str = "tests/data/auto-concurrency"; // Read and parse everything first - let entries = read_dir(PATH) + let mut entries = read_dir(PATH) .expect("Could not open data directory") .map(|entry| entry.expect("Could not read data directory").path()) .filter_map(|file_path| { @@ -468,10 +471,12 @@ async fn all_tests() { }) .collect::>(); - // Then run all the tests at once - let tests = entries - .into_iter() - .map(|(file_path, input)| run_compare(file_path, input)) - .collect::>(); - futures::future::join_all(tests).await; + entries.sort_unstable_by_key(|entry| entry.0.to_string_lossy().to_string()); + + time::pause(); + + // Then run all the tests + for (file_path, input) in entries { + run_compare(file_path, input).await; + } } diff --git a/tests/data/auto-concurrency/constant-link.toml b/tests/data/auto-concurrency/constant-link.toml index 0eecf296c4c17..0b3cecf9e6814 100644 --- a/tests/data/auto-concurrency/constant-link.toml +++ b/tests/data/auto-concurrency/constant-link.toml @@ -6,23 +6,24 @@ requests = 500 delay = 0.100 [stats.in_flight] -max = [10, 200] -mean = [6.0, 200.0] +max = [22, 29] +mean = [10.0, 13.0] -[controller.observed_rtt] -min = [0.090, 0.125] -max = [0.090, 0.125] -mean = [0.090, 0.125] - -[controller.averaged_rtt] -min = [0.090, 0.125] -max = [0.090, 0.125] -mean = [0.090, 0.125] +[controller.in_flight] +max = [22, 29] +mean = [10.0, 13.0] [controller.concurrency_limit] -max = [9, 200] -mean = [5.0, 200.0] +max = [22, 30] +mode = [10, 25] +mean = [10.0, 15.0] -[controller.in_flight] -max = [9, 200] -mean = [6.5, 200.0] +[controller.observed_rtt] +min = [0.100, 0.102] +max = [0.100, 0.102] +mean = [0.100, 0.102] + +[controller.averaged_rtt] +min = [0.100, 0.102] +max = [0.100, 0.102] +mean = [0.100, 0.102] diff --git a/tests/data/auto-concurrency/defers-at-high-concurrency.toml b/tests/data/auto-concurrency/defers-at-high-concurrency.toml index 3666f10afdb03..f35e253371b99 100644 --- a/tests/data/auto-concurrency/defers-at-high-concurrency.toml +++ b/tests/data/auto-concurrency/defers-at-high-concurrency.toml @@ -12,25 +12,25 @@ concurrency_defer = 5 max = [4, 6] # Since the concurrency will drop down by half each time, the average # will be below this 
maximum. -mode = [2, 4] -mean = [2.5, 5.0] +mode = [4, 4] +mean = [4.0, 5.0] [controller.in_flight] max = [5, 6] -mode = [2, 4] -mean = [2.5, 5.0] +mode = [4, 4] +mean = [4.0, 5.0] [controller.concurrency_limit] max = [5, 6] mode = [2, 5] -mean = [3.0, 6.0] +mean = [4.0, 5.0] [controller.observed_rtt] -min = [0.090, 0.125] -max = [0.090, 0.125] -mean = [0.090, 0.125] +min = [0.100, 0.102] +max = [0.100, 0.102] +mean = [0.100, 0.102] [controller.averaged_rtt] -min = [0.090, 0.125] -max = [0.090, 0.125] -mean = [0.090, 0.125] +min = [0.100, 0.102] +max = [0.100, 0.102] +mean = [0.100, 0.102] diff --git a/tests/data/auto-concurrency/drops-at-high-concurrency.toml b/tests/data/auto-concurrency/drops-at-high-concurrency.toml index 1bfdb89307b4e..b019f6fd6638e 100644 --- a/tests/data/auto-concurrency/drops-at-high-concurrency.toml +++ b/tests/data/auto-concurrency/drops-at-high-concurrency.toml @@ -5,7 +5,7 @@ concurrency_drop = 5 [stats.in_flight] max = [4, 5] -mean = [2.5, 5.0] +mean = [2.5, 3.0] mode = [3, 5] # Since our internal framework doesn't track the dropped requests, the @@ -13,20 +13,21 @@ mode = [3, 5] # flight. [controller.in_flight] -max = [8, 15] -mean = [5.0, 10.0] +max = [13, 15] +mean = [6.0, 7.0] mode = [3, 10] [controller.concurrency_limit] -mean = [3.5, 8.0] -mode = [4, 10] +max = [10, 15] +mean = [7.0, 8.0] +mode = [4, 6] [controller.observed_rtt] -min = [0.090, 0.125] -max = [0.090, 0.125] -mean = [0.090, 0.125] +min = [0.100, 0.102] +max = [0.100, 0.102] +mean = [0.100, 0.102] [controller.averaged_rtt] -min = [0.090, 0.125] -max = [0.090, 0.125] -mean = [0.090, 0.125] +min = [0.100, 0.102] +max = [0.100, 0.102] +mean = [0.100, 0.102] diff --git a/tests/data/auto-concurrency/fixed-concurrency.toml b/tests/data/auto-concurrency/fixed-concurrency.toml index 30c704f5e11d7..8b96bf3eb01e6 100644 --- a/tests/data/auto-concurrency/fixed-concurrency.toml +++ b/tests/data/auto-concurrency/fixed-concurrency.toml @@ -9,12 +9,14 @@ in_flight_limit = 10 [stats.in_flight] max = [10, 10] mode = [10, 10] +mean = [8.5, 10.0] -[controller.concurrency_limit] -min = [10, 10] +[controller.in_flight] max = [10, 10] mode = [10, 10] +mean = [8.5, 10.0] -[controller.in_flight] +[controller.concurrency_limit] +min = [10, 10] max = [10, 10] mode = [10, 10] diff --git a/tests/data/auto-concurrency/jittery-link-small.toml b/tests/data/auto-concurrency/jittery-link-small.toml index c1f604e1af100..b8d7c393e865d 100644 --- a/tests/data/auto-concurrency/jittery-link-small.toml +++ b/tests/data/auto-concurrency/jittery-link-small.toml @@ -1,5 +1,5 @@ [params] -requests = 500 +requests = 1000 delay = 0.100 jitter = 0.1 @@ -7,19 +7,19 @@ jitter = 0.1 # will typically reach high values of requests in flight. [stats.in_flight] -max = [12, 30] -mean = [6.0, 20.0] +max = [20, 35] +mean = [10.0, 20.0] [controller.in_flight] -max = [12, 30] -mean = [6.0, 20.0] +max = [20, 35] +mean = [10.0, 20.0] [controller.concurrency_limit] -max = [10, 30] -mean = [6.0, 200.0] +max = [20, 35] +mean = [10.0, 20.0] [controller.observed_rtt] -mean = [0.090, 0.130] +mean = [0.100, 0.130] [controller.averaged_rtt] -mean = [0.090, 0.130] +mean = [0.100, 0.130] diff --git a/tests/data/auto-concurrency/medium-send.toml b/tests/data/auto-concurrency/medium-send.toml index 718e6c0b91256..166c268cbedbf 100644 --- a/tests/data/auto-concurrency/medium-send.toml +++ b/tests/data/auto-concurrency/medium-send.toml @@ -8,21 +8,22 @@ delay = 0.100 # timing jitter. 
[stats.in_flight] +max = [8, 8] mode = [4, 5] -mean = [4.0, 6.0] +mean = [4.0, 4.5] [controller.in_flight] -max = [4, 20] +max = [8, 8] mode = [4, 5] -mean = [4.0, 5.0] +mean = [4.0, 4.5] [controller.concurrency_limit] -max = [4, 20] +max = [8, 8] [controller.observed_rtt] -min = [0.090, 0.125] -max = [0.090, 0.125] +min = [0.100, 0.102] +max = [0.100, 0.102] [controller.averaged_rtt] -min = [0.090, 0.125] -max = [0.090, 0.125] +min = [0.100, 0.102] +max = [0.100, 0.102] diff --git a/tests/data/auto-concurrency/slow-link.toml b/tests/data/auto-concurrency/slow-link.toml index 44044efa57772..a935df0f55524 100644 --- a/tests/data/auto-concurrency/slow-link.toml +++ b/tests/data/auto-concurrency/slow-link.toml @@ -7,24 +7,24 @@ concurrency_scale = 1.0 # limiter will keep the concurrency low (timing skews occasionally # has it reaching 3, but usually just 2), [stats.in_flight] -max = [1, 3] +max = [2, 3] # and it will spend most of its time between 1 and 2. -mode = [1, 2] -mean = [1.0, 2.0] +mode = [2, 2] +mean = [1.5, 2.0] [controller.in_flight] -max = [1, 3] -mode = [1, 2] -mean = [1.0, 2.0] +max = [2, 3] +mode = [2, 2] +mean = [1.5, 2.0] [controller.concurrency_limit] -mode = [1, 3] -mean = [1.0, 2.0] +mode = [2, 3] +mean = [1.7, 2.0] [controller.observed_rtt] -min = [0.090, 0.125] -mean = [0.090, 0.310] +min = [0.100, 0.102] +mean = [0.100, 0.310] [controller.averaged_rtt] -min = [0.090, 0.125] -mean = [0.090, 0.310] +min = [0.100, 0.102] +mean = [0.100, 0.310] diff --git a/tests/data/auto-concurrency/slow-send-1.toml b/tests/data/auto-concurrency/slow-send-1.toml index 7758ac6deec4c..869c9b5a2e206 100644 --- a/tests/data/auto-concurrency/slow-send-1.toml +++ b/tests/data/auto-concurrency/slow-send-1.toml @@ -9,21 +9,21 @@ delay = 0.050 [stats.in_flight] max = [1, 1] mode = [1, 1] -mean = [0.5, 1.0] +mean = [0.5, 0.55] [controller.in_flight] max = [1, 1] mode = [1, 1] -mean = [0.5, 1.0] +mean = [0.5, 0.55] [controller.concurrency_limit] mode = [1, 1] mean = [1.0, 1.0] [controller.observed_rtt] -min = [0.045, 0.060] -mean = [0.045, 0.060] +min = [0.050, 0.052] +mean = [0.050, 0.052] [controller.averaged_rtt] -min = [0.045, 0.060] -mean = [0.045, 0.060] +min = [0.050, 0.052] +mean = [0.050, 0.052] diff --git a/tests/data/auto-concurrency/slow-send-2.toml b/tests/data/auto-concurrency/slow-send-2.toml index 20dc983690903..82b61780c2f1d 100644 --- a/tests/data/auto-concurrency/slow-send-2.toml +++ b/tests/data/auto-concurrency/slow-send-2.toml @@ -1,5 +1,5 @@ # With a generator running at the same speed as the link RTT, the -# limiter will keep the limit around 1-2 depending on timing jitter. +# limiter will keep the limit around 2. 
[params] requests = 100 @@ -7,23 +7,23 @@ interval = 0.050 delay = 0.050 [stats.in_flight] -max = [1, 3] -mode = [1, 2] -mean = [0.5, 2.0] +max = [1, 2] +mode = [1, 1] +mean = [1.0, 1.2] [controller.in_flight] -max = [1, 3] -mode = [1, 2] +max = [1, 2] +mode = [1, 1] mean = [1.0, 2.0] [controller.concurrency_limit] -mode = [1, 2] -mean = [1.0, 2.0] +mode = [2, 2] +mean = [1.9, 2.0] [controller.observed_rtt] -min = [0.045, 0.060] -mean = [0.045, 0.060] +min = [0.050, 0.052] +mean = [0.050, 0.052] [controller.averaged_rtt] -min = [0.045, 0.060] -mean = [0.045, 0.060] +min = [0.050, 0.052] +mean = [0.050, 0.052] From 5eec82827266682cebb43130d558af1ba71a75ba Mon Sep 17 00:00:00 2001 From: Bruce Guenter Date: Fri, 11 Sep 2020 12:06:08 -0600 Subject: [PATCH 6/9] Fix clippy lint Signed-off-by: Bruce Guenter --- src/sinks/util/auto_concurrency/tests.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/sinks/util/auto_concurrency/tests.rs b/src/sinks/util/auto_concurrency/tests.rs index f784de152075c..1af3bce1efd00 100644 --- a/src/sinks/util/auto_concurrency/tests.rs +++ b/src/sinks/util/auto_concurrency/tests.rs @@ -188,7 +188,6 @@ impl Service> for TestSink { + (in_flight - 1) as f64 * params.concurrency_scale + thread_rng().sample(Exp1) * params.jitter), ); - let delay = delay_until((now + delay).into()); if params.concurrency_drop > 0 && in_flight >= params.concurrency_drop { stats.in_flight.adjust(-1, now.into()); @@ -196,7 +195,7 @@ impl Service> for TestSink { } else { let stats2 = Arc::clone(&self.stats); Box::pin(async move { - delay.await; + delay_until(now + delay).await; let mut stats = stats2.lock().expect("Poisoned stats lock"); let in_flight = stats.in_flight.level(); stats.in_flight.adjust(-1, Instant::now().into()); From e4bce8ea778bad3f86ebb55752c3e09a8571f82b Mon Sep 17 00:00:00 2001 From: Bruce Guenter Date: Fri, 11 Sep 2020 13:49:38 -0600 Subject: [PATCH 7/9] Collect all the failures for each test This has the collateral benefit of being able to drop the large `assert_within` macro that was only used here. Signed-off-by: Bruce Guenter --- src/sinks/util/auto_concurrency/tests.rs | 159 ++++++++++++++--------- src/test_util/mod.rs | 62 --------- 2 files changed, 99 insertions(+), 122 deletions(-) diff --git a/src/sinks/util/auto_concurrency/tests.rs b/src/sinks/util/auto_concurrency/tests.rs index 1af3bce1efd00..eda8619c25064 100644 --- a/src/sinks/util/auto_concurrency/tests.rs +++ b/src/sinks/util/auto_concurrency/tests.rs @@ -1,10 +1,9 @@ // Only run the test suite on unix systems, as the timings on especially -// MacOS are too variable to produce reliable results in these tests. +// MacOS are too variable to produce reliable in these tests. #![cfg(all(test, not(target_os = "macos"), feature = "sources-generator"))] use super::controller::ControllerStatistics; use crate::{ - assert_within, config::{self, DataType, SinkConfig, SinkContext}, event::{metric::MetricValue, Event}, metrics::{self, capture_metrics, get_controller}, @@ -240,12 +239,11 @@ struct Statistics { #[derive(Debug)] struct TestResults { - file_path: PathBuf, stats: Statistics, cstats: ControllerStatistics, } -async fn run_test(params: TestParams, file_path: PathBuf) -> TestResults { +async fn run_test(params: TestParams) -> TestResults { let _ = metrics::init(); let test_config = TestConfig { @@ -321,41 +319,65 @@ async fn run_test(params: TestParams, file_path: PathBuf) -> TestResults { MetricValue::Distribution { .. 
}) ); - TestResults { - file_path, - stats, - cstats, - } + TestResults { stats, cstats } +} + +#[derive(Debug)] +enum FailureMode { + ExceededMinimum, + ExceededMaximum, +} + +#[derive(Debug)] +struct Failure { + stat_name: String, + mode: FailureMode, + value: f64, + reference: f64, } #[derive(Clone, Copy, Debug, Deserialize)] struct Range(f64, f64); impl Range { - fn assert_usize(&self, value: usize, name1: &str, name2: &str, results: &TestResults) { - assert_within!( - value, - self.0 as usize, - self.1 as usize, - "File: {:?} Value: {} {}\n{:#?}", - results.file_path, - name1, - name2, - results - ); + fn assert_usize(&self, value: usize, name1: &str, name2: &str) -> Option { + if value < self.0 as usize { + Some(Failure { + stat_name: format!("{} {}", name1, name2), + mode: FailureMode::ExceededMinimum, + value: value as f64, + reference: self.0, + }) + } else if value > self.1 as usize { + Some(Failure { + stat_name: format!("{} {}", name1, name2), + mode: FailureMode::ExceededMaximum, + value: value as f64, + reference: self.1, + }) + } else { + None + } } - fn assert_f64(&self, value: f64, name1: &str, name2: &str, results: &TestResults) { - assert_within!( - value, - self.0, - self.1, - "File: {:?} Value: {} {}\n{:#?}", - results.file_path, - name1, - name2, - results - ); + fn assert_f64(&self, value: f64, name1: &str, name2: &str) -> Option { + if value < self.0 { + Some(Failure { + stat_name: format!("{} {}", name1, name2), + mode: FailureMode::ExceededMinimum, + value: value, + reference: self.0, + }) + } else if value > self.1 { + Some(Failure { + stat_name: format!("{} {}", name1, name2), + mode: FailureMode::ExceededMaximum, + value: value, + reference: self.1, + }) + } else { + None + } } } @@ -368,31 +390,34 @@ struct ResultTest { } impl ResultTest { - fn compare_histogram(&self, stat: HistogramStats, name: &str, results: &TestResults) { - if let Some(range) = self.min { - range.assert_usize(stat.min, name, "min", results); - } - if let Some(range) = self.max { - range.assert_usize(stat.max, name, "max", results); - } - if let Some(range) = self.mean { - range.assert_f64(stat.mean, name, "mean", results); - } - if let Some(range) = self.mode { - range.assert_usize(stat.mode, name, "mode", results); - } + fn compare_histogram(&self, stat: HistogramStats, name: &str) -> Vec { + vec![ + self.min + .and_then(|range| range.assert_usize(stat.min, name, "min")), + self.max + .and_then(|range| range.assert_usize(stat.max, name, "max")), + self.mean + .and_then(|range| range.assert_f64(stat.mean, name, "mean")), + self.mode + .and_then(|range| range.assert_usize(stat.mode, name, "mode")), + ] + .into_iter() + .filter_map(|f| f) + .collect::>() } - fn compare_weighted_sum(&self, stat: WeightedSumStats, name: &str, results: &TestResults) { - if let Some(range) = self.min { - range.assert_f64(stat.min, name, "min", results); - } - if let Some(range) = self.max { - range.assert_f64(stat.max, name, "max", results); - } - if let Some(range) = self.mean { - range.assert_f64(stat.mean, name, "mean", results); - } + fn compare_weighted_sum(&self, stat: WeightedSumStats, name: &str) -> Vec { + vec![ + self.min + .and_then(|range| range.assert_f64(stat.min, name, "min")), + self.max + .and_then(|range| range.assert_f64(stat.max, name, "max")), + self.mean + .and_then(|range| range.assert_f64(stat.mean, name, "mean")), + ] + .into_iter() + .filter_map(|f| f) + .collect::>() } } @@ -419,32 +444,46 @@ struct TestInput { async fn run_compare(file_path: PathBuf, input: TestInput) { 
eprintln!("Running test in {:?}", file_path); - let results = run_test(input.params, file_path.clone()).await; + let results = run_test(input.params).await; + + let mut failures = Vec::new(); if let Some(test) = input.stats.in_flight { let in_flight = results.stats.in_flight.stats().unwrap(); - test.compare_histogram(in_flight, "stats in_flight", &results); + failures.extend(test.compare_histogram(in_flight, "stats in_flight")); } if let Some(test) = input.controller.in_flight { let in_flight = results.cstats.in_flight.stats().unwrap(); - test.compare_histogram(in_flight, "controller in_flight", &results); + failures.extend(test.compare_histogram(in_flight, "controller in_flight")); } if let Some(test) = input.controller.concurrency_limit { let concurrency_limit = results.cstats.concurrency_limit.stats().unwrap(); - test.compare_histogram(concurrency_limit, "controller concurrency_limit", &results); + failures.extend(test.compare_histogram(concurrency_limit, "controller concurrency_limit")); } if let Some(test) = input.controller.observed_rtt { let observed_rtt = results.cstats.observed_rtt.stats().unwrap(); - test.compare_weighted_sum(observed_rtt, "controller observed_rtt", &results); + failures.extend(test.compare_weighted_sum(observed_rtt, "controller observed_rtt")); } if let Some(test) = input.controller.averaged_rtt { let averaged_rtt = results.cstats.averaged_rtt.stats().unwrap(); - test.compare_weighted_sum(averaged_rtt, "controller averaged_rtt", &results); + failures.extend(test.compare_weighted_sum(averaged_rtt, "controller averaged_rtt")); + } + + for failure in &failures { + let mode = match failure.mode { + FailureMode::ExceededMinimum => "minimum", + FailureMode::ExceededMaximum => "maximum", + }; + eprintln!( + "Comparison failed: {} = {}; {} = {}", + failure.stat_name, failure.value, mode, failure.reference + ); } + assert!(failures.is_empty(), "{:#?}", results); } #[tokio::test] diff --git a/src/test_util/mod.rs b/src/test_util/mod.rs index f337dcde43750..a6d3d34f36dcc 100644 --- a/src/test_util/mod.rs +++ b/src/test_util/mod.rs @@ -50,68 +50,6 @@ macro_rules! assert_downcast_matches { }}; } -#[macro_export] -macro_rules! 
assert_within { - // Adapted from std::assert_eq - ($expr:expr, $low:expr, $high:expr) => ({ - match (&$expr, &$low, &$high) { - (expr, low, high) => { - if *expr < *low { - panic!( - r#"assertion failed: `(expr < low)` -expr: {} = `{:?}`, - low: `{:?}`"#, - stringify!($expr), - &*expr, - &*low - ); - } - if *expr > *high { - panic!( - r#"assertion failed: `(expr > high)` -expr: {} = `{:?}`, -high: `{:?}`"#, - stringify!($expr), - &*expr, - &*high - ); - } - } - } - }); - ($expr:expr, $low:expr, $high:expr, $($arg:tt)+) => ({ - match (&$expr, &$low, &$high) { - (expr, low, high) => { - if *expr < *low { - panic!( - r#"assertion failed: `(expr < low)` -expr: {} = `{:?}`, - low: `{:?}` -{}"#, - stringify!($expr), - &*expr, - &*low, - format_args!($($arg)+) - ); - } - if *expr > *high { - panic!( - r#"assertion failed: `(expr > high)` -expr: {} = `{:?}`, -high: `{:?}` -{}"#, - stringify!($expr), - &*expr, - &*high, - format_args!($($arg)+) - ); - } - } - } - }); - -} - pub fn next_addr() -> SocketAddr { let port = pick_unused_port().unwrap(); SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), port) From a1cce719983d329a644906e4638e01349d15823a Mon Sep 17 00:00:00 2001 From: Bruce Guenter Date: Fri, 11 Sep 2020 13:59:45 -0600 Subject: [PATCH 8/9] The tests should now work predictably on MacOS Signed-off-by: Bruce Guenter --- src/sinks/util/auto_concurrency/tests.rs | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/src/sinks/util/auto_concurrency/tests.rs b/src/sinks/util/auto_concurrency/tests.rs index eda8619c25064..307142953c0e4 100644 --- a/src/sinks/util/auto_concurrency/tests.rs +++ b/src/sinks/util/auto_concurrency/tests.rs @@ -1,6 +1,4 @@ -// Only run the test suite on unix systems, as the timings on especially -// MacOS are too variable to produce reliable in these tests. -#![cfg(all(test, not(target_os = "macos"), feature = "sources-generator"))] +#![cfg(all(test, feature = "sources-generator"))] use super::controller::ControllerStatistics; use crate::{ From bffecc4b69738d583edd28d176727fc9583ad36d Mon Sep 17 00:00:00 2001 From: Bruce Guenter Date: Fri, 11 Sep 2020 15:10:47 -0600 Subject: [PATCH 9/9] Fix clippy lints Signed-off-by: Bruce Guenter --- src/sinks/util/auto_concurrency/tests.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/sinks/util/auto_concurrency/tests.rs b/src/sinks/util/auto_concurrency/tests.rs index 307142953c0e4..053539422cdc4 100644 --- a/src/sinks/util/auto_concurrency/tests.rs +++ b/src/sinks/util/auto_concurrency/tests.rs @@ -363,14 +363,14 @@ impl Range { Some(Failure { stat_name: format!("{} {}", name1, name2), mode: FailureMode::ExceededMinimum, - value: value, + value, reference: self.0, }) } else if value > self.1 { Some(Failure { stat_name: format!("{} {}", name1, name2), mode: FailureMode::ExceededMaximum, - value: value, + value, reference: self.1, }) } else {
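
The failure-collection change in PATCH 7 reduces to a small pattern: each range check returns an optional failure record instead of panicking on the spot, the comparison routine gathers those records, and the test prints every out-of-range statistic before asserting once. Below is a minimal, self-contained sketch of that pattern for readers skimming the series; the names here (Range, Failure, check) are simplified stand-ins for illustration, not the exact items in tests.rs.

// Minimal sketch of the "collect all failures, assert once" pattern.
// These items are simplified stand-ins, not the exact types in tests.rs.

#[derive(Debug)]
struct Failure {
    stat_name: String,
    value: f64,
    bound: f64,
}

// Inclusive [low, high] acceptance range for one statistic.
#[derive(Clone, Copy)]
struct Range(f64, f64);

impl Range {
    // Return a Failure instead of panicking so the caller can keep checking
    // the remaining statistics and report everything at the end.
    fn check(&self, value: f64, stat_name: &str) -> Option<Failure> {
        if value < self.0 {
            Some(Failure { stat_name: stat_name.into(), value, bound: self.0 })
        } else if value > self.1 {
            Some(Failure { stat_name: stat_name.into(), value, bound: self.1 })
        } else {
            None
        }
    }
}

fn main() {
    let mut failures = Vec::new();
    // Option<Failure> implements IntoIterator, so extend() adds zero or one items.
    failures.extend(Range(0.100, 0.102).check(0.094, "controller observed_rtt mean"));
    failures.extend(Range(1.0, 2.0).check(1.5, "stats in_flight mean"));

    for failure in &failures {
        eprintln!("Comparison failed: {:?}", failure);
    }
    // A real test would finish with: assert!(failures.is_empty(), "{:#?}", failures);
}

Collecting before asserting is what lets a single run of one of the TOML-driven tests report every bound it missed, rather than stopping at the first out-of-range value the way the removed assert_within macro did.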