From 1827add65c72959bcdc2036a000d679a7d7e9177 Mon Sep 17 00:00:00 2001
From: Travis Bischel
Date: Sat, 20 Jul 2024 17:59:19 -0600
Subject: [PATCH] kgo sink: fix read/write race for recBatch.canFailFromLoadErrs

When writing a record batch during a request, the batch mutex is locked.
This guards against a concurrent failAllRecords, which can be triggered
from a metadata update. However, a boolean field that guarded against
failing buffered records when it was not "safe" to do so was not itself
mutex-guarded. Writing a request only locks the batch, not the owning
recBuf, while checking whether the batch could fail only locked the
owning recBuf, not the batch.

This adds locking around the batch when checking if it can be failed,
and adds a bool that, if true (due to load failures), ensures the batch
is not written.

Closes #785.
---
 pkg/kgo/sink.go | 15 ++++++++++++++-
 1 file changed, 14 insertions(+), 1 deletion(-)

diff --git a/pkg/kgo/sink.go b/pkg/kgo/sink.go
index 49bff3b1..a40c6b7e 100644
--- a/pkg/kgo/sink.go
+++ b/pkg/kgo/sink.go
@@ -714,6 +714,8 @@ func (s *sink) handleReqRespBatch(
 
 	// Since we have received a response and we are the first batch, we can
 	// at this point re-enable failing from load errors.
+	//
+	// We do not need a lock since the owner is locked.
 	batch.canFailFromLoadErrs = true
 
 	// By default, we assume we errored. Non-error updates this back
@@ -1294,6 +1296,10 @@ func (recBuf *recBuf) bumpRepeatedLoadErr(err error) {
 
 	batch0 := recBuf.batches[0]
 	batch0.tries++
+	// We need to lock the batch as well because there could be a buffered
+	// request about to be written. Writing requests only grabs the batch
+	// mu, not the recBuf mu.
+	batch0.mu.Lock()
 	var (
 		canFail    = !recBuf.cl.idempotent() || batch0.canFailFromLoadErrs // we can only fail if we are not idempotent or if we have no outstanding requests
 		batch0Fail = batch0.maybeFailErr(&recBuf.cl.cfg) != nil            // timeout, retries, or aborting
@@ -1303,6 +1309,8 @@ func (recBuf *recBuf) bumpRepeatedLoadErr(err error) {
 
 		willFail = canFail && (batch0Fail || !netErr && (!retryableKerr || retryableKerr && isUnknownLimit))
 	)
+	batch0.isFailingFromLoadErr = willFail
+	batch0.mu.Unlock()
 	recBuf.cl.cfg.logger.Log(LogLevelWarn, "produce partition load error, bumping error count on first stored batch",
 		"broker", logID(recBuf.sink.nodeID),
@@ -1316,6 +1324,7 @@ func (recBuf *recBuf) bumpRepeatedLoadErr(err error) {
 		"is_unknown_limit", isUnknownLimit,
 		"will_fail", willFail,
 	)
+
 	if willFail {
 		recBuf.failAllRecords(err)
 	}
@@ -1406,6 +1415,10 @@ type recBatch struct {
 	// request with this batch, and then reset it to true whenever we
 	// process a response.
 	canFailFromLoadErrs bool
+	// If we are going to fail the batch in bumpRepeatedLoadErr, we need to
+	// set this bool to true. There could be a concurrent request about to
+	// be written. See more comments below where this is used.
+	isFailingFromLoadErr bool
 
 	wireLength   int32 // tracks total size this batch would currently encode as, including length prefix
 	v1wireLength int32 // same as wireLength, but for message set v1
@@ -1958,7 +1971,7 @@ func (p *produceRequest) AppendTo(dst []byte) []byte {
 		for partition, batch := range partitions {
 			dst = kbin.AppendInt32(dst, partition)
 			batch.mu.Lock()
-			if batch.records == nil { // concurrent failAllRecords
+			if batch.records == nil || batch.isFailingFromLoadErr { // concurrent failAllRecords OR concurrent bumpRepeatedLoadErr
 				if flexible {
 					dst = kbin.AppendCompactNullableBytes(dst, nil)
 				} else {
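
Editor's note: below is a minimal standalone sketch of the locking pattern
this patch adopts, not the actual kgo implementation; the batch type and
its fields are simplified stand-ins for recBatch. The failing path records
its decision in a flag while holding the batch mutex, and the writing path
checks that flag under the same mutex before encoding, so whichever side
takes the lock second observes the other's decision.

	package main

	import (
		"fmt"
		"sync"
	)

	// batch is a simplified stand-in for kgo's recBatch.
	type batch struct {
		mu                   sync.Mutex
		records              []string
		canFailFromLoadErrs  bool
		isFailingFromLoadErr bool
	}

	// bumpLoadErr mirrors the bumpRepeatedLoadErr side of the fix: decide
	// whether the batch will fail while holding the batch mutex, and record
	// that decision so a concurrent writer sees it.
	func (b *batch) bumpLoadErr() bool {
		b.mu.Lock()
		willFail := b.canFailFromLoadErrs
		b.isFailingFromLoadErr = willFail
		b.mu.Unlock()
		return willFail
	}

	// write mirrors the AppendTo side of the fix: skip encoding the batch if
	// it was concurrently failed or is about to be failed from a load error.
	func (b *batch) write() {
		b.mu.Lock()
		defer b.mu.Unlock()
		if b.records == nil || b.isFailingFromLoadErr {
			fmt.Println("batch skipped: failed or failing concurrently")
			return
		}
		fmt.Println("batch written:", b.records)
	}

	func main() {
		b := &batch{records: []string{"r1", "r2"}, canFailFromLoadErrs: true}
		var wg sync.WaitGroup
		wg.Add(2)
		go func() { defer wg.Done(); b.bumpLoadErr() }() // metadata-driven load error path
		go func() { defer wg.Done(); b.write() }()       // request-writing path
		wg.Wait()
	}

In the real code the interaction is richer (canFailFromLoadErrs is toggled
around request writes and responses), but the sketch shows the core point:
the pre-fix race existed precisely because the decision to fail and the
decision to write consulted shared state under two different locks.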