From 91fe77b6dce379808c198564e638d639fa048184 Mon Sep 17 00:00:00 2001 From: Travis Bischel Date: Tue, 29 Jun 2021 14:25:23 -0600 Subject: [PATCH] bench: faster record value generation, add -static-record flag Rather than copying 20 bytes each time, we can basically do an exponentially faster copy. Additionally, this adds a new flag that allows for eliminating any benmark record-value-creation overhead. This is similar to the default behavior of librdkafka, which uses a static message for all messages produced. --- examples/bench/README.md | 19 +++++++++++++++++ examples/bench/main.go | 45 ++++++++++++++++++++++++++-------------- 2 files changed, 48 insertions(+), 16 deletions(-) diff --git a/examples/bench/README.md b/examples/bench/README.md index c26141ad..6365cea1 100644 --- a/examples/bench/README.md +++ b/examples/bench/README.md @@ -27,6 +27,20 @@ a group. All flags that _can_ be easily supported in comparisons are the same. +## `rdkafka_performance` + +This client has a few different default flags in comparison to librdkafka's +`rdkafka_performance` utility. For starters, this prints less stats, but stats +can be added if requested. + +To operate similarly to librdkafka's benchmarker, use `-linger 1s -compression none -static-record`. + +This will match `rdkafka_performance` flags of `./rdkafka_performance -P -t -b -s 100 -i 1000`. + +librdkafka itself does not handle disabling lingering too well, but there is +negligible performance impact in franz-go, so feel free to leave lingering +disabled. + ## Flags `-brokers` can be specified to override the default localhost:9092 broker to @@ -51,6 +65,11 @@ any comma delimited set of brokers. `-pool` enables using a `sync.Pool` to reuse records and value slices, reducing garbage as a factor of the benchmark. +`-static-record` configures the benchmarking to use a single, static value for +all messages produced. This implies `-pool`, and completely eliminates any +record-value-creation overhead from benchmarking so that you can specifically +bench the performance of the client itself. + `-disable-idempotency` disables producing idempotently, which limits the throughput to 1rps `-linger` sets an amount of milliseconds to linger before producing, overriding the default 0. diff --git a/examples/bench/main.go b/examples/bench/main.go index dbd4af91..0332989e 100644 --- a/examples/bench/main.go +++ b/examples/bench/main.go @@ -25,6 +25,8 @@ var ( topic = flag.String("topic", "", "topic to produce to or consume from") pprofPort = flag.String("pprof", ":9876", "port to bind to for pprof, if non-empty") + useStaticValue = flag.Bool("static-record", false, "if true, use the same record value for every record (eliminates creating and formatting values for records; implies -pool)") + recordBytes = flag.Int("record-bytes", 100, "bytes per record (producing)") noCompression = flag.Bool("no-compression", false, "set to disable compression (alias for -compression none, producing)") compression = flag.String("compression", "snappy", "compression algorithm to use (none,gzip,snappy,lz4,zstd, for producing)") @@ -74,6 +76,11 @@ func main() { die("record bytes must be larger than zero") } + if *useStaticValue { + staticValue = make([]byte, *recordBytes) + formatValue(0, staticValue) + } + opts := []kgo.Opt{ kgo.SeedBrokers(strings.Split(*seedBrokers, ",")...), kgo.DefaultProduceTopic(*topic), @@ -184,7 +191,9 @@ func main() { var num int64 for { cl.Produce(context.Background(), newRecord(num), func(r *kgo.Record, err error) { - if *poolProduce { + if *useStaticValue { + staticPool.Put(r) + } else if *poolProduce { p.Put(r) } chk(err, "produce error: %v", err) @@ -211,28 +220,32 @@ func main() { } } -var p = sync.Pool{ - New: func() interface{} { - s := make([]byte, *recordBytes) - return &kgo.Record{Value: s} - }, -} +var ( + staticValue []byte + staticPool = sync.Pool{New: func() interface{} { return kgo.SliceRecord(staticValue) }} + p = sync.Pool{New: func() interface{} { return kgo.SliceRecord(make([]byte, *recordBytes)) }} +) func newRecord(num int64) *kgo.Record { - var buf [20]byte // max int64 takes 19 bytes, then we add a space - b := strconv.AppendInt(buf[:0], num, 10) - b = append(b, ' ') - var r *kgo.Record - if *poolProduce { + if *useStaticValue { + return staticPool.Get().(*kgo.Record) + } else if *poolProduce { r = p.Get().(*kgo.Record) } else { r = kgo.SliceRecord(make([]byte, *recordBytes)) } + formatValue(num, r.Value) + return r +} + +func formatValue(num int64, v []byte) { + var buf [20]byte // max int64 takes 19 bytes, then we add a space + b := strconv.AppendInt(buf[:0], num, 10) + b = append(b, ' ') - var n int - for n != len(r.Value) { - n += copy(r.Value[n:], b) + n := copy(v, b) + for n != len(v) { + n += copy(v[n:], b) } - return r }