diff --git a/pkg/kgo/sink.go b/pkg/kgo/sink.go index 49bff3b1..a40c6b7e 100644 --- a/pkg/kgo/sink.go +++ b/pkg/kgo/sink.go @@ -714,6 +714,8 @@ func (s *sink) handleReqRespBatch( // Since we have received a response and we are the first batch, we can // at this point re-enable failing from load errors. + // + // We do not need a lock since the owner is locked. batch.canFailFromLoadErrs = true // By default, we assume we errored. Non-error updates this back @@ -1294,6 +1296,10 @@ func (recBuf *recBuf) bumpRepeatedLoadErr(err error) { batch0 := recBuf.batches[0] batch0.tries++ + // We need to lock the batch as well because there could be a buffered + // request about to be written. Writing requests only grabs the batch + // mu, not the recBuf mu. + batch0.mu.Lock() var ( canFail = !recBuf.cl.idempotent() || batch0.canFailFromLoadErrs // we can only fail if we are not idempotent or if we have no outstanding requests batch0Fail = batch0.maybeFailErr(&recBuf.cl.cfg) != nil // timeout, retries, or aborting @@ -1303,6 +1309,8 @@ func (recBuf *recBuf) bumpRepeatedLoadErr(err error) { willFail = canFail && (batch0Fail || !netErr && (!retryableKerr || retryableKerr && isUnknownLimit)) ) + batch0.isFailingFromLoadErr = willFail + batch0.mu.Unlock() recBuf.cl.cfg.logger.Log(LogLevelWarn, "produce partition load error, bumping error count on first stored batch", "broker", logID(recBuf.sink.nodeID), @@ -1316,6 +1324,7 @@ func (recBuf *recBuf) bumpRepeatedLoadErr(err error) { "is_unknown_limit", isUnknownLimit, "will_fail", willFail, ) + if willFail { recBuf.failAllRecords(err) } @@ -1406,6 +1415,10 @@ type recBatch struct { // request with this batch, and then reset it to true whenever we // process a response. canFailFromLoadErrs bool + // If we are going to fail the batch in bumpRepeatedLoadErr, we need to + // set this bool to true. There could be a concurrent request about to + // be written. See more comments below where this is used. + isFailingFromLoadErr bool wireLength int32 // tracks total size this batch would currently encode as, including length prefix v1wireLength int32 // same as wireLength, but for message set v1 @@ -1958,7 +1971,7 @@ func (p *produceRequest) AppendTo(dst []byte) []byte { for partition, batch := range partitions { dst = kbin.AppendInt32(dst, partition) batch.mu.Lock() - if batch.records == nil { // concurrent failAllRecords + if batch.records == nil || batch.isFailingFromLoadErr { // concurrent failAllRecords OR concurrent bumpRepeatedLoadErr if flexible { dst = kbin.AppendCompactNullableBytes(dst, nil) } else {