diff --git a/pkg/kgo/sink.go b/pkg/kgo/sink.go
index 65f217e1..43eeeb88 100644
--- a/pkg/kgo/sink.go
+++ b/pkg/kgo/sink.go
@@ -101,7 +101,7 @@ func (s *sink) createReq(id int64, epoch int16) (*produceRequest, *kmsg.AddParti
 		recBufsIdx = (recBufsIdx + 1) % len(s.recBufs)
 
 		recBuf.mu.Lock()
-		if recBuf.failing || len(recBuf.batches) == recBuf.batchDrainIdx {
+		if recBuf.failing || len(recBuf.batches) == recBuf.batchDrainIdx || recBuf.inflightOnSink != nil && recBuf.inflightOnSink != s {
 			recBuf.mu.Unlock()
 			continue
 		}
@@ -113,6 +113,9 @@ func (s *sink) createReq(id int64, epoch int16) (*produceRequest, *kmsg.AddParti
 			continue
 		}
 
+		recBuf.inflightOnSink = s
+		recBuf.inflight++
+
 		recBuf.batchDrainIdx++
 		recBuf.seq += int32(len(batch.records))
 		moreToDrain = moreToDrain || recBuf.tryStopLingerForDraining()
@@ -394,7 +397,21 @@ more:
 func (s *sink) doTxnReq(
 	req *produceRequest,
 	txnReq *kmsg.AddPartitionsToTxnRequest,
-) error {
+) (err error) {
+	// If we return an unretriable error, then we have to reset everything
+	// to not be in the transaction and begin draining at the start.
+	//
+	// These batches must be the first in their recBuf, because we would
+	// not be trying to add them to a partition if they were not.
+	defer func() {
+		if err != nil {
+			req.batches.eachOwnerLocked(func(batch seqRecBatch) {
+				batch.owner.addedToTxn = false
+				batch.owner.resetBatchDrainIdx()
+				batch.decInflight()
+			})
+		}
+	}()
 	return s.cl.doWithConcurrentTransactions("AddPartitionsToTxn", func() error {
 		return s.issueTxnReq(req, txnReq)
 	})
@@ -430,16 +447,12 @@ func (s *sink) issueTxnReq(
 				continue
 			}
 
-			// If we did not add this partition to the txn, then
-			// this must be the first batch in the recBuf, because
-			// this is the first time seeing it, which is why we
-			// are trying to add it to the txn.
-			//
-			// Thus, we simply set that this is **not** added, and
-			// we reset the drain index to re-try.
+			// We are stripping this retriable-err batch from the request,
+			// so we must reset that it has been added to the txn.
 			batch.owner.mu.Lock()
 			batch.owner.addedToTxn = false
 			batch.owner.resetBatchDrainIdx()
+			batch.decInflight()
 			batch.owner.mu.Unlock()
 
 			delete(topicBatches, partition.Partition)
@@ -569,6 +582,7 @@ func (s *sink) handleReqResp(br *broker, req *produceRequest, resp kmsg.Response
 			} else if debug {
 				fmt.Fprintf(b, "%d{skipped}, ", partition)
 			}
+			batch.decInflight()
 			batch.owner.mu.Unlock()
 		}
 		if debug {
@@ -659,6 +673,7 @@ func (s *sink) handleReqRespBatch(
 ) (retry, didProduce bool) {
 	batch.owner.mu.Lock()
 	defer batch.owner.mu.Unlock()
+	defer batch.decInflight()
 
 	nrec := len(batch.records)
 
@@ -970,6 +985,19 @@ type recBuf struct {
 	// This exists to aid in removing the buffer from the sink.
 	recBufsIdx int
 
+	// A concurrent metadata update can move a recBuf from one sink to
+	// another while requests are inflight on the original sink. We do not
+	// want to allow new requests to start on the new sink until they all
+	// finish on the old, because with some pathological request order
+	// finishing, we would allow requests to finish out of order:
+	// handleSeqResps works per sink, not across sinks.
+	inflightOnSink *sink
+	// Inflight tracks the number of requests inflight using batches from
+	// this recBuf. Every time this hits zero, if the batchDrainIdx is not
+	// at the end, we clear inflightOnSink and trigger the *current* sink
+	// to drain.
+	inflight uint8
+
 	topicPartitionData // updated in metadata migrateProductionTo (same spot sink is updated)
 
 	// seq is used for the seq in each record batch. It is incremented when
@@ -1136,7 +1164,7 @@ func (recBuf *recBuf) bumpRepeatedLoadErr(err error) {
 	batch0 := recBuf.batches[0]
 	batch0.tries++
 	failErr := batch0.maybeFailErr(&recBuf.cl.cfg)
-	if (!recBuf.cl.idempotent() || batch0.canFailFromLoadErrs) && (!kerr.IsRetriable(err) || failErr != nil) {
+	if (!recBuf.cl.idempotent() || batch0.canFailFromLoadErrs) && (failErr != nil || !isRetriableBrokerErr(err) && !kerr.IsRetriable(err)) {
 		recBuf.failAllRecords(err)
 	}
 }
@@ -1329,6 +1357,30 @@ func (b *recBatch) isTimedOut(limit time.Duration) bool {
 	return time.Since(b.records[0].Timestamp) > limit
 }
 
+// Decrements the inflight count for this batch.
+//
+// If the inflight count hits zero, this potentially re-triggers a drain on the
+// *current* sink. A concurrent metadata update could have moved the recBuf to
+// a different sink; that sink will not drain this recBuf until all requests on
+// the old sink are finished.
+//
+// This is always called in the produce request path, not anywhere else (i.e.
+// not failAllRecords). We want inflight decrementing to always be the last
+// thing that happens for every request. It does not matter if the records were
+// independently failed: from the request issuing perspective, the batch is
+// still inflight.
+func (b *recBatch) decInflight() {
+	recBuf := b.owner
+	recBuf.inflight--
+	if recBuf.inflight != 0 {
+		return
+	}
+	recBuf.inflightOnSink = nil
+	if recBuf.batchDrainIdx != len(recBuf.batches) {
+		recBuf.sink.maybeDrain()
+	}
+}
+
 ////////////////////
 // produceRequest //
 ////////////////////
@@ -1453,30 +1505,47 @@ func (rbs *seqRecBatches) addSeqBatch(topic string, part int32, batch seqRecBatc
 	topicBatches[part] = batch
 }
 
-// Resets the drain index for any batch that is the first in its record buffer.
+// Resets the drain index for any batch that is the first in its record buffer,
+// and always decrements the inflight count for every batch.
 //
 // If idempotency is disabled, if a batch is timed out or hit the retry limit,
 // we fail it and anything after it.
 func (rbs seqRecBatches) tryResetFailingBatchesWith(cfg *cfg, canFail bool, fn func(seqRecBatch)) {
+	rbs.eachOwnerLocked(func(batch seqRecBatch) {
+		defer batch.decInflight()
+
+		if !batch.isOwnersFirstBatch() {
+			return
+		}
+
+		if canFail || cfg.disableIdempotency {
+			if err := batch.maybeFailErr(cfg); err != nil {
+				batch.owner.failAllRecords(err)
+				return
+			}
+		}
+
+		batch.owner.resetBatchDrainIdx()
+		fn(batch)
+	})
+}
+
+func (rbs seqRecBatches) each(fn func(seqRecBatch)) {
 	for _, partitions := range rbs {
 		for _, batch := range partitions {
-			batch.owner.mu.Lock()
-			if batch.isOwnersFirstBatch() {
-				if canFail || cfg.disableIdempotency {
-					if err := batch.maybeFailErr(cfg); err != nil {
-						batch.owner.failAllRecords(err)
-						batch.owner.mu.Unlock()
-						continue
-					}
-				}
-				batch.owner.resetBatchDrainIdx()
-				fn(batch)
-			}
-			batch.owner.mu.Unlock()
+			fn(batch)
 		}
 	}
 }
 
+func (rbs seqRecBatches) eachOwnerLocked(fn func(seqRecBatch)) {
+	rbs.each(func(batch seqRecBatch) {
+		batch.owner.mu.Lock()
+		defer batch.owner.mu.Unlock()
+		fn(batch)
+	})
+}
+
 //////////////
 // COUNTING // - this section is all about counting how bytes lay out on the wire
 //////////////
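For intuition, the inflightOnSink/inflight pair this patch threads through the sink can be reduced to a small standalone guard. Below is a minimal sketch of that pattern, not franz-go's API: guardedBuf, tryReserve, and release are hypothetical names, the sketch locks internally where the real code relies on the caller holding recBuf.mu, and it omits the batchDrainIdx check that the real decInflight performs before re-triggering a drain.

package main

import (
	"fmt"
	"sync"
)

type sink struct{ name string }

// maybeDrain stands in for the real sink's drain trigger.
func (s *sink) maybeDrain() { fmt.Printf("sink %s: drain triggered\n", s.name) }

// guardedBuf is a toy recBuf: it tracks which sink owns its inflight
// requests and refuses to start requests on any other sink.
type guardedBuf struct {
	mu             sync.Mutex
	sink           *sink // current sink; a metadata update may swap this
	inflightOnSink *sink // sink whose requests are still inflight, if any
	inflight       int
}

// tryReserve mirrors the createReq check: a sink may only start a new
// request if no *other* sink still has requests inflight on this buf.
func (b *guardedBuf) tryReserve(s *sink) bool {
	b.mu.Lock()
	defer b.mu.Unlock()
	if b.inflightOnSink != nil && b.inflightOnSink != s {
		return false
	}
	b.inflightOnSink = s
	b.inflight++
	return true
}

// release mirrors decInflight: when the last inflight request finishes,
// clear the pin and re-trigger the *current* sink, which may have changed
// while the request was in flight.
func (b *guardedBuf) release() {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.inflight--
	if b.inflight != 0 {
		return
	}
	b.inflightOnSink = nil
	b.sink.maybeDrain()
}

func main() {
	oldSink, newSink := &sink{"old"}, &sink{"new"}
	b := &guardedBuf{sink: oldSink}

	b.tryReserve(oldSink)              // a request starts on the old sink
	b.sink = newSink                   // a metadata update migrates the buf
	fmt.Println(b.tryReserve(newSink)) // false: the old sink is still inflight
	b.release()                        // last old-sink request finishes; "new" drains
	fmt.Println(b.tryReserve(newSink)) // true: the new sink may now proceed
}

The ordering argument is the one the recBuf comment makes: response sequencing (handleSeqResps) works per sink, not across sinks, so letting the new sink issue requests while the old sink still has some inflight could let them finish out of order. The guard serializes the handoff by having the last request to finish on the old sink re-trigger the current sink's drain.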