diff --git a/Cargo.lock b/Cargo.lock index 08559d668..74770d5d0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -676,7 +676,7 @@ dependencies = [ [[package]] name = "bench" -version = "0.1.0" +version = "0.1.1" dependencies = [ "async-trait", "clap", diff --git a/README.md b/README.md index b8542c577..513d87b6d 100644 --- a/README.md +++ b/README.md @@ -247,6 +247,16 @@ Then, run the benchmarking app with the desired options: cargo r --bin iggy-bench -r -- -c -v send-and-poll tcp ``` +4. Polling with consumer group + + ```bash + cargo r --bin iggy-bench -r -- -c -v send --streams 1 --partitions 10 --disable-parallel-producers tcp + ``` + + ```bash + cargo r --bin iggy-bench -r -- -c -v consumer-group-poll tcp + ``` + These benchmarks would start the server with the default configuration, create a stream, topic and partition, and then send or poll the messages. The default configuration is optimized for the best performance, so you might want to tweak it for your needs. If you need more options, please refer to `iggy-bench` subcommands `help` and `examples`. For example, to run the benchmark for the already started server, provide the additional argument `--server-address 0.0.0.0:8090`. diff --git a/bench/Cargo.toml b/bench/Cargo.toml index beebe3a4d..e41859ba8 100644 --- a/bench/Cargo.toml +++ b/bench/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "bench" -version = "0.1.0" +version = "0.1.1" edition = "2021" [dependencies] diff --git a/bench/src/args/common.rs b/bench/src/args/common.rs index c458c8a1c..d62ad980d 100644 --- a/bench/src/args/common.rs +++ b/bench/src/args/common.rs @@ -102,6 +102,10 @@ impl IggyBenchArgs { self.benchmark_kind.inner().number_of_streams() } + pub fn number_of_partitions(&self) -> u32 { + self.benchmark_kind.inner().number_of_partitions() + } + pub fn consumers(&self) -> u32 { self.benchmark_kind.inner().consumers() } @@ -122,6 +126,10 @@ impl IggyBenchArgs { .disable_parallel_consumer_streams() } + pub fn number_of_consumer_groups(&self) -> u32 { + self.benchmark_kind.inner().number_of_consumer_groups() + } + pub fn warmup_time(&self) -> IggyDuration { self.warmup_time } diff --git a/bench/src/args/defaults.rs b/bench/src/args/defaults.rs index 126de0444..6489ca6ef 100644 --- a/bench/src/args/defaults.rs +++ b/bench/src/args/defaults.rs @@ -21,7 +21,11 @@ pub const DEFAULT_MESSAGE_BATCHES: NonZeroU32 = u32!(1000); pub const DEFAULT_MESSAGE_SIZE: NonZeroU32 = u32!(1000); pub const DEFAULT_NUMBER_OF_STREAMS: NonZeroU32 = u32!(10); +pub const DEFAULT_NUMBER_OF_STREAMS_CONSUMER_GROUP: NonZeroU32 = u32!(1); +pub const DEFAULT_NUMBER_OF_PARTITIONS: NonZeroU32 = u32!(1); + pub const DEFAULT_NUMBER_OF_CONSUMERS: NonZeroU32 = u32!(10); +pub const DEFAULT_NUMBER_OF_CONSUMER_GROUPS: NonZeroU32 = u32!(1); pub const DEFAULT_NUMBER_OF_PRODUCERS: NonZeroU32 = u32!(10); pub const DEFAULT_PERFORM_CLEANUP: bool = false; diff --git a/bench/src/args/kind.rs b/bench/src/args/kind.rs index 382c00bd5..48e29b947 100644 --- a/bench/src/args/kind.rs +++ b/bench/src/args/kind.rs @@ -4,6 +4,7 @@ use super::props::BenchmarkKindProps; use super::transport::BenchmarkTransportCommand; use super::{common::IggyBenchArgs, simple::BenchmarkKind}; use clap::{error::ErrorKind, CommandFactory, Parser, Subcommand}; +use core::panic; use std::num::NonZeroU32; #[derive(Subcommand, Debug)] @@ -11,6 +12,7 @@ pub enum BenchmarkKindCommand { Send(SendArgs), Poll(PollArgs), SendAndPoll(SendAndPollArgs), + ConsumerGroupPoll(ConsumerGroupArgs), /// Prints examples Examples, @@ -22,6 +24,7 @@ impl BenchmarkKindCommand { BenchmarkKindCommand::Send(_) => BenchmarkKind::Send, BenchmarkKindCommand::Poll(_) => BenchmarkKind::Poll, BenchmarkKindCommand::SendAndPoll(_) => BenchmarkKind::SendAndPoll, + BenchmarkKindCommand::ConsumerGroupPoll(_) => BenchmarkKind::ConsumerGroupPoll, BenchmarkKindCommand::Examples => { print_examples(); std::process::exit(0); @@ -47,6 +50,10 @@ impl BenchmarkKindProps for BenchmarkKindCommand { self.inner().number_of_streams() } + fn number_of_partitions(&self) -> u32 { + self.inner().number_of_partitions() + } + fn consumers(&self) -> u32 { self.inner().consumers() } @@ -67,11 +74,16 @@ impl BenchmarkKindProps for BenchmarkKindCommand { self.inner().transport_command() } + fn number_of_consumer_groups(&self) -> u32 { + self.inner().number_of_consumer_groups() + } + fn inner(&self) -> &dyn BenchmarkKindProps { match self { BenchmarkKindCommand::Send(args) => args, BenchmarkKindCommand::Poll(args) => args, BenchmarkKindCommand::SendAndPoll(args) => args, + BenchmarkKindCommand::ConsumerGroupPoll(args) => args, BenchmarkKindCommand::Examples => { print_examples(); std::process::exit(0); @@ -110,6 +122,10 @@ pub struct SendArgs { #[arg(long, default_value_t = DEFAULT_NUMBER_OF_STREAMS)] pub streams: NonZeroU32, + /// Number of partitions + #[arg(long, default_value_t = DEFAULT_NUMBER_OF_PARTITIONS)] + pub partitions: NonZeroU32, + /// Flag, disables parallel producers #[arg(long, default_value_t = DEFAULT_DISABLE_PARALLEL_PRODUCER_STREAMS)] pub disable_parallel_producers: bool, @@ -132,6 +148,10 @@ impl BenchmarkKindProps for SendArgs { self.streams.get() } + fn number_of_partitions(&self) -> u32 { + self.partitions.get() + } + fn consumers(&self) -> u32 { panic!("") } @@ -152,21 +172,123 @@ impl BenchmarkKindProps for SendArgs { &self.transport } + fn number_of_consumer_groups(&self) -> u32 { + panic!("No consumer groups in send benchmark"); + } + fn validate(&self) { let streams = self.streams.get(); let producers = self.producers.get(); let mut cmd = IggyBenchArgs::command(); - if self.disable_parallel_producers && streams < producers { + if !self.disable_parallel_producers && streams < producers { cmd.error( ErrorKind::ArgumentConflict, - format!("With parallel producers flag, the number of streams ({streams}) must be greater than or equal to the number of producers ({producers}).", + format!("Without parallel producers flag, the number of streams ({streams}) must be greater than or equal to the number of producers ({producers}).", )) .exit(); } } } +/// Sending and Polling benchmark with consumer group +#[derive(Parser, Debug)] +pub struct ConsumerGroupArgs { + #[command(subcommand)] + pub transport: BenchmarkTransportCommand, + + /// Number of messages per batch + #[arg(long, default_value_t = DEFAULT_MESSAGES_PER_BATCH)] + pub messages_per_batch: NonZeroU32, + + /// Number of message batches + #[arg(long, default_value_t = DEFAULT_MESSAGE_BATCHES)] + pub message_batches: NonZeroU32, + + /// Message size in bytes + #[arg(long, default_value_t = DEFAULT_MESSAGE_SIZE)] + pub message_size: NonZeroU32, + + /// Number of streams + #[arg(long, default_value_t = DEFAULT_NUMBER_OF_STREAMS_CONSUMER_GROUP)] + pub streams: NonZeroU32, + + /// Number of consumers + #[arg(long, default_value_t = DEFAULT_NUMBER_OF_CONSUMERS)] + pub consumers: NonZeroU32, + + /// Number of consumers + #[arg(long, default_value_t = DEFAULT_NUMBER_OF_CONSUMER_GROUPS)] + pub consumer_groups: NonZeroU32, +} + +impl BenchmarkKindProps for ConsumerGroupArgs { + fn message_size(&self) -> u32 { + self.message_size.get() + } + + fn messages_per_batch(&self) -> u32 { + self.messages_per_batch.get() + } + + fn message_batches(&self) -> u32 { + self.message_batches.get() + } + + fn number_of_streams(&self) -> u32 { + self.streams.get() + } + + fn number_of_partitions(&self) -> u32 { + panic!("No partitions in consumer group benchmark"); + } + + fn consumers(&self) -> u32 { + self.consumers.get() + } + + fn producers(&self) -> u32 { + panic!("No producers in consumer group"); + } + + fn disable_parallel_producer_streams(&self) -> bool { + panic!("No parallel producer for consumer group"); + } + + fn disable_parallel_consumer_streams(&self) -> bool { + panic!("No parallel consumer for consumer group"); + } + + fn transport_command(&self) -> &BenchmarkTransportCommand { + &self.transport + } + + fn number_of_consumer_groups(&self) -> u32 { + self.consumer_groups.get() + } + + fn validate(&self) { + let cg_number = self.consumer_groups.get(); + let consumers_number = self.consumers.get(); + let mut cmd = IggyBenchArgs::command(); + if cg_number < 1 { + cmd.error( + ErrorKind::ArgumentConflict, + "Consumer groups number must be greater than 0 for a consumer groups benchmark.", + ) + .exit(); + } + + if consumers_number < 1 { + cmd.error( + ErrorKind::ArgumentConflict, + "Consumers number must be greater than 0 for a consumer groups benchmark.", + ) + .exit(); + } + } +} + /// Polling (reading) benchmark #[derive(Parser, Debug)] pub struct PollArgs { @@ -193,6 +315,10 @@ pub struct PollArgs { #[arg(long, default_value_t = DEFAULT_NUMBER_OF_STREAMS)] pub streams: NonZeroU32, + /// Number of streams + #[arg(long, default_value_t = DEFAULT_NUMBER_OF_PARTITIONS)] + pub partitions: NonZeroU32, + /// Flag, disables parallel consumers #[arg(long, default_value_t = DEFAULT_DISABLE_PARALLEL_CONSUMER_STREAMS)] pub disable_parallel_consumers: bool, @@ -215,6 +341,10 @@ impl BenchmarkKindProps for PollArgs { self.streams.get() } + fn number_of_partitions(&self) -> u32 { + self.partitions.get() + } + fn consumers(&self) -> u32 { self.consumers.get() } @@ -235,15 +365,19 @@ impl BenchmarkKindProps for PollArgs { &self.transport } + fn number_of_consumer_groups(&self) -> u32 { + panic!("No consumer groups in poll benchmark"); + } + fn validate(&self) { let streams = self.streams.get(); let consumers = self.consumers.get(); let mut cmd = IggyBenchArgs::command(); - if self.disable_parallel_consumers && streams < consumers { + if !self.disable_parallel_consumers && streams < consumers { cmd.error( ErrorKind::ArgumentConflict, - format!("With parallel consumers flag, the number of streams ({streams}) must be greater than or equal to the number of consumers ({consumers})."), + format!("Without parallel consumers flag, the number of streams ({streams}) must be greater than or equal to the number of consumers ({consumers})."), ) .exit(); } @@ -280,6 +414,10 @@ pub struct SendAndPollArgs { #[arg(long, default_value_t = DEFAULT_NUMBER_OF_STREAMS)] pub streams: NonZeroU32, + /// Number of partitions + #[arg(long, default_value_t = DEFAULT_NUMBER_OF_PARTITIONS)] + pub partitions: NonZeroU32, + /// Flag, disables parallel producers #[arg(long, default_value_t = DEFAULT_DISABLE_PARALLEL_PRODUCER_STREAMS)] pub disable_parallel_producers: bool, @@ -298,6 +436,10 @@ impl BenchmarkKindProps for SendAndPollArgs { self.streams.get() } + fn number_of_partitions(&self) -> u32 { + self.partitions.get() + } + fn message_batches(&self) -> u32 { self.message_batches.get() } @@ -326,24 +468,28 @@ impl BenchmarkKindProps for SendAndPollArgs { &self.transport } + fn number_of_consumer_groups(&self) -> u32 { + panic!("No consumer groups in send and poll benchmark"); + } + fn validate(&self) { let streams = self.streams.get(); let consumers = self.consumers.get(); let producers = self.producers.get(); let mut cmd = IggyBenchArgs::command(); - if self.disable_parallel_consumers && streams < consumers { + if !self.disable_parallel_consumers && streams < consumers { cmd.error( ErrorKind::ArgumentConflict, - format!("With parallel consumers flag, the number of streams ({streams}) must be greater than or equal to the number of consumers ({consumers})."), + format!("Without parallel consumers flag, the number of streams ({streams}) must be greater than or equal to the number of consumers ({consumers})."), ) .exit(); } - if self.disable_parallel_producers && streams < producers { + if !self.disable_parallel_producers && streams < producers { cmd.error( ErrorKind::ArgumentConflict, - format!("With parallel producers flag, the number of streams ({streams}) must be greater than or equal to the number of producers ({producers}).", + format!("Without parallel producers flag, the number of streams ({streams}) must be greater than or equal to the number of producers ({producers}).", )) .exit(); } diff --git a/bench/src/args/props.rs b/bench/src/args/props.rs index add8fd710..eb563f6a4 100644 --- a/bench/src/args/props.rs +++ b/bench/src/args/props.rs @@ -6,6 +6,8 @@ pub trait BenchmarkKindProps { fn messages_per_batch(&self) -> u32; fn message_batches(&self) -> u32; fn number_of_streams(&self) -> u32; + fn number_of_partitions(&self) -> u32; + fn number_of_consumer_groups(&self) -> u32; fn consumers(&self) -> u32; fn producers(&self) -> u32; fn disable_parallel_producer_streams(&self) -> bool; diff --git a/bench/src/args/simple.rs b/bench/src/args/simple.rs index 875034512..2a46ec300 100644 --- a/bench/src/args/simple.rs +++ b/bench/src/args/simple.rs @@ -8,4 +8,6 @@ pub enum BenchmarkKind { Poll, #[display(fmt = "send and poll messages")] SendAndPoll, + #[display(fmt = "consumer group poll")] + ConsumerGroupPoll, } diff --git a/bench/src/benchmarks/benchmark.rs b/bench/src/benchmarks/benchmark.rs index 7fe250f2f..fd4e0ca0a 100644 --- a/bench/src/benchmarks/benchmark.rs +++ b/bench/src/benchmarks/benchmark.rs @@ -1,6 +1,6 @@ use super::{ - poll_benchmark::PollMessagesBenchmark, send_and_poll_benchmark::SendAndPollMessagesBenchmark, - send_benchmark::SendMessagesBenchmark, + consumer_group_benchmark::ConsumerGroupBenchmark, poll_benchmark::PollMessagesBenchmark, + send_and_poll_benchmark::SendAndPollMessagesBenchmark, send_benchmark::SendMessagesBenchmark, }; use crate::{ args::{common::IggyBenchArgs, simple::BenchmarkKind}, @@ -35,6 +35,9 @@ impl From for Box { BenchmarkKind::Send => { Box::new(SendMessagesBenchmark::new(Arc::new(args), client_factory)) } + BenchmarkKind::ConsumerGroupPoll => { + Box::new(ConsumerGroupBenchmark::new(Arc::new(args), client_factory)) + } BenchmarkKind::SendAndPoll => Box::new(SendAndPollMessagesBenchmark::new( Arc::new(args), client_factory, @@ -59,7 +62,7 @@ pub trait Benchmarkable { let start_stream_id = self.args().start_stream_id(); let number_of_streams = self.args().number_of_streams(); let topic_id: u32 = 1; - let partitions_count: u32 = 1; + let partitions_count: u32 = self.args().number_of_partitions(); let client = self.client_factory().create_client().await; let client = IggyClient::create( client, @@ -91,7 +94,7 @@ pub trait Benchmarkable { None, None, IggyExpiry::NeverExpire, - MaxTopicSize::ServerDefault, + MaxTopicSize::Unlimited, ) .await?; } diff --git a/bench/src/benchmarks/consumer_group_benchmark.rs b/bench/src/benchmarks/consumer_group_benchmark.rs new file mode 100644 index 000000000..51e3e8093 --- /dev/null +++ b/bench/src/benchmarks/consumer_group_benchmark.rs @@ -0,0 +1,145 @@ +use crate::{ + args::{common::IggyBenchArgs, simple::BenchmarkKind}, + benchmarks::{CONSUMER_GROUP_BASE_ID, CONSUMER_GROUP_NAME_PREFIX}, + consumer::Consumer, +}; +use async_trait::async_trait; +use iggy::{ + client::ConsumerGroupClient, + clients::client::{IggyClient, IggyClientBackgroundConfig}, + error::IggyError, + utils::byte_size::IggyByteSize, +}; +use integration::test_server::{login_root, ClientFactory}; +use std::sync::Arc; +use tracing::{error, info}; + +use super::benchmark::{BenchmarkFutures, Benchmarkable}; + +pub struct ConsumerGroupBenchmark { + args: Arc, + client_factory: Arc, +} + +impl ConsumerGroupBenchmark { + pub fn new(args: Arc, client_factory: Arc) -> Self { + Self { + args, + client_factory, + } + } + + pub async fn init_consumer_groups(&self, consumer_groups_count: u32) -> Result<(), IggyError> { + let start_stream_id = self.args().start_stream_id(); + let topic_id: u32 = 1; + let client = self.client_factory().create_client().await; + let client = IggyClient::create( + client, + IggyClientBackgroundConfig::default(), + None, + None, + None, + ); + login_root(&client).await; + for i in 1..=consumer_groups_count { + let consumer_group_id = CONSUMER_GROUP_BASE_ID + i; + let stream_id = start_stream_id + i; + let consumer_group_name = + format!("{}-{}", CONSUMER_GROUP_NAME_PREFIX, consumer_group_id); + info!( + "Creating test consumer group with name: {}, id: {}, stream id: {}, topic id: {}", + consumer_group_name, consumer_group_id, stream_id, topic_id + ); + + let cg = client + .create_consumer_group( + &stream_id.try_into().unwrap(), + &topic_id.try_into().unwrap(), + &consumer_group_name, + Some(consumer_group_id), + ) + .await; + if cg.is_err() { + let error = cg.err().unwrap(); + match error { + IggyError::ConsumerGroupIdAlreadyExists(_, _) => { + continue; + } + _ => error!("Error when creating consumer group : {error}"), + } + } + } + + Ok(()) + } +} + +#[async_trait] +impl Benchmarkable for ConsumerGroupBenchmark { + async fn run(&mut self) -> BenchmarkFutures { + self.check_streams().await?; + let consumer_groups_count = self.args.number_of_consumer_groups(); + self.init_consumer_groups(consumer_groups_count) + .await + .expect("Failed to init consumer group"); + + let start_stream_id = self.args.start_stream_id(); + let start_consumer_group_id = CONSUMER_GROUP_BASE_ID; + let consumers = self.args.consumers(); + let messages_per_batch = self.args.messages_per_batch(); + let message_batches = self.args.message_batches(); + let warmup_time = self.args.warmup_time(); + let mut futures: BenchmarkFutures = Ok(Vec::with_capacity((consumers) as usize)); + + for consumer_id in 1..=consumers { + let consumer_group_id = + start_consumer_group_id + 1 + (consumer_id % consumer_groups_count); + let stream_id = start_stream_id + 1 + (consumer_id % consumer_groups_count); + let consumer = Consumer::new( + self.client_factory.clone(), + consumer_id, + Some(consumer_group_id), + stream_id, + messages_per_batch, + message_batches, + warmup_time, + ); + let future = Box::pin(async move { consumer.run().await }); + futures.as_mut().unwrap().push(future); + } + info!( + "Starting consumer group benchmark with {} messages", + self.total_messages() + ); + futures + } + + fn kind(&self) -> BenchmarkKind { + BenchmarkKind::ConsumerGroupPoll + } + + fn args(&self) -> &IggyBenchArgs { + &self.args + } + + fn client_factory(&self) -> &Arc { + &self.client_factory + } + + fn display_settings(&self) { + let total_messages = self.total_messages(); + let total_size_bytes = total_messages * self.args().message_size() as u64; + // TODO(numinex) - add more details about consumer groups. + info!( + "\x1B[32mBenchmark: {}, total messages: {}, processed: {}, {} streams, {} messages per batch, {} batches, {} bytes per message, {} consumers\x1B[0m", + self.kind(), + total_messages, + IggyByteSize::from(total_size_bytes), + self.args().number_of_streams(), + self.args().messages_per_batch(), + self.args().message_batches(), + self.args().message_size(), + self.args().consumers(), + ); + } +} diff --git a/bench/src/benchmarks/mod.rs b/bench/src/benchmarks/mod.rs index 72a5b6fda..0034f84bb 100644 --- a/bench/src/benchmarks/mod.rs +++ b/bench/src/benchmarks/mod.rs @@ -1,4 +1,8 @@ pub mod benchmark; +pub mod consumer_group_benchmark; pub mod poll_benchmark; pub mod send_and_poll_benchmark; pub mod send_benchmark; + +pub const CONSUMER_GROUP_BASE_ID: u32 = 0; +pub const CONSUMER_GROUP_NAME_PREFIX: &str = "cg"; diff --git a/bench/src/benchmarks/poll_benchmark.rs b/bench/src/benchmarks/poll_benchmark.rs index 3f8ec0941..da48b837b 100644 --- a/bench/src/benchmarks/poll_benchmark.rs +++ b/bench/src/benchmarks/poll_benchmark.rs @@ -49,6 +49,7 @@ impl Benchmarkable for PollMessagesBenchmark { let consumer = Consumer::new( client_factory, client_id, + None, stream_id, messages_per_batch, message_batches, diff --git a/bench/src/benchmarks/send_and_poll_benchmark.rs b/bench/src/benchmarks/send_and_poll_benchmark.rs index e76113c2a..eb4fb9bd6 100644 --- a/bench/src/benchmarks/send_and_poll_benchmark.rs +++ b/bench/src/benchmarks/send_and_poll_benchmark.rs @@ -60,6 +60,7 @@ impl Benchmarkable for SendAndPollMessagesBenchmark { let messages_per_batch = self.args.messages_per_batch(); let message_batches = self.args.message_batches(); let message_size = self.args.message_size(); + let partitions_count = self.args.number_of_partitions(); let warmup_time = self.args.warmup_time(); let mut futures: BenchmarkFutures = Ok(Vec::with_capacity((producers + consumers) as usize)); @@ -73,6 +74,7 @@ impl Benchmarkable for SendAndPollMessagesBenchmark { self.client_factory.clone(), producer_id, stream_id, + partitions_count, messages_per_batch, message_batches, message_size, @@ -90,6 +92,7 @@ impl Benchmarkable for SendAndPollMessagesBenchmark { let consumer = Consumer::new( self.client_factory.clone(), consumer_id, + None, stream_id, messages_per_batch, message_batches, diff --git a/bench/src/benchmarks/send_benchmark.rs b/bench/src/benchmarks/send_benchmark.rs index 41ad19ac6..de29fc875 100644 --- a/bench/src/benchmarks/send_benchmark.rs +++ b/bench/src/benchmarks/send_benchmark.rs @@ -27,9 +27,11 @@ impl Benchmarkable for SendMessagesBenchmark { self.init_streams().await.expect("Failed to init streams!"); let clients_count = self.args.producers(); info!("Creating {} client(s)...", clients_count); + let streams_number = self.args.number_of_streams(); let messages_per_batch = self.args.messages_per_batch(); let message_batches = self.args.message_batches(); let message_size = self.args.message_size(); + let partitions_count = self.args.number_of_partitions(); let warmup_time = self.args.warmup_time(); let mut futures: BenchmarkFutures = Ok(Vec::with_capacity(clients_count as usize)); @@ -44,13 +46,14 @@ impl Benchmarkable for SendMessagesBenchmark { let parallel_producer_streams = !args.disable_parallel_producer_streams(); let stream_id = match parallel_producer_streams { true => start_stream_id + client_id, - false => start_stream_id + 1, + false => start_stream_id + 1 + (client_id % streams_number), }; let producer = Producer::new( client_factory, client_id, stream_id, + partitions_count, messages_per_batch, message_batches, message_size, diff --git a/bench/src/consumer.rs b/bench/src/consumer.rs index 8072b4ff3..84ea11762 100644 --- a/bench/src/consumer.rs +++ b/bench/src/consumer.rs @@ -1,6 +1,6 @@ use crate::args::simple::BenchmarkKind; use crate::benchmark_result::BenchmarkResult; -use iggy::client::MessageClient; +use iggy::client::{ConsumerGroupClient, MessageClient}; use iggy::clients::client::{IggyClient, IggyClientBackgroundConfig}; use iggy::consumer::Consumer as IggyConsumer; use iggy::error::IggyError; @@ -15,6 +15,7 @@ use tracing::{error, info, warn}; pub struct Consumer { client_factory: Arc, consumer_id: u32, + consumer_group_id: Option, stream_id: u32, messages_per_batch: u32, message_batches: u32, @@ -25,6 +26,7 @@ impl Consumer { pub fn new( client_factory: Arc, consumer_id: u32, + consumer_group_id: Option, stream_id: u32, messages_per_batch: u32, message_batches: u32, @@ -33,6 +35,7 @@ impl Consumer { Self { client_factory, consumer_id, + consumer_group_id, stream_id, messages_per_batch, message_batches, @@ -42,7 +45,7 @@ impl Consumer { pub async fn run(&self) -> Result { let topic_id: u32 = 1; - let partition_id: u32 = 1; + let default_partition_id: u32 = 1; let total_messages = (self.messages_per_batch * self.message_batches) as u64; let client = self.client_factory.create_client().await; let client = IggyClient::create( @@ -53,10 +56,27 @@ impl Consumer { None, ); login_root(&client).await; - let consumer = IggyConsumer::new(self.consumer_id.try_into().unwrap()); let stream_id = self.stream_id.try_into().unwrap(); let topic_id = topic_id.try_into().unwrap(); - let partition_id = Some(partition_id); + let partition_id = if self.consumer_group_id.is_some() { + None + } else { + Some(default_partition_id) + }; + let consumer = match self.consumer_group_id { + Some(consumer_group_id) => { + client + .join_consumer_group( + &stream_id, + &topic_id, + &consumer_group_id.try_into().unwrap(), + ) + .await + .expect("Failed to join consumer group"); + IggyConsumer::group(consumer_group_id.try_into().unwrap()) + } + None => IggyConsumer::new(self.consumer_id.try_into().unwrap()), + }; let mut latencies: Vec = Vec::with_capacity(self.message_batches as usize); let mut total_size_bytes = 0; @@ -66,15 +86,22 @@ impl Consumer { let mut strategy = PollingStrategy::offset(0); if self.warmup_time.get_duration() != Duration::from_millis(0) { - info!( - "Consumer #{} → warming up for {}...", - self.consumer_id, self.warmup_time - ); + if let Some(cg_id) = self.consumer_group_id { + info!( + "Consumer #{}, part of consumer group #{}, → warming up for {}...", + self.consumer_id, cg_id, self.warmup_time + ); + } else { + info!( + "Consumer #{} → warming up for {}...", + self.consumer_id, self.warmup_time + ); + } let warmup_end = Instant::now() + self.warmup_time.get_duration(); while Instant::now() < warmup_end { let offset = current_iteration * self.messages_per_batch as u64; strategy.set_value(offset); - client + let polled_messages = client .poll_messages( &stream_id, &topic_id, @@ -85,14 +112,29 @@ impl Consumer { false, ) .await?; + + if polled_messages.messages.is_empty() { + warn!( + "Consumer: {} - Messages are empty for offset: {}, retrying...", + self.consumer_id, offset + ); + continue; + } current_iteration += 1; } } - info!( - "Consumer #{} → polling {} messages in {} batches of {} messages...", - self.consumer_id, total_messages, self.message_batches, self.messages_per_batch + if let Some(cg_id) = self.consumer_group_id { + info!( + "Consumer #{}, part of consumer group #{} → polling {} messages in {} batches of {} messages...", + self.consumer_id, cg_id, total_messages, self.message_batches, self.messages_per_batch ); + } else { + info!( + "Consumer #{} → polling {} messages in {} batches of {} messages...", + self.consumer_id, total_messages, self.message_batches, self.messages_per_batch + ); + } current_iteration = 0; let start_timestamp = Instant::now(); @@ -131,7 +173,10 @@ impl Consumer { let polled_messages = polled_messages.unwrap(); if polled_messages.messages.is_empty() { - warn!("Messages are empty for offset: {}, retrying...", offset); + warn!( + "Consumer: {} - Messages are empty for offset: {}, retrying...", + self.consumer_id, offset + ); continue; } diff --git a/bench/src/producer.rs b/bench/src/producer.rs index 5c1e8dd79..613667cf3 100644 --- a/bench/src/producer.rs +++ b/bench/src/producer.rs @@ -16,6 +16,7 @@ pub struct Producer { client_factory: Arc, producer_id: u32, stream_id: u32, + partitions_count: u32, messages_per_batch: u32, message_batches: u32, message_size: u32, @@ -23,10 +24,12 @@ pub struct Producer { } impl Producer { + #[allow(clippy::too_many_arguments)] pub fn new( client_factory: Arc, producer_id: u32, stream_id: u32, + partitions_count: u32, messages_per_batch: u32, message_batches: u32, message_size: u32, @@ -36,6 +39,7 @@ impl Producer { client_factory, producer_id, stream_id, + partitions_count, messages_per_batch, message_batches, message_size, @@ -45,7 +49,7 @@ impl Producer { pub async fn run(&self) -> Result { let topic_id: u32 = 1; - let partition_id: u32 = 1; + let default_partition_id: u32 = 1; let total_messages = (self.messages_per_batch * self.message_batches) as u64; let client = self.client_factory.create_client().await; let client = IggyClient::create( @@ -69,8 +73,11 @@ impl Producer { let stream_id = self.stream_id.try_into()?; let topic_id = topic_id.try_into()?; - let partitioning = Partitioning::partition_id(partition_id); - + let partitioning = match self.partitions_count { + 0 => panic!("Partition count must be greater than 0"), + 1 => Partitioning::partition_id(default_partition_id), + 2.. => Partitioning::balanced(), + }; info!( "Producer #{} → warming up for {}...", self.producer_id, self.warmup_time