Skip to content

Commit

Permalink
bench: faster record value generation, add -static-record flag
Browse files Browse the repository at this point in the history
Rather than copying 20 bytes each time, we can basically do an
exponentially faster copy.

Additionally, this adds a new flag that allows for eliminating any
benmark record-value-creation overhead. This is similar to the default
behavior of librdkafka, which uses a static message for all messages
produced.
  • Loading branch information
twmb committed Jun 29, 2021
1 parent f95859e commit 91fe77b
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 16 deletions.
19 changes: 19 additions & 0 deletions examples/bench/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,20 @@ a group.

All flags that _can_ be easily supported in comparisons are the same.

## `rdkafka_performance`

This client has a few different default flags in comparison to librdkafka's
`rdkafka_performance` utility. For starters, this prints less stats, but stats
can be added if requested.

To operate similarly to librdkafka's benchmarker, use `-linger 1s -compression none -static-record`.

This will match `rdkafka_performance` flags of `./rdkafka_performance -P -t <topic> -b <brokers> -s 100 -i 1000`.

librdkafka itself does not handle disabling lingering too well, but there is
negligible performance impact in franz-go, so feel free to leave lingering
disabled.

## Flags

`-brokers` can be specified to override the default localhost:9092 broker to
Expand All @@ -51,6 +65,11 @@ any comma delimited set of brokers.
`-pool` enables using a `sync.Pool` to reuse records and value slices, reducing
garbage as a factor of the benchmark.

`-static-record` configures the benchmarking to use a single, static value for
all messages produced. This implies `-pool`, and completely eliminates any
record-value-creation overhead from benchmarking so that you can specifically
bench the performance of the client itself.

`-disable-idempotency` disables producing idempotently, which limits the throughput to 1rps

`-linger` sets an amount of milliseconds to linger before producing, overriding the default 0.
Expand Down
45 changes: 29 additions & 16 deletions examples/bench/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ var (
topic = flag.String("topic", "", "topic to produce to or consume from")
pprofPort = flag.String("pprof", ":9876", "port to bind to for pprof, if non-empty")

useStaticValue = flag.Bool("static-record", false, "if true, use the same record value for every record (eliminates creating and formatting values for records; implies -pool)")

recordBytes = flag.Int("record-bytes", 100, "bytes per record (producing)")
noCompression = flag.Bool("no-compression", false, "set to disable compression (alias for -compression none, producing)")
compression = flag.String("compression", "snappy", "compression algorithm to use (none,gzip,snappy,lz4,zstd, for producing)")
Expand Down Expand Up @@ -74,6 +76,11 @@ func main() {
die("record bytes must be larger than zero")
}

if *useStaticValue {
staticValue = make([]byte, *recordBytes)
formatValue(0, staticValue)
}

opts := []kgo.Opt{
kgo.SeedBrokers(strings.Split(*seedBrokers, ",")...),
kgo.DefaultProduceTopic(*topic),
Expand Down Expand Up @@ -184,7 +191,9 @@ func main() {
var num int64
for {
cl.Produce(context.Background(), newRecord(num), func(r *kgo.Record, err error) {
if *poolProduce {
if *useStaticValue {
staticPool.Put(r)
} else if *poolProduce {
p.Put(r)
}
chk(err, "produce error: %v", err)
Expand All @@ -211,28 +220,32 @@ func main() {
}
}

var p = sync.Pool{
New: func() interface{} {
s := make([]byte, *recordBytes)
return &kgo.Record{Value: s}
},
}
var (
staticValue []byte
staticPool = sync.Pool{New: func() interface{} { return kgo.SliceRecord(staticValue) }}
p = sync.Pool{New: func() interface{} { return kgo.SliceRecord(make([]byte, *recordBytes)) }}
)

func newRecord(num int64) *kgo.Record {
var buf [20]byte // max int64 takes 19 bytes, then we add a space
b := strconv.AppendInt(buf[:0], num, 10)
b = append(b, ' ')

var r *kgo.Record
if *poolProduce {
if *useStaticValue {
return staticPool.Get().(*kgo.Record)
} else if *poolProduce {
r = p.Get().(*kgo.Record)
} else {
r = kgo.SliceRecord(make([]byte, *recordBytes))
}
formatValue(num, r.Value)
return r
}

func formatValue(num int64, v []byte) {
var buf [20]byte // max int64 takes 19 bytes, then we add a space
b := strconv.AppendInt(buf[:0], num, 10)
b = append(b, ' ')

var n int
for n != len(r.Value) {
n += copy(r.Value[n:], b)
n := copy(v, b)
for n != len(v) {
n += copy(v[n:], b)
}
return r
}

0 comments on commit 91fe77b

Please sign in to comment.