Skip to content

Commit

Permalink
Add shotover metrics profiler (shotover#1378)
Browse files Browse the repository at this point in the history
  • Loading branch information
rukai authored Dec 4, 2023
1 parent c42cb9c commit 4f89245
Show file tree
Hide file tree
Showing 11 changed files with 313 additions and 10 deletions.
1 change: 1 addition & 0 deletions .github/workflows/windsock_benches.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ jobs:
cargo windsock --bench-length-seconds 5 --operations-per-second 100 --profilers flamegraph --name cassandra,compression=none,driver=scylla,operation=read_i64,protocol=v4,shotover=standard,topology=single
cargo windsock --bench-length-seconds 5 --operations-per-second 100 --profilers samply --name cassandra,compression=none,driver=scylla,operation=read_i64,protocol=v4,shotover=standard,topology=single
cargo windsock --bench-length-seconds 5 --operations-per-second 100 --profilers sys_monitor --name kafka,shotover=standard,size=1B,topology=single
cargo windsock --bench-length-seconds 5 --operations-per-second 100 --profilers shotover_metrics --name redis,encryption=none,operation=get,shotover=standard,topology=single
# windsock/examples/cassandra.rs - this can stay here until windsock is moved to its own repo
cargo run --release --example cassandra -- --bench-length-seconds 5 --operations-per-second 100
Expand Down
14 changes: 14 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions shotover-proxy/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ license = "Apache-2.0"
shotover = { path = "../shotover" }

[dev-dependencies]
prometheus-parse = "0.2.4"
reqwest.workspace = true
scylla.workspace = true
anyhow.workspace = true
tokio.workspace = true
Expand Down
3 changes: 2 additions & 1 deletion shotover-proxy/benches/windsock/cassandra.rs
Original file line number Diff line number Diff line change
Expand Up @@ -552,7 +552,8 @@ impl Bench for CassandraBench {
}
}
let mut profiler =
CloudProfilerRunner::new(self.name(), profiling, profiler_instances).await;
CloudProfilerRunner::new(self.name(), profiling, profiler_instances, &shotover_ip)
.await;

let cassandra_nodes = vec![
AwsNodeInfo {
Expand Down
6 changes: 3 additions & 3 deletions shotover-proxy/benches/windsock/kafka.rs
Original file line number Diff line number Diff line change
Expand Up @@ -246,12 +246,12 @@ impl Bench for KafkaBench {
}
}

let mut profiler =
CloudProfilerRunner::new(self.name(), profiling, profiler_instances).await;

let kafka_ip = kafka_instance1.instance.private_ip().to_string();
let shotover_ip = shotover_instance.instance.private_ip().to_string();

// The last argument is the address the shotover_metrics profiler scrapes, so it must be
// shotover's IP, not kafka's.
let mut profiler =
    CloudProfilerRunner::new(self.name(), profiling, profiler_instances, &shotover_ip)
        .await;

let kafka_instances = vec![
kafka_instance1.clone(),
kafka_instance2.clone(),
Expand Down
39 changes: 38 additions & 1 deletion shotover-proxy/benches/windsock/profilers.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use self::samply::Samply;
use self::{samply::Samply, shotover_metrics::ShotoverMetrics};
use crate::common::Shotover;
use anyhow::Result;
use aws_throwaway::Ec2Instance;
Expand All @@ -11,14 +11,17 @@ use windsock::Profiling;
mod perf_flamegraph;
mod samply;
mod sar;
mod shotover_metrics;

/// Holds which profilers were requested for a bench along with the state of any that
/// have been started.
pub struct ProfilerRunner {
bench_name: String,
// `run_*` flags record whether the matching profiler was requested.
// NOTE(review): presumably each is derived from `Profiling::profilers_to_use` — confirm for
// `run_flamegraph` and `run_samply`.
run_flamegraph: bool,
run_samply: bool,
run_shotover_metrics: bool,
run_sys_monitor: bool,
results_path: PathBuf,
// The fields below are `None` until the corresponding profiler is started.
perf: Option<Perf>,
shotover_metrics: Option<ShotoverMetrics>,
samply: Option<Samply>,
// Receiver of sar output lines; it is handed to `sar::parse_sar` when the runner is dropped.
sys_monitor: Option<UnboundedReceiver<Result<String>>>,
}
Expand All @@ -32,14 +35,19 @@ impl ProfilerRunner {
let run_sys_monitor = profiling
.profilers_to_use
.contains(&"sys_monitor".to_owned());
let run_shotover_metrics = profiling
.profilers_to_use
.contains(&"shotover_metrics".to_owned());

ProfilerRunner {
bench_name,
run_flamegraph,
run_sys_monitor,
run_samply,
run_shotover_metrics,
results_path: profiling.results_path,
perf: None,
shotover_metrics: None,
samply: None,
sys_monitor: None,
}
Expand All @@ -58,6 +66,15 @@ impl ProfilerRunner {
} else {
None
};
self.shotover_metrics = if self.run_shotover_metrics {
if shotover.is_some() {
Some(ShotoverMetrics::new(self.bench_name.clone(), "localhost"))
} else {
panic!("shotover_metrics not supported when benching without shotover")
}
} else {
None
};
self.samply = if self.run_samply {
if let Some(shotover) = &shotover {
Some(Samply::run(self.results_path.clone(), shotover.child().id().unwrap()).await)
Expand Down Expand Up @@ -91,6 +108,9 @@ impl Drop for ProfilerRunner {
if let Some(samply) = self.samply.take() {
samply.wait();
}
if let Some(shotover_metrics) = self.shotover_metrics.take() {
shotover_metrics.insert_results_to_bench_archive();
}
if let Some(mut rx) = self.sys_monitor.take() {
sar::insert_sar_results_to_bench_archive(&self.bench_name, "", sar::parse_sar(&mut rx));
}
Expand All @@ -100,28 +120,41 @@ impl Drop for ProfilerRunner {
/// Profiler state for a bench whose services run on EC2 instances.
pub struct CloudProfilerRunner {
bench_name: String,
// Receivers returned by `sar::run_sar_remote`, keyed by the name of the instance they monitor.
monitor_instances: HashMap<String, UnboundedReceiver<Result<String>>>,
// `Some` only when the `shotover_metrics` profiler was requested.
shotover_metrics: Option<ShotoverMetrics>,
}

impl CloudProfilerRunner {
pub async fn new(
bench_name: String,
profiling: Profiling,
instances: HashMap<String, &Ec2Instance>,
shotover_ip: &str,
) -> Self {
let run_sys_monitor = profiling
.profilers_to_use
.contains(&"sys_monitor".to_owned());

let run_shotover_metrics = profiling
.profilers_to_use
.contains(&"shotover_metrics".to_owned());

let mut monitor_instances = HashMap::new();
if run_sys_monitor {
for (name, instance) in instances {
monitor_instances.insert(name, sar::run_sar_remote(instance).await);
}
}

let shotover_metrics = if run_shotover_metrics {
Some(ShotoverMetrics::new(bench_name.clone(), shotover_ip))
} else {
None
};

CloudProfilerRunner {
bench_name,
monitor_instances,
shotover_metrics,
}
}

Expand All @@ -133,6 +166,9 @@ impl CloudProfilerRunner {
sar::parse_sar(instance_rx),
);
}
if let Some(shotover_metrics) = self.shotover_metrics.take() {
shotover_metrics.insert_results_to_bench_archive();
}
}
}

Expand All @@ -144,6 +180,7 @@ pub fn supported_profilers(shotover: Shotover) -> Vec<String> {
"flamegraph".to_owned(),
"samply".to_owned(),
"sys_monitor".to_owned(),
"shotover_metrics".to_owned(),
]
}
}
170 changes: 170 additions & 0 deletions shotover-proxy/benches/windsock/profilers/shotover_metrics.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
use prometheus_parse::Value;
use std::{collections::HashMap, time::Duration};
use time::OffsetDateTime;
use tokio::task::JoinHandle;
use tokio::{sync::mpsc, time::MissedTickBehavior};
use windsock::{Goal, LatencyPercentile, Metric, ReportArchive};

/// Handle to a background task that periodically scrapes shotover's prometheus metrics
/// endpoint and, once told to stop, writes the collected metrics into the bench's report archive.
pub struct ShotoverMetrics {
// Dropping this sender is how the background task is told to stop collecting.
shutdown_tx: mpsc::Sender<()>,
// The spawned collection task; it finishes after the collected metrics have been saved.
task: JoinHandle<()>,
}

/// One unparsed response from the metrics endpoint.
pub struct RawPrometheusExposition {
// UTC time recorded once the response had arrived.
timestamp: OffsetDateTime,
// Response body, in the text format understood by `prometheus_parse`.
content: String,
}
/// Every value observed for a metric, in scrape order, keyed by `name{labels}`.
type ParsedMetrics = HashMap<String, Vec<Value>>;

impl ShotoverMetrics {
pub fn new(bench_name: String, shotover_ip: &str) -> Self {
let url = format!("http://{shotover_ip}:9001/metrics");
let (shutdown_tx, shutdown_rx) = mpsc::channel(1);
let task = tokio::spawn(async move {
// collect metrics until shutdown
let raw_metrics = Self::collect_metrics(shutdown_rx, &url).await;

// we are now shutting down and need to process and save all collected metrics
let mut report = ReportArchive::load(&bench_name).unwrap();
let parsed = Self::parse_metrics(raw_metrics, &report);
report.metrics.extend(Self::windsock_metrics(parsed));
report.save();
});
ShotoverMetrics { task, shutdown_tx }
}

pub fn parse_metrics(
raw_metrics: Vec<RawPrometheusExposition>,
report: &ReportArchive,
) -> ParsedMetrics {
let mut result: ParsedMetrics = HashMap::new();
for raw_metric in raw_metrics {
if raw_metric.timestamp > report.bench_started_at {
let metrics = prometheus_parse::Scrape::parse(
raw_metric.content.lines().map(|x| Ok(x.to_owned())),
)
.unwrap();
for sample in metrics.samples {
let key = format!(
"{}{{{}}}",
sample
.metric
.strip_prefix("shotover_")
.unwrap_or(&sample.metric),
sample.labels
);

result.entry(key).or_default().push(sample.value);
}
}
}
result
}

/// Converts parsed prometheus values into windsock metrics.
///
/// The type of the first value recorded for a metric decides how it is reported:
/// * gauge - `Metric::EachSecond` holding each observed value.
/// * counter - `Metric::EachSecond` holding the increase since the previous observation.
///   The running total starts at `0.0`, so the first entry is the counter's full value.
/// * summary - `Metric::LatencyPercentiles` built from the last observation only.
/// * anything else - skipped with a warning.
///
/// The result is ordered by name, with latency metrics moved to the front and failure
/// metrics moved to the back.
///
/// # Panics
/// Panics if a metric does not keep the same type across all of its observations.
pub fn windsock_metrics(parsed_metrics: ParsedMetrics) -> Vec<Metric> {
    let mut metrics = Vec::new();

    for (name, samples) in parsed_metrics {
        match &samples[0] {
            Value::Gauge(_) => {
                let values = samples
                    .iter()
                    .map(|sample| match sample {
                        Value::Gauge(gauge) => (*gauge, gauge.to_string(), Goal::None),
                        _ => panic!("metric type changed during bench run"),
                    })
                    .collect();
                metrics.push(Metric::EachSecond { name, values });
            }
            Value::Counter(_) => {
                // Counters only ever grow, so report the growth between scrapes.
                let mut previous = 0.0;
                let values = samples
                    .iter()
                    .map(|sample| match sample {
                        Value::Counter(total) => {
                            let delta = total - previous;
                            previous = *total;
                            (delta, delta.to_string(), Goal::None)
                        }
                        _ => panic!("metric type changed during bench run"),
                    })
                    .collect();
                metrics.push(Metric::EachSecond { name, values });
            }
            Value::Summary(_) => {
                let values = match samples.last().unwrap() {
                    Value::Summary(quantiles) => quantiles
                        .iter()
                        .map(|entry| LatencyPercentile {
                            value: entry.count,
                            value_display: format!("{:.4}ms", entry.count * 1000.0),
                            quantile: entry.quantile.to_string(),
                        })
                        .collect(),
                    _ => panic!("metric type changed during bench run"),
                };
                metrics.push(Metric::LatencyPercentiles { name, values });
            }
            _ => {
                tracing::warn!("Unused shotover metric: {name}")
            }
        }
    }

    metrics.sort_by_key(|metric| {
        let name = metric.name();
        let is_latency =
            name.starts_with("chain_latency") || name.starts_with("transform_latency");
        let is_failure = name.starts_with("transform_failures")
            || name.starts_with("failed_requests")
            || name.starts_with("chain_failures");

        if is_latency {
            // sorts ahead of every other metric
            format!("aaa_{name}")
        } else if is_failure {
            // sorts behind every other metric
            format!("zzz_{name}")
        } else {
            name.to_owned()
        }
    });

    metrics
}

async fn collect_metrics(
mut shutdown_rx: mpsc::Receiver<()>,
url: &str,
) -> Vec<RawPrometheusExposition> {
let mut results = vec![];

let mut interval = tokio::time::interval(Duration::from_secs(1));
interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
loop {
tokio::select! {
_ = shutdown_rx.recv() => break,
_ = interval.tick() => {
match tokio::time::timeout(Duration::from_secs(3), reqwest::get(url)).await.unwrap() {
Ok(response) => {
results.push(RawPrometheusExposition {
timestamp: OffsetDateTime::now_utc(),
content: response.text().await.unwrap(),
});
}
Err(err) => tracing::debug!("Failed to request from metrics endpoint, probably not up yet, error was {err:?}")
}
}
}
}
results
}

pub fn insert_results_to_bench_archive(self) {
std::mem::drop(self.shutdown_tx);
// TODO: make this function + caller async, lets do this in a follow up PR to avoid making this PR even more complex.
futures::executor::block_on(async { self.task.await.unwrap() })
}
}
6 changes: 4 additions & 2 deletions shotover-proxy/benches/windsock/redis.rs
Original file line number Diff line number Diff line change
Expand Up @@ -222,12 +222,14 @@ impl Bench for RedisBench {
profiler_instances.insert("redis".to_owned(), &instance.instance);
}
}
let mut profiler =
CloudProfilerRunner::new(self.name(), profiling, profiler_instances).await;

let redis_ip = redis_instances.private_ips()[0].to_string();
let shotover_ip = shotover_instance.instance.private_ip().to_string();

let mut profiler =
CloudProfilerRunner::new(self.name(), profiling, profiler_instances, &shotover_ip)
.await;

let (_, running_shotover) = futures::join!(
redis_instances.run(self.encryption),
self.run_aws_shotover(shotover_instance.clone(), redis_ip.clone())
Expand Down
Loading

0 comments on commit 4f89245

Please sign in to comment.