diff --git a/pkg/kgo/config.go b/pkg/kgo/config.go index 3f83fffc..b02e246f 100644 --- a/pkg/kgo/config.go +++ b/pkg/kgo/config.go @@ -755,9 +755,8 @@ func ManualFlushing() ProducerOpt { // this option with lingering. In that case, simply add the linger to the // record timeout to avoid problems. // -// The timeout is only evaluated after a produce response, and only for batches -// that need to be retried. Thus, a sink backoff may delay record timeout -// slightly. As with lingering, this also should generally be a non-issue. +// The timeout is only evaluated before writing a request or after a +// produce response. Thus, a sink backoff may delay record timeout slightly. func RecordTimeout(timeout time.Duration) ProducerOpt { return producerOpt{func(cfg *cfg) { cfg.recordTimeout = timeout }} } diff --git a/pkg/kgo/errors.go b/pkg/kgo/errors.go index 37c7639b..13dae7c2 100644 --- a/pkg/kgo/errors.go +++ b/pkg/kgo/errors.go @@ -73,6 +73,8 @@ var ( // configured record timeout limit. errRecordTimeout = errors.New("records have timed out before they were able to be produced") + errRecordRetries = errors.New("record failed after being retried too many times") + errClientClosing = errors.New("client closing") ////////////// diff --git a/pkg/kgo/producer.go b/pkg/kgo/producer.go index f2c02247..8aa5ee51 100644 --- a/pkg/kgo/producer.go +++ b/pkg/kgo/producer.go @@ -102,8 +102,8 @@ func noPromise(*Record, error) {} // records (to avoid invalid sequence numbers), all buffered records for a // partition are aborted. The context checked for doneness is always the first // buffered record's context. If that record is successfully produced, the -// context will then be the next first buffered record. The context is only -// evaluated before writing a produce request. +// context will then be the next first buffered record. The context is +// evaluated before or after writing a request. 
// // The first buffered record for an unknown topic begins a timeout for the // configured record timeout limit; all records buffered within the wait will diff --git a/pkg/kgo/sink.go b/pkg/kgo/sink.go index 24099096..14d6ee66 100644 --- a/pkg/kgo/sink.go +++ b/pkg/kgo/sink.go @@ -3,7 +3,6 @@ package kgo import ( "bytes" "context" - "errors" "fmt" "hash/crc32" "strings" @@ -1113,8 +1112,8 @@ func (recBuf *recBuf) bumpRepeatedLoadErr(err error) { recBuf.cl.cfg.logger.Log(LogLevelWarn, "produce partition load error, unable to produce on this partition", "broker", recBuf.sink.nodeID, "topic", recBuf.topic, "partition", recBuf.partition, "err", err) batch0 := recBuf.batches[0] batch0.tries++ - if (!recBuf.cl.idempotent() || batch0.canFailFromLoadErrs) && - (batch0.isTimedOut(recBuf.cl.cfg.recordTimeout) || batch0.tries > recBuf.cl.cfg.produceRetries || !kerr.IsRetriable(err)) { + failErr := batch0.maybeFailErr(&recBuf.cl.cfg) + if (!recBuf.cl.idempotent() || batch0.canFailFromLoadErrs) && (!kerr.IsRetriable(err) || failErr != nil) { recBuf.failAllRecords(err) } } @@ -1206,6 +1205,24 @@ type recBatch struct { records []promisedNumberedRecord } +// Returns an error if the batch should fail. +func (b *recBatch) maybeFailErr(cfg *cfg) error { + if len(b.records) > 0 { + ctx := b.records[0].ctx + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + } + if b.isTimedOut(cfg.recordTimeout) { + return errRecordTimeout + } else if b.tries >= cfg.produceRetries { + return errRecordRetries + } + return nil +} + func (b *recBatch) v0wireLength() int32 { return b.v1wireLength - 8 } // no timestamp func (b *recBatch) batchLength() int32 { return b.wireLength - 4 } // no length prefix func (b *recBatch) flexibleWireLength() int32 { // uvarint length prefix @@ -1216,7 +1233,8 @@ func (b *recBatch) flexibleWireLength() int32 { // uvarint length prefix // appendRecord saves a new record to a batch. 
// // This is called under the owning recBuf's mu, meaning records cannot be -// concurrently modified by failing. +// concurrently modified by failing. This batch cannot actively be used +// in a request, so we do not need to worry about a concurrent read. func (b *recBatch) appendRecord(pr promisedRec, nums recordNumbers) { b.wireLength += nums.wireLength() b.v1wireLength += messageSet1Length(pr.Record) @@ -1329,13 +1347,10 @@ func (r *produceRequest) tryAddBatch(produceVersion int32, recBuf *recBuf, batch } if recBuf.batches[0] == batch { - if batch.canFailFromLoadErrs { - ctx := batch.records[0].ctx - select { - case <-ctx.Done(): - recBuf.failAllRecords(ctx.Err()) + if !r.idempotent || batch.canFailFromLoadErrs { + if err := batch.maybeFailErr(&batch.owner.cl.cfg); err != nil { + recBuf.failAllRecords(err) return false - default: } } if recBuf.needSeqReset { @@ -1399,13 +1414,7 @@ func (rbs seqRecBatches) tryResetFailingBatchesWith(cfg *cfg, canFail bool, fn f batch.owner.mu.Lock() if batch.isOwnersFirstBatch() { if canFail || cfg.disableIdempotency { - var err error - if batch.isTimedOut(cfg.recordTimeout) { - err = errRecordTimeout - } else if batch.tries >= cfg.produceRetries { - err = errors.New("record failed after being retried too many times") - } - if err != nil { + if err := batch.maybeFailErr(cfg); err != nil { batch.owner.failAllRecords(err) batch.owner.mu.Unlock() continue