Skip to content

Commit

Permalink
Add server stats to BenchmarkReport (#1477)
Browse files Browse the repository at this point in the history
This commit updates the `BenchmarkReport` structure to include server
statistics instead of
just the server version. Additionally, the `iggy` dependency has been
added to the
`iggy-benchmark-report` package to support this functionality.
  • Loading branch information
hubcio authored Feb 1, 2025
1 parent f859e17 commit 13c06c5
Show file tree
Hide file tree
Showing 13 changed files with 355 additions and 110 deletions.
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion bench/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "bench"
version = "0.1.4"
version = "0.1.5"
edition = "2021"
license = "Apache-2.0"

Expand Down
2 changes: 1 addition & 1 deletion bench/report/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "iggy-benchmark-report"
version = "0.1.4"
version = "0.1.5"
edition = "2021"
description = "Benchmark report and chart generation library for iggy-bench binary and iggy-benchmarks-dashboard web app"
license = "Apache-2.0"
Expand Down
1 change: 1 addition & 0 deletions bench/report/src/types/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,6 @@ pub mod individual_metrics;
pub mod individual_metrics_summary;
pub mod params;
pub mod report;
pub mod server_stats;
pub mod time_series;
pub mod transport;
13 changes: 7 additions & 6 deletions bench/report/src/types/report.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use super::server_stats::BenchmarkServerStats;
use crate::group_metrics::BenchmarkGroupMetrics;
use crate::individual_metrics::BenchmarkIndividualMetrics;
use crate::types::hardware::BenchmarkHardware;
Expand All @@ -6,27 +7,27 @@ use serde::{Deserialize, Serialize};
use std::path::Path;
use uuid::Uuid;

#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Default)]
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)]
pub struct BenchmarkReport {
/// Benchmark unique identifier
pub uuid: Uuid,

/// Iggy server version
pub server_version: String,

/// Timestamp when the benchmark was finished
pub timestamp: String,

/// Benchmark server statistics
pub server_stats: BenchmarkServerStats,

/// Benchmark hardware
pub hardware: BenchmarkHardware,

/// Benchmark parameters
pub params: BenchmarkParams,

/// Benchmark metrics for all actors of same type
/// Benchmark metrics for all actors of same type (all producers, all consumers or all actors)
pub group_metrics: Vec<BenchmarkGroupMetrics>,

/// Benchmark summaries per actor (producer/consumer)
/// Benchmark metrics per actor (producer/consumer)
pub individual_metrics: Vec<BenchmarkIndividualMetrics>,
}

Expand Down
173 changes: 173 additions & 0 deletions bench/report/src/types/server_stats.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
use serde::{Deserialize, Serialize};
use std::collections::HashMap;

/// This file is a big workaround - struct `Stats` exists in `iggy` crate and this crate needs it.
/// However, this crate is being compiled to wasm and `iggy` can't be compiled for this target.
/// To workaround this, we need just maintain a copy of the `Stats` struct in this crate.
///
/// Hopefully, one day we will have a separate crate for iggy models and this file can be removed.
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq)]
pub struct BenchmarkServerStats {
/// The unique identifier of the process.
pub process_id: u32,
/// The CPU usage of the process.
pub cpu_usage: f32,
/// the total CPU usage of the system.
pub total_cpu_usage: f32,
/// The memory usage of the process.
pub memory_usage: u64,
/// The total memory of the system.
pub total_memory: u64,
/// The available memory of the system.
pub available_memory: u64,
/// The run time of the process.
pub run_time: u64,
/// The start time of the process.
pub start_time: u64,
/// The total number of bytes read.
pub read_bytes: u64,
/// The total number of bytes written.
pub written_bytes: u64,
/// The total size of the messages in bytes.
pub messages_size_bytes: u64,
/// The total number of streams.
pub streams_count: u32,
/// The total number of topics.
pub topics_count: u32,
/// The total number of partitions.
pub partitions_count: u32,
/// The total number of segments.
pub segments_count: u32,
/// The total number of messages.
pub messages_count: u64,
/// The total number of connected clients.
pub clients_count: u32,
/// The total number of consumer groups.
pub consumer_groups_count: u32,
/// The name of the host.
pub hostname: String,
/// The details of the operating system.
pub os_name: String,
/// The version of the operating system.
pub os_version: String,
/// The version of the kernel.
pub kernel_version: String,
/// The version of the Iggy server.
pub iggy_server_version: String,
/// The semantic version of the Iggy server in the numeric format e.g. 1.2.3 -> 100200300 (major * 1000000 + minor * 1000 + patch).
pub iggy_server_semver: Option<u32>,
/// Cache metrics per partition
#[serde(with = "cache_metrics_serializer")]
pub cache_metrics: HashMap<BenchmarkCacheMetricsKey, BenchmarkCacheMetrics>,
}

/// Key for identifying a specific partition's cache metrics
#[derive(Debug, Serialize, Deserialize, Hash, Eq, PartialEq, Clone)]
pub struct BenchmarkCacheMetricsKey {
/// Stream ID
pub stream_id: u32,
/// Topic ID
pub topic_id: u32,
/// Partition ID
pub partition_id: u32,
}

impl BenchmarkCacheMetricsKey {
pub fn to_string_key(&self) -> String {
format!("{}-{}-{}", self.stream_id, self.topic_id, self.partition_id)
}
}

/// Cache metrics for a specific partition
#[derive(Debug, Serialize, Deserialize, Default, Clone, PartialEq)]
pub struct BenchmarkCacheMetrics {
/// Number of cache hits
pub hits: u64,
/// Number of cache misses
pub misses: u64,
/// Hit ratio (hits / (hits + misses))
pub hit_ratio: f32,
}

mod cache_metrics_serializer {
use super::*;
use serde::{Deserialize, Deserializer, Serializer};
use std::collections::HashMap;

pub fn serialize<S>(
metrics: &HashMap<BenchmarkCacheMetricsKey, BenchmarkCacheMetrics>,
serializer: S,
) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
let string_map: HashMap<String, &BenchmarkCacheMetrics> = metrics
.iter()
.map(|(k, v)| (k.to_string_key(), v))
.collect();
string_map.serialize(serializer)
}

pub fn deserialize<'de, D>(
deserializer: D,
) -> Result<HashMap<BenchmarkCacheMetricsKey, BenchmarkCacheMetrics>, D::Error>
where
D: Deserializer<'de>,
{
let string_map: HashMap<String, BenchmarkCacheMetrics> =
HashMap::deserialize(deserializer)?;
let mut result = HashMap::new();
for (key_str, value) in string_map {
let parts: Vec<&str> = key_str.split('-').collect();
if parts.len() != 3 {
continue;
}
if let (Ok(stream_id), Ok(topic_id), Ok(partition_id)) = (
parts[0].parse::<u32>(),
parts[1].parse::<u32>(),
parts[2].parse::<u32>(),
) {
let key = BenchmarkCacheMetricsKey {
stream_id,
topic_id,
partition_id,
};
result.insert(key, value);
}
}
Ok(result)
}
}

impl Default for BenchmarkServerStats {
fn default() -> Self {
Self {
process_id: 0,
cpu_usage: 0.0,
total_cpu_usage: 0.0,
memory_usage: 0,
total_memory: 0,
available_memory: 0,
run_time: 0,
start_time: 0,
read_bytes: 0,
written_bytes: 0,
messages_size_bytes: 0,
streams_count: 0,
topics_count: 0,
partitions_count: 0,
segments_count: 0,
messages_count: 0,
clients_count: 0,
consumer_groups_count: 0,
hostname: "unknown_hostname".to_string(),
os_name: "unknown_os_name".to_string(),
os_version: "unknown_os_version".to_string(),
kernel_version: "unknown_kernel_version".to_string(),
iggy_server_version: "unknown_iggy_version".to_string(),
iggy_server_semver: None,
cache_metrics: HashMap::new(),
}
}
}
87 changes: 77 additions & 10 deletions bench/src/analytics/report_builder.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,20 @@
use std::collections::HashMap;

use super::metrics::group::{from_individual_metrics, from_producers_and_consumers_statistics};
use crate::utils::server_version::get_server_version;
use crate::utils::get_server_stats;
use chrono::{DateTime, Utc};
use iggy::utils::timestamp::IggyTimestamp;
use iggy::{
models::stats::{CacheMetrics, CacheMetricsKey, Stats},
utils::timestamp::IggyTimestamp,
};
use iggy_benchmark_report::{
actor_kind::ActorKind, benchmark_kind::BenchmarkKind, hardware::BenchmarkHardware,
individual_metrics::BenchmarkIndividualMetrics, params::BenchmarkParams,
actor_kind::ActorKind,
benchmark_kind::BenchmarkKind,
hardware::BenchmarkHardware,
individual_metrics::BenchmarkIndividualMetrics,
params::BenchmarkParams,
report::BenchmarkReport,
server_stats::{BenchmarkCacheMetrics, BenchmarkCacheMetricsKey, BenchmarkServerStats},
};

pub struct BenchmarkReportBuilder;
Expand All @@ -24,13 +33,15 @@ impl BenchmarkReportBuilder {
.map(|dt| dt.to_rfc3339())
.unwrap_or_else(|| String::from("unknown"));

let server_version = match get_server_version(&params).await {
Ok(v) => v,
Err(_) => "unknown".to_string(),
};
let transport = params.transport;
let server_addr = params.server_address.clone();

let server_stats = get_server_stats(&transport, &server_addr)
.await
.expect("Failed to get server stats");

if params.gitref.is_none() {
params.gitref = Some(server_version.clone());
params.gitref = Some(server_stats.iggy_server_version.clone());
};

if params.gitref_date.is_none() {
Expand Down Expand Up @@ -84,7 +95,7 @@ impl BenchmarkReportBuilder {

BenchmarkReport {
uuid,
server_version,
server_stats: stats_to_benchmark_server_stats(server_stats),
timestamp,
hardware,
params,
Expand All @@ -93,3 +104,59 @@ impl BenchmarkReportBuilder {
}
}
}

/// This function is a workaround.
/// See server_stats.rs in `iggy_benchmark_report` crate for more details.
fn stats_to_benchmark_server_stats(stats: Stats) -> BenchmarkServerStats {
BenchmarkServerStats {
process_id: stats.process_id,
cpu_usage: stats.cpu_usage,
total_cpu_usage: stats.total_cpu_usage,
memory_usage: stats.memory_usage.as_bytes_u64(),
total_memory: stats.total_memory.as_bytes_u64(),
available_memory: stats.available_memory.as_bytes_u64(),
run_time: stats.run_time.into(),
start_time: stats.start_time.into(),
read_bytes: stats.read_bytes.as_bytes_u64(),
written_bytes: stats.written_bytes.as_bytes_u64(),
messages_size_bytes: stats.messages_size_bytes.as_bytes_u64(),
streams_count: stats.streams_count,
topics_count: stats.topics_count,
partitions_count: stats.partitions_count,
segments_count: stats.segments_count,
messages_count: stats.messages_count,
clients_count: stats.clients_count,
consumer_groups_count: stats.consumer_groups_count,
hostname: stats.hostname,
os_name: stats.os_name,
os_version: stats.os_version,
kernel_version: stats.kernel_version,
iggy_server_version: stats.iggy_server_version,
iggy_server_semver: stats.iggy_server_semver,
cache_metrics: cache_metrics_to_benchmark_cache_metrics(stats.cache_metrics),
}
}

/// This function is a workaround.
/// See server_stats.rs in `iggy_benchmark_report` crate for more details.
fn cache_metrics_to_benchmark_cache_metrics(
cache_metrics: HashMap<CacheMetricsKey, CacheMetrics>,
) -> HashMap<BenchmarkCacheMetricsKey, BenchmarkCacheMetrics> {
cache_metrics
.into_iter()
.map(|(k, v)| {
(
BenchmarkCacheMetricsKey {
stream_id: k.stream_id,
topic_id: k.topic_id,
partition_id: k.partition_id,
},
BenchmarkCacheMetrics {
hits: v.hits,
misses: v.misses,
hit_ratio: v.hit_ratio,
},
)
})
.collect()
}
Loading

0 comments on commit 13c06c5

Please sign in to comment.