Skip to content

Commit

Permalink
producing: evaluate whether a batch should fail before and after
Browse files Browse the repository at this point in the history
We were inconsistent on evaluating whether a batch should fail. Before
writing a request, we would check if a record ctx expired. After, we
would evaluate the record timeout and the produce retries. We may as
well evaluate all three at once both before and after for consistency.

We also now allow checking whether records should be failed before
writing a request in the non-idempotent context as well.
  • Loading branch information
twmb committed Apr 25, 2021
1 parent 8925da3 commit e324b56
Show file tree
Hide file tree
Showing 4 changed files with 32 additions and 22 deletions.
5 changes: 2 additions & 3 deletions pkg/kgo/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -755,9 +755,8 @@ func ManualFlushing() ProducerOpt {
// this option with lingering. In that case, simply add the linger to the
// record timeout to avoid problems.
//
// The timeout is only evaluated after a produce response, and only for batches
// that need to be retried. Thus, a sink backoff may delay record timeout
// slightly. As with lingering, this also should generally be a non-issue.
// The timeout is only evaluated before writing a request or after a
// produce response. Thus, a sink backoff may delay record timeout slightly.
func RecordTimeout(timeout time.Duration) ProducerOpt {
	// Capture the timeout into a cfg mutator and wrap it as a producer option.
	set := func(cfg *cfg) { cfg.recordTimeout = timeout }
	return producerOpt{set}
}
Expand Down
2 changes: 2 additions & 0 deletions pkg/kgo/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@ var (
// configured record timeout limit.
errRecordTimeout = errors.New("records have timed out before they were able to be produced")

errRecordRetries = errors.New("record failed after being retried too many times")

errClientClosing = errors.New("client closing")

//////////////
Expand Down
4 changes: 2 additions & 2 deletions pkg/kgo/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,8 @@ func noPromise(*Record, error) {}
// records (to avoid invalid sequence numbers), all buffered records for a
// partition are aborted. The context checked for doneness is always the first
// buffered record's context. If that record is successfully produced, the
// context will then be the next first buffered record. The context is only
// evaluated before writing a produce request.
// context will then be the next first buffered record. The context is
// evaluated before or after writing a request.
//
// The first buffered record for an unknown topic begins a timeout for the
// configured record timeout limit; all records buffered within the wait will
Expand Down
43 changes: 26 additions & 17 deletions pkg/kgo/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package kgo
import (
"bytes"
"context"
"errors"
"fmt"
"hash/crc32"
"strings"
Expand Down Expand Up @@ -1113,8 +1112,8 @@ func (recBuf *recBuf) bumpRepeatedLoadErr(err error) {
recBuf.cl.cfg.logger.Log(LogLevelWarn, "produce partition load error, unable to produce on this partition", "broker", recBuf.sink.nodeID, "topic", recBuf.topic, "partition", recBuf.partition, "err", err)
batch0 := recBuf.batches[0]
batch0.tries++
if (!recBuf.cl.idempotent() || batch0.canFailFromLoadErrs) &&
(batch0.isTimedOut(recBuf.cl.cfg.recordTimeout) || batch0.tries > recBuf.cl.cfg.produceRetries || !kerr.IsRetriable(err)) {
failErr := batch0.maybeFailErr(&recBuf.cl.cfg)
if (!recBuf.cl.idempotent() || batch0.canFailFromLoadErrs) && (!kerr.IsRetriable(err) || failErr != nil) {
recBuf.failAllRecords(err)
}
}
Expand Down Expand Up @@ -1206,6 +1205,24 @@ type recBatch struct {
records []promisedNumberedRecord
}

// maybeFailErr returns the reason this batch must be failed, or nil if
// the batch can still be produced or retried.
//
// Three conditions are evaluated in order: cancelation of the first
// buffered record's context, the configured record timeout elapsing,
// and the produce retry limit being exhausted.
func (b *recBatch) maybeFailErr(cfg *cfg) error {
	// Only the first buffered record's context governs the batch.
	if len(b.records) > 0 {
		firstCtx := b.records[0].ctx
		select {
		case <-firstCtx.Done():
			return firstCtx.Err()
		default:
		}
	}
	switch {
	case b.isTimedOut(cfg.recordTimeout):
		return errRecordTimeout
	case b.tries >= cfg.produceRetries:
		return errRecordRetries
	}
	return nil
}

// v0wireLength returns the batch's size when encoded as a v0 message
// set, which is the v1 length minus the 8 byte timestamp v0 lacks.
func (b *recBatch) v0wireLength() int32 { return b.v1wireLength - 8 } // no timestamp

// batchLength returns the batch's encoded size without the 4 byte
// length prefix that wireLength includes.
func (b *recBatch) batchLength() int32 { return b.wireLength - 4 } // no length prefix
func (b *recBatch) flexibleWireLength() int32 { // uvarint length prefix
Expand All @@ -1216,7 +1233,8 @@ func (b *recBatch) flexibleWireLength() int32 { // uvarint length prefix
// appendRecord saves a new record to a batch.
//
// This is called under the owning recBuf's mu, meaning records cannot be
// concurrently modified by failing.
// concurrently modified by failing. This batch cannot actively be used
// in a request, so we do not need to worry about a concurrent read.
func (b *recBatch) appendRecord(pr promisedRec, nums recordNumbers) {
b.wireLength += nums.wireLength()
b.v1wireLength += messageSet1Length(pr.Record)
Expand Down Expand Up @@ -1329,13 +1347,10 @@ func (r *produceRequest) tryAddBatch(produceVersion int32, recBuf *recBuf, batch
}

if recBuf.batches[0] == batch {
if batch.canFailFromLoadErrs {
ctx := batch.records[0].ctx
select {
case <-ctx.Done():
recBuf.failAllRecords(ctx.Err())
if !r.idempotent || batch.canFailFromLoadErrs {
if err := batch.maybeFailErr(&batch.owner.cl.cfg); err != nil {
recBuf.failAllRecords(err)
return false
default:
}
}
if recBuf.needSeqReset {
Expand Down Expand Up @@ -1399,13 +1414,7 @@ func (rbs seqRecBatches) tryResetFailingBatchesWith(cfg *cfg, canFail bool, fn f
batch.owner.mu.Lock()
if batch.isOwnersFirstBatch() {
if canFail || cfg.disableIdempotency {
var err error
if batch.isTimedOut(cfg.recordTimeout) {
err = errRecordTimeout
} else if batch.tries >= cfg.produceRetries {
err = errors.New("record failed after being retried too many times")
}
if err != nil {
if err := batch.maybeFailErr(cfg); err != nil {
batch.owner.failAllRecords(err)
batch.owner.mu.Unlock()
continue
Expand Down

0 comments on commit e324b56

Please sign in to comment.