Implement EndToEndProducingConsumerGroup Benchmark (#1491)
This commit introduces the `EndToEndProducingConsumerGroup` benchmark, which allows benchmarking scenarios with producing consumers assigned to multiple consumer groups. Key changes include:

- Added `EndToEndProducingConsumerGroup` to the `BenchmarkKind` enum.
- Implemented the `ProducingConsumer` actor to handle producing and consuming messages within consumer groups (a sketch of this flow follows the file summary below).
- Updated command-line arguments to support the new benchmark type.
- Modified existing benchmarks to accommodate the new consumer group logic.
- Improved logging and error handling for better traceability during execution.
hubcio authored Feb 5, 2025
1 parent ab7c371 commit 0e14df4
Showing 25 changed files with 507 additions and 176 deletions.
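The new actor's core flow, per the producing_consumer.rs diff further down, is: join the consumer group, send a batch, then poll it back with the `next` strategy and auto-commit. What follows is a minimal sketch of that flow, not the committed code: it assumes an already-connected and authenticated `IggyClient`, pre-built `messages`, and the iggy client calls visible in the diff (`join_consumer_group`, `send_messages`, `poll_messages`); the `Identifier` parameter type is an assumption, and warmup, rate limiting, latency sampling, and the shared batch counter are omitted.

// Sketch only: one send + one group-balanced poll, mirroring the benchmark's loop body.
use iggy::client::{ConsumerGroupClient, MessageClient};
use iggy::clients::client::IggyClient;
use iggy::consumer::Consumer as IggyConsumer;
use iggy::error::IggyError;
use iggy::identifier::Identifier;
use iggy::messages::poll_messages::PollingStrategy;
use iggy::messages::send_messages::{Message, Partitioning};

async fn produce_and_consume_once(
    client: &IggyClient,
    stream_id: &Identifier,
    topic_id: &Identifier,
    consumer_group_id: u32,
    messages_per_batch: u32,
    messages: &mut Vec<Message>, // message construction elided
) -> Result<usize, IggyError> {
    // Join the consumer group so polling is balanced across group members.
    client
        .join_consumer_group(stream_id, topic_id, &consumer_group_id.try_into().unwrap())
        .await?;
    let consumer = IggyConsumer::group(consumer_group_id.try_into().unwrap());

    // Send one batch, letting the server balance it across partitions.
    client
        .send_messages(stream_id, topic_id, &Partitioning::balanced(), messages)
        .await?;

    // Poll with `next` + auto-commit, as the group variant of the benchmark does;
    // the partition id is None because the group assignment picks the partition.
    let polled = client
        .poll_messages(
            stream_id,
            topic_id,
            None,
            &consumer,
            &PollingStrategy::next(),
            messages_per_batch,
            true,
        )
        .await?;
    Ok(polled.messages.len())
}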
21 changes: 14 additions & 7 deletions bench/report/src/prints.rs
@@ -3,20 +3,23 @@ use human_repr::HumanCount;
use tracing::info;

use crate::{
group_metrics::BenchmarkGroupMetrics, group_metrics_kind::GroupMetricsKind,
report::BenchmarkReport,
benchmark_kind::BenchmarkKind, group_metrics::BenchmarkGroupMetrics,
group_metrics_kind::GroupMetricsKind, report::BenchmarkReport,
};

impl BenchmarkReport {
pub fn print_summary(&self) {
let kind = self.params.benchmark_kind;
let actors = self.params.producers + self.params.consumers;
let total_messages: u64 = self.params.messages_per_batch as u64
let total_messages_sent: u64 = self.params.messages_per_batch as u64
* self.params.message_batches as u64
* actors as u64;
* self.params.producers as u64;
let total_messages_received: u64 = self.params.messages_per_batch as u64
* self.params.message_batches as u64
* self.params.consumers as u64;
let total_messages = total_messages_sent + total_messages_received;
let total_size_bytes: u64 = total_messages * self.params.message_size as u64;
let total_size = format!("{} total size", total_size_bytes.human_count_bytes());
let total_messages = format!("{} total messages, ", total_messages.human_count_bare());
let total_size = format!("{} of data processed", total_size_bytes.human_count_bytes());
let total_messages = format!("{} messages processed, ", total_messages.human_count_bare());

let streams = format!("{} streams, ", self.params.streams);
// TODO: make this configurable
@@ -29,6 +32,10 @@ impl BenchmarkReport {
);
let producers = if self.params.producers == 0 {
"".to_owned()
} else if self.params.benchmark_kind == BenchmarkKind::EndToEndProducingConsumerGroup
|| self.params.benchmark_kind == BenchmarkKind::EndToEndProducingConsumer
{
format!("{} producing consumers, ", self.params.producers)
} else {
format!("{} producers, ", self.params.producers)
};
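As a quick sanity check on the revised totals in prints.rs: the summary now counts sent and received traffic separately and sums them, so a producing-consumer run reports both directions. The numbers below are illustrative only, not benchmark defaults:

fn main() {
    // Illustrative figures; they mirror the arithmetic in print_summary().
    let messages_per_batch: u64 = 1_000;
    let message_batches: u64 = 100;
    let producers: u64 = 8; // producing actors
    let consumers: u64 = 8; // consuming actors
    let message_size: u64 = 1_024; // bytes per message

    let total_messages_sent = messages_per_batch * message_batches * producers; // 800_000
    let total_messages_received = messages_per_batch * message_batches * consumers; // 800_000
    let total_messages = total_messages_sent + total_messages_received; // 1_600_000 "messages processed"
    let total_size_bytes = total_messages * message_size; // 1_638_400_000 bytes ≈ 1.64 GB "of data processed"

    println!("{total_messages} messages, {total_size_bytes} bytes");
}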
6 changes: 3 additions & 3 deletions bench/report/src/types/benchmark_kind.rs
@@ -38,7 +38,7 @@ pub enum BenchmarkKind {
#[display("End To End Producing Consumer")]
#[serde(rename = "end_to_end_producing_consumer")]
EndToEndProducingConsumer,
// #[display("End To End Producer And Consumer Group")]
// #[serde(rename = "end_to_end_producer_and_consumer_group")]
// EndToEndProducerAndConsumerGroup,
#[display("End To End Producing Consumer Group")]
#[serde(rename = "end_to_end_producing_consumer_group")]
EndToEndProducingConsumerGroup,
}
13 changes: 7 additions & 6 deletions bench/report/src/types/params.rs
@@ -42,12 +42,13 @@ impl BenchmarkParams {
}
BenchmarkKind::EndToEndProducingConsumer => {
format!("{} producing consumers", self.producers)
} // BenchmarkKind::EndToEndProducerAndConsumerGroup => {
// format!(
// "{} producers/{} consumers/{} consumer groups",
// self.producers, self.consumers, self.consumer_groups
// )
// }
}
BenchmarkKind::EndToEndProducingConsumerGroup => {
format!(
"{} producing consumers/{} consumer groups",
self.producers, self.consumer_groups
)
}
}
}
}
2 changes: 1 addition & 1 deletion bench/src/actors/consumer.rs
@@ -313,7 +313,7 @@ impl Consumer {
Self::log_statistics(
self.consumer_id,
total_messages,
message_batches as u32,
current_iteration as u32,
messages_per_batch,
&metrics,
);
152 changes: 126 additions & 26 deletions bench/src/actors/producing_consumer.rs
@@ -3,11 +3,12 @@ use crate::analytics::metrics::individual::from_records;
use crate::analytics::record::BenchmarkRecord;
use crate::rate_limiter::RateLimiter;
use human_repr::HumanCount;
use iggy::client::ConsumerGroupClient;
use iggy::client::MessageClient;
use iggy::clients::client::IggyClient;
use iggy::consumer::Consumer as IggyConsumer;
use iggy::error::IggyError;
use iggy::messages::poll_messages::PollingStrategy;
use iggy::messages::poll_messages::{PollingKind, PollingStrategy};
use iggy::messages::send_messages::{Message, Partitioning};
use iggy::utils::byte_size::IggyByteSize;
use iggy::utils::duration::IggyDuration;
@@ -17,6 +18,7 @@ use iggy_bench_report::benchmark_kind::BenchmarkKind;
use iggy_bench_report::individual_metrics::BenchmarkIndividualMetrics;
use integration::test_server::{login_root, ClientFactory};
use std::str::FromStr;
use std::sync::atomic::{AtomicI64, Ordering};
use std::sync::Arc;
use std::time::Duration;
use tokio::time::Instant;
@@ -26,15 +28,18 @@ pub struct ProducingConsumer {
client_factory: Arc<dyn ClientFactory>,
benchmark_kind: BenchmarkKind,
actor_id: u32,
consumer_group_id: Option<u32>,
stream_id: u32,
partitions_count: u32,
messages_per_batch: u32,
message_batches: u32,
message_size: u32,
batches_left_to_receive: Arc<AtomicI64>,
warmup_time: IggyDuration,
sampling_time: IggyDuration,
moving_average_window: u32,
rate_limiter: Option<RateLimiter>,
polling_kind: PollingKind,
calculate_latency_from_timestamp_in_first_message: bool,
}

@@ -44,30 +49,36 @@ impl ProducingConsumer {
client_factory: Arc<dyn ClientFactory>,
benchmark_kind: BenchmarkKind,
actor_id: u32,
consumer_group_id: Option<u32>,
stream_id: u32,
partitions_count: u32,
messages_per_batch: u32,
message_batches: u32,
message_size: u32,
batches_left_to_receive: Arc<AtomicI64>,
warmup_time: IggyDuration,
sampling_time: IggyDuration,
moving_average_window: u32,
rate_limiter: Option<RateLimiter>,
polling_kind: PollingKind,
calculate_latency_from_timestamp_in_first_message: bool,
) -> Self {
Self {
client_factory,
benchmark_kind,
actor_id,
consumer_group_id,
stream_id,
partitions_count,
messages_per_batch,
message_batches,
message_size,
batches_left_to_receive,
warmup_time,
sampling_time,
moving_average_window,
rate_limiter,
polling_kind,
calculate_latency_from_timestamp_in_first_message,
}
}
@@ -79,7 +90,7 @@ impl ProducingConsumer {
let messages_per_batch = self.messages_per_batch;
let message_size = self.message_size;
let total_messages = (messages_per_batch * message_batches) as u64;

let total_msg_batches = self.batches_left_to_receive.load(Ordering::Acquire);
let client = self.client_factory.create_client().await;
let client = IggyClient::create(client, None, None);
login_root(&client).await;
@@ -106,40 +117,92 @@
1 => Partitioning::partition_id(default_partition_id),
2.. => Partitioning::balanced(),
};
let partition_id = if self.consumer_group_id.is_some() {
None
} else {
Some(default_partition_id)
};

let consumer = IggyConsumer::new(self.actor_id.try_into().unwrap());
let consumer = match self.consumer_group_id {
Some(consumer_group_id) => {
client
.join_consumer_group(
&stream_id,
&topic_id,
&consumer_group_id.try_into().unwrap(),
)
.await
.expect("Failed to join consumer group");
info!(
"ProducingConsumer #{} → joined consumer group {consumer_group_id}",
self.actor_id,
);
IggyConsumer::group(consumer_group_id.try_into().unwrap())
}
None => IggyConsumer::new(self.actor_id.try_into().unwrap()),
};
let mut current_offset: u64 = 0;
let mut last_warning_time: Option<Instant> = None;
let mut skipped_warnings_count: u32 = 0;

// Warmup if needed
if self.warmup_time.get_duration() != Duration::from_millis(0) {
info!(
"ProducingConsumer #{} → warming up for {}...",
self.actor_id, self.warmup_time
);
if let Some(cg_id) = self.consumer_group_id {
info!(
"ProducingConsumer #{}, part of consumer group #{}, → warming up for {}...",
self.actor_id, cg_id, self.warmup_time
);
} else {
info!(
"ProducingConsumer #{} → warming up for {}...",
self.actor_id, self.warmup_time
);
}
let warmup_end = Instant::now() + self.warmup_time.get_duration();
while Instant::now() < warmup_end {
client
.send_messages(&stream_id, &topic_id, &partitioning, &mut messages)
.await?;

let strategy = PollingStrategy::offset(current_offset);
let (strategy, auto_commit) = match self.polling_kind {
PollingKind::Offset => (PollingStrategy::offset(current_offset), false),
PollingKind::Next => (PollingStrategy::next(), true),
_ => panic!(
"Unsupported polling kind for benchmark: {:?}",
self.polling_kind
),
};
let polled_messages = client
.poll_messages(
&stream_id,
&topic_id,
Some(default_partition_id),
partition_id,
&consumer,
&strategy,
messages_per_batch,
false,
auto_commit,
)
.await?;

if polled_messages.messages.is_empty() {
warn!(
"ProducingConsumer #{} - Messages are empty for offset: {}, retrying...",
self.actor_id, current_offset
);
let should_warn = last_warning_time
.map(|t| t.elapsed() >= Duration::from_secs(1))
.unwrap_or(true);

if should_warn {
warn!(
"ProducingConsumer #{} → expected {} messages but got {}, retrying... ({} warnings skipped)",
self.actor_id,
messages_per_batch,
polled_messages.messages.len(),
skipped_warnings_count
);
last_warning_time = Some(Instant::now());
skipped_warnings_count = 0;
} else {
skipped_warnings_count += 1;
}

continue;
}
current_offset += messages_per_batch as u64;
@@ -158,8 +221,9 @@ impl ProducingConsumer {
let start_timestamp = Instant::now();
let mut latencies: Vec<Duration> = Vec::with_capacity(message_batches as usize);
let mut records: Vec<BenchmarkRecord> = Vec::with_capacity(message_batches as usize);
let mut batch_id = 1;

for batch_id in 1..=message_batches {
while self.batches_left_to_receive.load(Ordering::Acquire) > 0 {
if let Some(rate_limiter) = &self.rate_limiter {
rate_limiter.throttle(batch_user_data_bytes).await;
}
@@ -169,25 +233,49 @@ impl ProducingConsumer {
client
.send_messages(&stream_id, &topic_id, &partitioning, &mut messages)
.await?;
let strategy = PollingStrategy::offset(current_offset);
let (strategy, auto_commit) = match self.polling_kind {
PollingKind::Offset => (PollingStrategy::offset(current_offset), false),
PollingKind::Next => (PollingStrategy::next(), true),
_ => panic!(
"Unsupported polling kind for benchmark: {:?}",
self.polling_kind
),
};
let polled_messages = client
.poll_messages(
&stream_id,
&topic_id,
Some(default_partition_id),
partition_id,
&consumer,
&strategy,
messages_per_batch,
false,
auto_commit,
)
.await?;
if polled_messages.messages.is_empty() {
warn!(
"ProducingConsumer #{} - Messages are empty for offset: {}, retrying...",
self.actor_id, current_offset
);

if polled_messages.messages.len() != messages_per_batch as usize {
let should_warn = last_warning_time
.map(|t| t.elapsed() >= Duration::from_secs(1))
.unwrap_or(true);

if should_warn {
warn!(
"ProducingConsumer #{} → expected {} messages but got {}, {} batches left to receive, retrying... ({} warnings skipped in the last second)",
self.actor_id,
messages_per_batch,
polled_messages.messages.len(),
self.batches_left_to_receive.load(Ordering::Acquire),
skipped_warnings_count
);
last_warning_time = Some(Instant::now());
skipped_warnings_count = 0;
} else {
skipped_warnings_count += 1;
}

continue;
}

// Extract send timestamp from first message in batch
let latency = if self.calculate_latency_from_timestamp_in_first_message {
calculate_latency_from_first_message(&polled_messages.messages[0])
@@ -197,17 +285,29 @@ impl ProducingConsumer {
latencies.push(latency);

current_offset += messages_per_batch as u64;
self.batches_left_to_receive.fetch_sub(1, Ordering::AcqRel);
batch_id += 1;

records.push(BenchmarkRecord {
elapsed_time_us: start_timestamp.elapsed().as_micros() as u64,
latency_us: latency.as_micros() as u64,
messages: (batch_id * messages_per_batch) as u64,
messages: (batch_id * messages_per_batch * 2) as u64, // Count both sent and polled messages
message_batches: batch_id as u64,
user_data_bytes: batch_id as u64 * batch_user_data_bytes,
total_bytes: batch_id as u64 * batch_total_bytes,
user_data_bytes: batch_id as u64 * batch_user_data_bytes * 2, // Account for both sent and received bytes
total_bytes: batch_id as u64 * batch_total_bytes * 2, // Account for both sent and received total bytes
});
}

info!(
"ProducingConsumer #{} → sent and received {} messages ({} sent + {} received) in {} batches out of {}",
self.actor_id,
batch_id * messages_per_batch * 2,
batch_id * messages_per_batch,
batch_id * messages_per_batch,
batch_id,
total_msg_batches
);

let metrics = from_records(
records,
self.benchmark_kind,
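The piece that actually coordinates the group is the shared `batches_left_to_receive` counter introduced above: every producing consumer keeps looping until the group as a whole has drained the expected number of batches, decrementing the counter once per successfully polled batch. Below is a minimal standalone sketch of that coordination pattern, using plain threads instead of the benchmark's async actors; all names and numbers are illustrative:

// Sketch of the shared-countdown pattern; not the committed benchmark code.
use std::sync::atomic::{AtomicI64, Ordering};
use std::sync::Arc;
use std::thread;

fn main() {
    let total_batches: i64 = 100;
    let batches_left = Arc::new(AtomicI64::new(total_batches));

    let handles: Vec<_> = (0..4)
        .map(|actor_id| {
            let batches_left = Arc::clone(&batches_left);
            thread::spawn(move || {
                let mut received = 0u64;
                while batches_left.load(Ordering::Acquire) > 0 {
                    // A real actor would send_messages() and poll_messages() here,
                    // and only decrement after a successful, non-empty poll.
                    if batches_left.fetch_sub(1, Ordering::AcqRel) <= 0 {
                        break; // another actor already claimed the last batch
                    }
                    received += 1;
                }
                println!("actor {actor_id} handled {received} batches");
            })
        })
        .collect();

    for handle in handles {
        handle.join().unwrap();
    }
}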