From df80a52c405009be07d13e7ea139194364eaa85a Mon Sep 17 00:00:00 2001 From: Travis Bischel Date: Wed, 27 Oct 2021 18:39:44 -0600 Subject: [PATCH] config: drop ProduceRequestTimeout to 10s, doc more 30s is too generous. Even 10s is likely too generous, since usually writes at p99 finish within 100ms. --- pkg/kgo/config.go | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/pkg/kgo/config.go b/pkg/kgo/config.go index fd0ad639..76a57f96 100644 --- a/pkg/kgo/config.go +++ b/pkg/kgo/config.go @@ -280,7 +280,7 @@ func (cfg *cfg) validate() error { // Some random producer settings. {name: "max buffered records", v: int64(cfg.maxBufferedRecords), allowed: 1, badcmp: i64lt}, {name: "linger", v: int64(cfg.linger), allowed: int64(time.Minute), badcmp: i64gt, durs: true}, - {name: "produce timeout", v: int64(cfg.produceTimeout), allowed: int64(time.Second), badcmp: i64lt, durs: true}, + {name: "produce timeout", v: int64(cfg.produceTimeout), allowed: int64(100 * time.Millisecond), badcmp: i64lt, durs: true}, {name: "record timeout", v: int64(cfg.recordTimeout), allowed: int64(time.Second), badcmp: func(l, r int64) (bool, string) { if l == 0 { return false, "" // we print nothing when things are good @@ -446,7 +446,7 @@ func defaultCfg() cfg { compression: []CompressionCodec{SnappyCompression(), NoCompression()}, maxRecordBatchBytes: 1000000, // Kafka max.message.bytes default is 1000012 maxBufferedRecords: 10000, - produceTimeout: 30 * time.Second, + produceTimeout: 10 * time.Second, recordRetries: math.MaxInt64, // effectively unbounded partitioner: StickyKeyPartitioner(nil), // default to how Kafka partitions @@ -874,11 +874,13 @@ func RecordPartitioner(partitioner Partitioner) ProducerOpt { } // ProduceRequestTimeout sets how long Kafka broker's are allowed to respond to -// produce requests, overriding the default 30s. If a broker exceeds this +// produce requests, overriding the default 10s. If a broker exceeds this // duration, it will reply with a request timeout error. // -// This corresponds to Kafka's request.timeout.ms setting, but only applies to -// produce requests. +// This somewhat corresponds to Kafka's request.timeout.ms setting, but only +// applies to produce requests. This settings sets the TimeoutMillis field in +// the produce request itself. The ConnTimeoutOverhead is applied as a write +// limit and read limit in addition to this. func ProduceRequestTimeout(limit time.Duration) ProducerOpt { return producerOpt{func(cfg *cfg) { cfg.produceTimeout = limit }} }