Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking β€œSign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat: aggregate request metrics #55

Merged
merged 66 commits into from
Dec 25, 2024
Merged
Changes from 1 commit
Commits
Show all changes
66 commits
Select commit Hold shift + click to select a range
df14bae
mod measure
hayas1 Dec 20, 2024
47becd9
rename module
hayas1 Dec 20, 2024
2357b48
aggregate
hayas1 Dec 20, 2024
889fc3d
bytes
hayas1 Dec 20, 2024
5a13fe7
trait
hayas1 Dec 20, 2024
1d53c1d
response aggregate
hayas1 Dec 20, 2024
77b06ae
count / passed aggregate
hayas1 Dec 20, 2024
0f59020
duration aggregate
hayas1 Dec 21, 2024
1b916ea
response aggregate
hayas1 Dec 21, 2024
dbf7255
ref
hayas1 Dec 21, 2024
f0ed6de
pass aggregate
hayas1 Dec 21, 2024
47ac1d3
evaluate aggregate
hayas1 Dec 21, 2024
c4202df
todo
hayas1 Dec 21, 2024
b2e66a7
type name
hayas1 Dec 21, 2024
8eb9baa
modify test
hayas1 Dec 21, 2024
5b26029
into iter
hayas1 Dec 21, 2024
7e5de42
metrics
hayas1 Dec 21, 2024
195ddf6
metrics
hayas1 Dec 21, 2024
8858575
WIP: case
hayas1 Dec 21, 2024
3276117
use hdrhistogram crate
hayas1 Dec 21, 2024
157f6a8
implement `merge`
hayas1 Dec 21, 2024
947a7f1
aggregate in `CaseReport`
hayas1 Dec 21, 2024
229be23
impl `Default`
hayas1 Dec 21, 2024
285fad8
implement recursive aggregate
hayas1 Dec 21, 2024
507edc1
pri
hayas1 Dec 21, 2024
248960a
modify testcase
hayas1 Dec 21, 2024
c5afe6c
update testcase comment
hayas1 Dec 21, 2024
979fbcc
report metrics (now, error)
hayas1 Dec 21, 2024
f592912
TODO
hayas1 Dec 21, 2024
78d422a
give priority to add / other
hayas1 Dec 21, 2024
0471649
unit
hayas1 Dec 21, 2024
3e02d43
service example
hayas1 Dec 21, 2024
8166842
remove TODO
hayas1 Dec 22, 2024
4ebc68d
`console_aggregate` default implementation
hayas1 Dec 22, 2024
033ca82
rename trait and implementation
hayas1 Dec 22, 2024
00b86a5
Aggregate struct
hayas1 Dec 22, 2024
4958c82
colorize console report
hayas1 Dec 22, 2024
b953079
todo
hayas1 Dec 22, 2024
6b026f0
writeln
hayas1 Dec 22, 2024
1b3be59
update threshold
hayas1 Dec 22, 2024
f95c049
summary of all requests
hayas1 Dec 22, 2024
b26112e
measure or not
hayas1 Dec 23, 2024
6be4b54
add evaluate aggregate test
hayas1 Dec 23, 2024
6a7d01e
sigfig < 5 is allowed
hayas1 Dec 23, 2024
cb80fd7
quantile from cmd
hayas1 Dec 23, 2024
dc8240a
percentile as f64 (for p99.9)
hayas1 Dec 23, 2024
abf086b
RunCommandError::NanPercentile
hayas1 Dec 23, 2024
e31c580
rename
hayas1 Dec 23, 2024
c59a124
move to top of the file
hayas1 Dec 23, 2024
ee7a83d
resolve todo
hayas1 Dec 23, 2024
6b03a75
add measure test
hayas1 Dec 23, 2024
ffdcbef
micros
hayas1 Dec 23, 2024
99aeb7b
implement github markdown measure report
hayas1 Dec 23, 2024
497edf5
implement github markdown measure report test
hayas1 Dec 23, 2024
344be9f
cargo fmt
hayas1 Dec 23, 2024
c9927d4
Classify / Classified
hayas1 Dec 23, 2024
fa30e30
classification
hayas1 Dec 24, 2024
e6359d0
use Classification
hayas1 Dec 24, 2024
41efea5
pass_rate
hayas1 Dec 24, 2024
4c948ed
arg: Query
hayas1 Dec 24, 2024
e229889
MdStyle
hayas1 Dec 24, 2024
f7bb655
remove MdStyle
hayas1 Dec 24, 2024
d411914
writeln!
hayas1 Dec 24, 2024
9b76195
instant
hayas1 Dec 25, 2024
9117f55
remove Option / Result
hayas1 Dec 25, 2024
88aa3e7
modify test
hayas1 Dec 25, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
arg: Query
  • Loading branch information
hayas1 committed Dec 24, 2024
commit 4c948ed491be809c2824ae30eb8551288c8ed325
97 changes: 51 additions & 46 deletions src/assault/measure/aggregate.rs
Original file line number Diff line number Diff line change
@@ -8,10 +8,11 @@ use super::metrics::Metrics;

pub trait Aggregate: Default {
type Add;
type Query;
type Aggregate;
fn add(&mut self, add: &Self::Add);
fn merge(&mut self, other: &Self);
fn aggregate(&self) -> Self::Aggregate;
fn aggregate(&self, query: &Self::Query) -> Self::Aggregate;
}

#[derive(Debug, Clone, Default)]
@@ -26,6 +27,7 @@ pub struct EvaluateAggregate {
}
impl Aggregate for EvaluateAggregator {
type Add = (bool, Destinations<Option<<ResponseAggregator as Aggregate>::Add>>);
type Query = Vec<f64>; // TODO [f64] ?
type Aggregate = EvaluateAggregate;

fn add(&mut self, (pass, dst): &Self::Add) {
@@ -44,25 +46,24 @@ impl Aggregate for EvaluateAggregator {
self.destinations.entry(d.to_string()).or_default().merge(r);
})
}
fn aggregate(&self) -> Self::Aggregate {
EvaluateAggregate { pass: self.pass.aggregate(), response: self.aggregate_responses() }
fn aggregate(&self, query: &Self::Query) -> Self::Aggregate {
EvaluateAggregate { pass: self.pass.aggregate(&()), response: self.aggregate_responses(query) }
}
}
impl EvaluateAggregator {
pub fn new<T, I: IntoIterator<Item = f64>>(dst: &Destinations<T>, now: Option<SystemTime>, quantile: I) -> Self {
let percentile: Vec<_> = quantile.into_iter().collect();
let destinations = dst.keys().map(|d| (d, ResponseAggregator::new(now, percentile.iter().copied()))).collect();
pub fn new<T>(dst: &Destinations<T>, now: Option<SystemTime>) -> Self {
let destinations = dst.keys().map(|d| (d, ResponseAggregator::new(now))).collect();
Self { pass: PassAggregator::new(), destinations }
}

pub fn aggregate_responses(&self) -> ResponseAggregate {
pub fn aggregate_responses(&self, query: &[f64]) -> ResponseAggregate {
self.destinations
.values()
.fold(ResponseAggregator::default(), |mut agg, r| {
agg.merge(r);
agg
})
.aggregate()
.aggregate(&query.to_vec())
}
}

@@ -83,6 +84,7 @@ pub struct ResponseAggregate {
}
impl Aggregate for ResponseAggregator {
type Add = Metrics;
type Query = Vec<f64>; // TODO [f64] ?
type Aggregate = ResponseAggregate;
fn add(&mut self, res: &Self::Add) {
self.count.add(&());
@@ -96,28 +98,28 @@ impl Aggregate for ResponseAggregator {
self.bytes.merge(&other.bytes);
self.latency.merge(&other.latency);
}
fn aggregate(&self) -> Self::Aggregate {
fn aggregate(&self, query: &Self::Query) -> Self::Aggregate {
ResponseAggregate {
req: self.count.aggregate(),
duration: self.duration.aggregate().ok(),
req: self.count.aggregate(&()),
duration: self.duration.aggregate(&()).ok(),
rps: self.rps().ok(),
bytes: self.bytes.aggregate(),
latency: self.latency.aggregate(),
bytes: self.bytes.aggregate(&()),
latency: self.latency.aggregate(query),
}
}
}
impl ResponseAggregator {
pub fn new<I: IntoIterator<Item = f64>>(now: Option<SystemTime>, quantile: I) -> Self {
pub fn new(now: Option<SystemTime>) -> Self {
Self {
count: CountAggregator::new(),
duration: DurationAggregator::new(now),
bytes: BytesAggregator {},
latency: LatencyAggregator::new(quantile),
latency: LatencyAggregator::new(),
}
}

pub fn rps(&self) -> Result<f64, SystemTimeError> {
Ok(self.count.aggregate() as f64 / self.duration.aggregate()?.as_secs_f64())
Ok(self.count.aggregate(&()) as f64 / self.duration.aggregate(&())?.as_secs_f64())
}
}

@@ -127,14 +129,15 @@ pub struct CountAggregator {
}
impl Aggregate for CountAggregator {
type Add = ();
type Query = ();
type Aggregate = u64;
fn add(&mut self, (): &Self::Add) {
self.count += 1;
}
fn merge(&mut self, other: &Self) {
self.count += other.count;
}
fn aggregate(&self) -> Self::Aggregate {
fn aggregate(&self, (): &Self::Query) -> Self::Aggregate {
self.count
}
}
@@ -157,6 +160,7 @@ pub struct PassAggregate {
}
impl Aggregate for PassAggregator {
type Add = bool;
type Query = ();
type Aggregate = PassAggregate;
fn add(&mut self, pass: &Self::Add) {
if *pass {
@@ -168,7 +172,7 @@ impl Aggregate for PassAggregator {
self.pass.merge(&other.pass);
self.count.merge(&other.count);
}
fn aggregate(&self) -> Self::Aggregate {
fn aggregate(&self, (): &Self::Query) -> Self::Aggregate {
PassAggregate { pass: self.pass(), count: self.count(), pass_rate: self.pass_rate() }
}
}
@@ -178,10 +182,10 @@ impl PassAggregator {
}

pub fn count(&self) -> u64 {
self.count.aggregate()
self.count.aggregate(&())
}
pub fn pass(&self) -> u64 {
self.pass.aggregate()
self.pass.aggregate(&())
}
pub fn pass_rate(&self) -> f64 {
self.pass() as f64 / self.count() as f64
@@ -194,6 +198,7 @@ pub struct DurationAggregator {
}
impl Aggregate for DurationAggregator {
type Add = SystemTime;
type Query = ();
type Aggregate = Result<Duration, SystemTimeError>;
fn add(&mut self, timestamp: &Self::Add) {
match self.start_end {
@@ -210,7 +215,7 @@ impl Aggregate for DurationAggregator {
}
}
}
fn aggregate(&self) -> Self::Aggregate {
fn aggregate(&self, (): &Self::Query) -> Self::Aggregate {
Ok(match &self.start_end {
None => Duration::from_secs(0),
Some((start, end)) => end.duration_since(*start)?,
@@ -231,17 +236,17 @@ pub struct BytesAggregator {
pub struct BytesAggregate {}
impl Aggregate for BytesAggregator {
type Add = ();
type Query = ();
type Aggregate = BytesAggregate;
fn add(&mut self, _: &Self::Add) {}
fn merge(&mut self, _: &Self) {}
fn aggregate(&self) -> Self::Aggregate {
fn aggregate(&self, (): &Self::Query) -> Self::Aggregate {
Default::default()
}
}

#[derive(Debug, Clone)]
pub struct LatencyAggregator {
quantile: Vec<f64>,
hist: Histogram<u64>,
}
#[derive(Debug, Clone, PartialEq, PartialOrd, Default)]
@@ -253,30 +258,29 @@ pub struct LatencyAggregate {
}
impl Aggregate for LatencyAggregator {
type Add = Duration;
type Query = Vec<f64>; // TODO [f64] ?
type Aggregate = LatencyAggregate;
fn add(&mut self, latency: &Self::Add) {
self.hist += latency.as_micros() as u64;
}
fn merge(&mut self, other: &Self) {
// TODO Default quantile will be empty, so give priority to other
// TODO `Aggregate` trait should do not have `aggregate` method, but have `sub_aggregator` method ?
self.quantile = other.quantile.clone();
self.hist += &other.hist;
}
fn aggregate(&self) -> Self::Aggregate {
LatencyAggregate { min: self.min(), mean: self.mean(), quantile: self.quantile(), max: self.max() }
fn aggregate(&self, query: &Self::Query) -> Self::Aggregate {
LatencyAggregate { min: self.min(), mean: self.mean(), quantile: self.quantile(query), max: self.max() }
}
}
impl Default for LatencyAggregator {
fn default() -> Self {
Self::new([])
let hist = Histogram::new(3).unwrap_or_else(|e| unreachable!("{}", e));
Self { hist }
}
}
impl LatencyAggregator {
pub fn new<I: IntoIterator<Item = f64>>(quantile: I) -> Self {
let quantile = quantile.into_iter().collect();
let hist = Histogram::new(3).unwrap_or_else(|e| unreachable!("{}", e));
Self { quantile, hist }
pub fn new() -> Self {
Default::default()
}

pub fn min(&self) -> Duration {
@@ -285,8 +289,8 @@ impl LatencyAggregator {
pub fn mean(&self) -> Duration {
Duration::from_micros(self.hist.mean() as u64)
}
pub fn quantile(&self) -> Vec<Duration> {
self.quantile.iter().map(|q| self.value_at_quantile(*q)).collect()
pub fn quantile(&self, quantile: &[f64]) -> Vec<Duration> {
quantile.iter().map(|&q| self.value_at_quantile(q)).collect()
}
pub fn value_at_quantile(&self, quantile: f64) -> Duration {
Duration::from_micros(self.hist.value_at_quantile(quantile))
@@ -308,7 +312,7 @@ mod tests {
for _ in 0..1000 {
agg.add(&());
}
assert_eq!(agg.aggregate(), 1000);
assert_eq!(agg.aggregate(&()), 1000);
}

#[test]
@@ -317,7 +321,7 @@ mod tests {
for i in 0..1000 {
agg.add(&(i % 2 == 0));
}
assert_eq!(agg.aggregate(), PassAggregate { pass: 500, count: 1000, pass_rate: 0.5 });
assert_eq!(agg.aggregate(&()), PassAggregate { pass: 500, count: 1000, pass_rate: 0.5 });
}

#[test]
@@ -326,18 +330,18 @@ mod tests {
for i in 0..1000 {
agg.add(&(SystemTime::UNIX_EPOCH + Duration::from_millis(i)));
}
assert_eq!(agg.aggregate().unwrap(), Duration::from_millis(999));
assert_eq!(agg.aggregate(&()).unwrap(), Duration::from_millis(999));
}

#[test]
fn latency_aggregate() {
let mut agg = LatencyAggregator::new([0.5, 0.9, 0.99]);
let mut agg = LatencyAggregator::new();
for i in 1..1000 {
agg.add(&Duration::from_millis(i));
}

let tolerance = Duration::from_millis(1);
let LatencyAggregate { min, mean, quantile, max } = agg.aggregate();
let LatencyAggregate { min, mean, quantile, max } = agg.aggregate(&vec![0.5, 0.9, 0.99]);

assert!(min.abs_diff(Duration::from_millis(1)) < tolerance);
assert!(mean.abs_diff(Duration::from_millis(500)) < tolerance);
@@ -353,18 +357,20 @@ mod tests {

#[test]
fn merge_latency_aggregate() {
let mut agg1 = LatencyAggregator::new([0.5, 0.9, 0.99]);
let mut agg1 = LatencyAggregator::new();
for i in 1..500 {
agg1.add(&Duration::from_millis(i));
}
let mut agg2 = LatencyAggregator::new([0.5, 0.9, 0.99]);
let mut agg2 = LatencyAggregator::new();
for i in 500..1000 {
agg2.add(&Duration::from_millis(i));
}

let tolerance = Duration::from_millis(1);
let LatencyAggregate { min: min1, mean: mean1, quantile: quantile1, max: max1 } = agg1.aggregate();
let LatencyAggregate { min: min2, mean: mean2, quantile: quantile2, max: max2 } = agg2.aggregate();
let LatencyAggregate { min: min1, mean: mean1, quantile: quantile1, max: max1 } =
agg1.aggregate(&vec![0.5, 0.9, 0.99]);
let LatencyAggregate { min: min2, mean: mean2, quantile: quantile2, max: max2 } =
agg2.aggregate(&vec![0.5, 0.9, 0.99]);

assert!(min1.abs_diff(Duration::from_millis(1)) < tolerance);
assert!(mean1.abs_diff(Duration::from_millis(250)) < tolerance);
@@ -389,7 +395,7 @@ mod tests {
assert!(max2.abs_diff(Duration::from_millis(1000)) < tolerance);

agg1.merge(&agg2);
let LatencyAggregate { min, mean, quantile, max } = agg1.aggregate();
let LatencyAggregate { min, mean, quantile, max } = agg1.aggregate(&vec![0.5, 0.9, 0.99]);
assert!(min.abs_diff(Duration::from_millis(1)) < tolerance);
assert!(mean.abs_diff(Duration::from_millis(500)) < tolerance);
for (q, p) in quantile.iter().zip(vec![
@@ -404,8 +410,7 @@ mod tests {

#[test]
fn evaluate_aggregate() {
let mut agg =
EvaluateAggregator::new(&Destinations::<()>::new(), Some(SystemTime::UNIX_EPOCH), [0.5, 0.9, 0.99]);
let mut agg = EvaluateAggregator::new(&Destinations::<()>::new(), Some(SystemTime::UNIX_EPOCH));
for i in 0..1000 {
let d = vec![
(
@@ -439,7 +444,7 @@ mod tests {
}

let tolerance = Duration::from_millis(1);
let EvaluateAggregate { pass, response } = agg.aggregate();
let EvaluateAggregate { pass, response } = agg.aggregate(&vec![0.5, 0.9, 0.99]);
assert_eq!(pass, PassAggregate { pass: 500, count: 1000, pass_rate: 0.5 });
let ResponseAggregate { req, duration, rps, bytes, latency } = response;
assert_eq!(req, 2000);
2 changes: 1 addition & 1 deletion src/assault/worker.rs
Original file line number Diff line number Diff line change
@@ -138,7 +138,7 @@ where
testcase: Coalesced<Testcase<Q, P>, Setting<Q, P>>,
) -> WrappedResult<CaseReport<P::Message, Q, P>> {
let case = &testcase.coalesce();
let evaluate_aggregate = EvaluateAggregator::new(destinations, None, cmd.quantile_set());
let evaluate_aggregate = EvaluateAggregator::new(destinations, None);

let (passed, messages, aggregate) = self
.requests(cmd, destinations, case)
2 changes: 1 addition & 1 deletion src/interface/report/console.rs
Original file line number Diff line number Diff line change
@@ -25,7 +25,7 @@ pub trait ConsoleReport: Reportable {
w: &mut ReportWriter<W>,
e: F, // TODO where Self::Error: From<std::io::Error> ?
) -> Result<(), Self::Error> {
let EvaluateAggregate { pass: pass_agg, response } = self.aggregator().aggregate();
let EvaluateAggregate { pass: pass_agg, response } = self.aggregator().aggregate(&cmd.quantile_set());
let PassAggregate { pass, count, pass_rate } = &pass_agg;
let ResponseAggregate { req, duration, rps, latency, .. } = &response;
let LatencyAggregate { min, mean, quantile, max } = &latency;
4 changes: 2 additions & 2 deletions src/interface/report/github_markdown.rs
Original file line number Diff line number Diff line change
@@ -26,7 +26,7 @@ pub trait GithubMarkdownReport: Reportable {
w: &mut ReportWriter<W>,
e: F, // TODO where Self::Error: From<std::io::Error> ?
) -> Result<(), Self::Error> {
let EvaluateAggregate { pass: pass_agg, response } = self.aggregator().aggregate();
let EvaluateAggregate { pass: pass_agg, response } = self.aggregator().aggregate(&cmd.quantile_set());
let PassAggregate { pass, count, pass_rate } = &pass_agg;
let ResponseAggregate { req, duration, rps, latency, .. } = &response;
let LatencyAggregate { min, mean, quantile, max } = &latency;
@@ -39,7 +39,7 @@ pub trait GithubMarkdownReport: Reportable {
writeln!(w, " max |").map_err(e.clone())?;

write!(w, "| --- | --- | --- |").map_err(e.clone())?;
for _ in quantile {
for _ in cmd.percentile_set() {
write!(w, " --- |").map_err(e.clone())?;
}
writeln!(w, " --- |").map_err(e.clone())?;