Skip to content

Commit

Permalink
Extend benchmark with consumer group (#1058)
Browse files Browse the repository at this point in the history
Extended our benchmark suite with a consumer group poll
  • Loading branch information
numinnex authored Jul 14, 2024
1 parent c45cbbd commit 52ac78c
Show file tree
Hide file tree
Showing 16 changed files with 414 additions and 31 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 10 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,16 @@ Then, run the benchmarking app with the desired options:
cargo r --bin iggy-bench -r -- -c -v send-and-poll tcp
```

4. Polling with consumer group

```bash
cargo r --bin iggy-bench -r -- -c -v send --streams 1 --partitions 10 --disable-parallel-producers tcp
```

```bash
cargo r --bin iggy-bench -r -- -c -v consumer-group-poll tcp
```

These benchmarks would start the server with the default configuration, create a stream, topic and partition, and then send or poll the messages. The default configuration is optimized for the best performance, so you might want to tweak it for your needs. If you need more options, please refer to `iggy-bench` subcommands `help` and `examples`.
For example, to run the benchmark for the already started server, provide the additional argument `--server-address 0.0.0.0:8090`.

Expand Down
2 changes: 1 addition & 1 deletion bench/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "bench"
version = "0.1.0"
version = "0.1.1"
edition = "2021"

[dependencies]
Expand Down
8 changes: 8 additions & 0 deletions bench/src/args/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,10 @@ impl IggyBenchArgs {
self.benchmark_kind.inner().number_of_streams()
}

pub fn number_of_partitions(&self) -> u32 {
self.benchmark_kind.inner().number_of_partitions()
}

pub fn consumers(&self) -> u32 {
self.benchmark_kind.inner().consumers()
}
Expand All @@ -122,6 +126,10 @@ impl IggyBenchArgs {
.disable_parallel_consumer_streams()
}

pub fn number_of_consumer_groups(&self) -> u32 {
self.benchmark_kind.inner().number_of_consumer_groups()
}

pub fn warmup_time(&self) -> IggyDuration {
self.warmup_time
}
Expand Down
4 changes: 4 additions & 0 deletions bench/src/args/defaults.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,11 @@ pub const DEFAULT_MESSAGE_BATCHES: NonZeroU32 = u32!(1000);
pub const DEFAULT_MESSAGE_SIZE: NonZeroU32 = u32!(1000);

pub const DEFAULT_NUMBER_OF_STREAMS: NonZeroU32 = u32!(10);
pub const DEFAULT_NUMBER_OF_STREAMS_CONSUMER_GROUP: NonZeroU32 = u32!(1);
pub const DEFAULT_NUMBER_OF_PARTITIONS: NonZeroU32 = u32!(1);

pub const DEFAULT_NUMBER_OF_CONSUMERS: NonZeroU32 = u32!(10);
pub const DEFAULT_NUMBER_OF_CONSUMER_GROUPS: NonZeroU32 = u32!(1);
pub const DEFAULT_NUMBER_OF_PRODUCERS: NonZeroU32 = u32!(10);

pub const DEFAULT_PERFORM_CLEANUP: bool = false;
Expand Down
162 changes: 154 additions & 8 deletions bench/src/args/kind.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,15 @@ use super::props::BenchmarkKindProps;
use super::transport::BenchmarkTransportCommand;
use super::{common::IggyBenchArgs, simple::BenchmarkKind};
use clap::{error::ErrorKind, CommandFactory, Parser, Subcommand};
use core::panic;
use std::num::NonZeroU32;

#[derive(Subcommand, Debug)]
pub enum BenchmarkKindCommand {
Send(SendArgs),
Poll(PollArgs),
SendAndPoll(SendAndPollArgs),
ConsumerGroupPoll(ConsumerGroupArgs),

/// Prints examples
Examples,
Expand All @@ -22,6 +24,7 @@ impl BenchmarkKindCommand {
BenchmarkKindCommand::Send(_) => BenchmarkKind::Send,
BenchmarkKindCommand::Poll(_) => BenchmarkKind::Poll,
BenchmarkKindCommand::SendAndPoll(_) => BenchmarkKind::SendAndPoll,
BenchmarkKindCommand::ConsumerGroupPoll(_) => BenchmarkKind::ConsumerGroupPoll,
BenchmarkKindCommand::Examples => {
print_examples();
std::process::exit(0);
Expand All @@ -47,6 +50,10 @@ impl BenchmarkKindProps for BenchmarkKindCommand {
self.inner().number_of_streams()
}

fn number_of_partitions(&self) -> u32 {
self.inner().number_of_partitions()
}

fn consumers(&self) -> u32 {
self.inner().consumers()
}
Expand All @@ -67,11 +74,16 @@ impl BenchmarkKindProps for BenchmarkKindCommand {
self.inner().transport_command()
}

fn number_of_consumer_groups(&self) -> u32 {
self.inner().number_of_consumer_groups()
}

fn inner(&self) -> &dyn BenchmarkKindProps {
match self {
BenchmarkKindCommand::Send(args) => args,
BenchmarkKindCommand::Poll(args) => args,
BenchmarkKindCommand::SendAndPoll(args) => args,
BenchmarkKindCommand::ConsumerGroupPoll(args) => args,
BenchmarkKindCommand::Examples => {
print_examples();
std::process::exit(0);
Expand Down Expand Up @@ -110,6 +122,10 @@ pub struct SendArgs {
#[arg(long, default_value_t = DEFAULT_NUMBER_OF_STREAMS)]
pub streams: NonZeroU32,

/// Number of partitions
#[arg(long, default_value_t = DEFAULT_NUMBER_OF_PARTITIONS)]
pub partitions: NonZeroU32,

/// Flag, disables parallel producers
#[arg(long, default_value_t = DEFAULT_DISABLE_PARALLEL_PRODUCER_STREAMS)]
pub disable_parallel_producers: bool,
Expand All @@ -132,6 +148,10 @@ impl BenchmarkKindProps for SendArgs {
self.streams.get()
}

fn number_of_partitions(&self) -> u32 {
self.partitions.get()
}

fn consumers(&self) -> u32 {
panic!("")
}
Expand All @@ -152,21 +172,123 @@ impl BenchmarkKindProps for SendArgs {
&self.transport
}

fn number_of_consumer_groups(&self) -> u32 {
panic!("No consumer groups in send benchmark");
}

fn validate(&self) {
let streams = self.streams.get();
let producers = self.producers.get();
let mut cmd = IggyBenchArgs::command();

if self.disable_parallel_producers && streams < producers {
if !self.disable_parallel_producers && streams < producers {
cmd.error(
ErrorKind::ArgumentConflict,
format!("With parallel producers flag, the number of streams ({streams}) must be greater than or equal to the number of producers ({producers}).",
format!("Without parallel producers flag, the number of streams ({streams}) must be greater than or equal to the number of producers ({producers}).",
))
.exit();
}
}
}

/// Sending and Polling benchmark with consumer group
#[derive(Parser, Debug)]
pub struct ConsumerGroupArgs {
#[command(subcommand)]
pub transport: BenchmarkTransportCommand,

/// Number of messages per batch
#[arg(long, default_value_t = DEFAULT_MESSAGES_PER_BATCH)]
pub messages_per_batch: NonZeroU32,

/// Number of message batches
#[arg(long, default_value_t = DEFAULT_MESSAGE_BATCHES)]
pub message_batches: NonZeroU32,

/// Message size in bytes
#[arg(long, default_value_t = DEFAULT_MESSAGE_SIZE)]
pub message_size: NonZeroU32,

/// Number of streams
#[arg(long, default_value_t = DEFAULT_NUMBER_OF_STREAMS_CONSUMER_GROUP)]
pub streams: NonZeroU32,

/// Number of consumers
#[arg(long, default_value_t = DEFAULT_NUMBER_OF_CONSUMERS)]
pub consumers: NonZeroU32,

/// Number of consumers
#[arg(long, default_value_t = DEFAULT_NUMBER_OF_CONSUMER_GROUPS)]
pub consumer_groups: NonZeroU32,
}

impl BenchmarkKindProps for ConsumerGroupArgs {
fn message_size(&self) -> u32 {
self.message_size.get()
}

fn messages_per_batch(&self) -> u32 {
self.messages_per_batch.get()
}

fn message_batches(&self) -> u32 {
self.message_batches.get()
}

fn number_of_streams(&self) -> u32 {
self.streams.get()
}

fn number_of_partitions(&self) -> u32 {
panic!("No partitions in consumer group benchmark");
}

fn consumers(&self) -> u32 {
self.consumers.get()
}

fn producers(&self) -> u32 {
panic!("No producers in consumer group");
}

fn disable_parallel_producer_streams(&self) -> bool {
panic!("No parallel producer for consumer group");
}

fn disable_parallel_consumer_streams(&self) -> bool {
panic!("No parallel consumer for consumer group");
}

fn transport_command(&self) -> &BenchmarkTransportCommand {
&self.transport
}

fn number_of_consumer_groups(&self) -> u32 {
self.consumer_groups.get()
}

fn validate(&self) {
let cg_number = self.consumer_groups.get();
let consumers_number = self.consumers.get();
let mut cmd = IggyBenchArgs::command();
if cg_number < 1 {
cmd.error(
ErrorKind::ArgumentConflict,
"Consumer groups number must be greater than 0 for a consumer groups benchmark.",
)
.exit();
}

if consumers_number < 1 {
cmd.error(
ErrorKind::ArgumentConflict,
"Consumers number must be greater than 0 for a consumer groups benchmark.",
)
.exit();
}
}
}

/// Polling (reading) benchmark
#[derive(Parser, Debug)]
pub struct PollArgs {
Expand All @@ -193,6 +315,10 @@ pub struct PollArgs {
#[arg(long, default_value_t = DEFAULT_NUMBER_OF_STREAMS)]
pub streams: NonZeroU32,

/// Number of streams
#[arg(long, default_value_t = DEFAULT_NUMBER_OF_PARTITIONS)]
pub partitions: NonZeroU32,

/// Flag, disables parallel consumers
#[arg(long, default_value_t = DEFAULT_DISABLE_PARALLEL_CONSUMER_STREAMS)]
pub disable_parallel_consumers: bool,
Expand All @@ -215,6 +341,10 @@ impl BenchmarkKindProps for PollArgs {
self.streams.get()
}

fn number_of_partitions(&self) -> u32 {
self.partitions.get()
}

fn consumers(&self) -> u32 {
self.consumers.get()
}
Expand All @@ -235,15 +365,19 @@ impl BenchmarkKindProps for PollArgs {
&self.transport
}

fn number_of_consumer_groups(&self) -> u32 {
panic!("No consumer groups in poll benchmark");
}

fn validate(&self) {
let streams = self.streams.get();
let consumers = self.consumers.get();
let mut cmd = IggyBenchArgs::command();

if self.disable_parallel_consumers && streams < consumers {
if !self.disable_parallel_consumers && streams < consumers {
cmd.error(
ErrorKind::ArgumentConflict,
format!("With parallel consumers flag, the number of streams ({streams}) must be greater than or equal to the number of consumers ({consumers})."),
format!("Without parallel consumers flag, the number of streams ({streams}) must be greater than or equal to the number of consumers ({consumers})."),
)
.exit();
}
Expand Down Expand Up @@ -280,6 +414,10 @@ pub struct SendAndPollArgs {
#[arg(long, default_value_t = DEFAULT_NUMBER_OF_STREAMS)]
pub streams: NonZeroU32,

/// Number of partitions
#[arg(long, default_value_t = DEFAULT_NUMBER_OF_PARTITIONS)]
pub partitions: NonZeroU32,

/// Flag, disables parallel producers
#[arg(long, default_value_t = DEFAULT_DISABLE_PARALLEL_PRODUCER_STREAMS)]
pub disable_parallel_producers: bool,
Expand All @@ -298,6 +436,10 @@ impl BenchmarkKindProps for SendAndPollArgs {
self.streams.get()
}

fn number_of_partitions(&self) -> u32 {
self.partitions.get()
}

fn message_batches(&self) -> u32 {
self.message_batches.get()
}
Expand Down Expand Up @@ -326,24 +468,28 @@ impl BenchmarkKindProps for SendAndPollArgs {
&self.transport
}

fn number_of_consumer_groups(&self) -> u32 {
panic!("No consumer groups in send and poll benchmark");
}

fn validate(&self) {
let streams = self.streams.get();
let consumers = self.consumers.get();
let producers = self.producers.get();
let mut cmd = IggyBenchArgs::command();

if self.disable_parallel_consumers && streams < consumers {
if !self.disable_parallel_consumers && streams < consumers {
cmd.error(
ErrorKind::ArgumentConflict,
format!("With parallel consumers flag, the number of streams ({streams}) must be greater than or equal to the number of consumers ({consumers})."),
format!("Without parallel consumers flag, the number of streams ({streams}) must be greater than or equal to the number of consumers ({consumers})."),
)
.exit();
}

if self.disable_parallel_producers && streams < producers {
if !self.disable_parallel_producers && streams < producers {
cmd.error(
ErrorKind::ArgumentConflict,
format!("With parallel producers flag, the number of streams ({streams}) must be greater than or equal to the number of producers ({producers}).",
format!("Without parallel producers flag, the number of streams ({streams}) must be greater than or equal to the number of producers ({producers}).",
))
.exit();
}
Expand Down
2 changes: 2 additions & 0 deletions bench/src/args/props.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ pub trait BenchmarkKindProps {
fn messages_per_batch(&self) -> u32;
fn message_batches(&self) -> u32;
fn number_of_streams(&self) -> u32;
fn number_of_partitions(&self) -> u32;
fn number_of_consumer_groups(&self) -> u32;
fn consumers(&self) -> u32;
fn producers(&self) -> u32;
fn disable_parallel_producer_streams(&self) -> bool;
Expand Down
2 changes: 2 additions & 0 deletions bench/src/args/simple.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,6 @@ pub enum BenchmarkKind {
Poll,
#[display(fmt = "send and poll messages")]
SendAndPoll,
#[display(fmt = "consumer group poll")]
ConsumerGroupPoll,
}
Loading

0 comments on commit 52ac78c

Please sign in to comment.