Skip to content

Commit

Permalink
sink: fix out-of-order response handling across partition rebalances
Browse files Browse the repository at this point in the history
If a recBuf moves from one sink to another while requests are inflight,
there is a pathological case that allows responses to finish out of
order. A lot of assumptions in the code exist such that all requests
will be processed in order: this is the guarantee of handleSeqResps.
However, handleSeqResps only works per sink. If a recBuf moves to a
different sink while requests are inflight, theoretically it could
create a request, issue it, receive a response, and handle that response
before the active inflight responses are received / handled on the
original sink.

In the worst case, this could lead to panics:

A: has two requests in flight
A: first request fails, reset the drain index to 0
metadata update moves partition to sink B
B: issues and finishes first request
B: issues second request. This is duplicate to A's second request.
A: second request finally fails, resets the drain index to 0
B: second request finishes successfully, decrements batch drain index

The next time the recBuf is used, the batchDrainIdx will be negative and
the client will panic.

It is difficult to imagine a scenario that has this same failure case
without multiple requests inflight.

Regardless, the code now does not start issuing requests on the new sink
until all requests on the old have finished. This moves closer to the
guarantee that all requests are handled in order: we remove the
cross-sink failure case.
  • Loading branch information
twmb committed Aug 10, 2021
1 parent c6d2824 commit f29fb7f
Showing 1 changed file with 93 additions and 24 deletions.
117 changes: 93 additions & 24 deletions pkg/kgo/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ func (s *sink) createReq(id int64, epoch int16) (*produceRequest, *kmsg.AddParti
recBufsIdx = (recBufsIdx + 1) % len(s.recBufs)

recBuf.mu.Lock()
if recBuf.failing || len(recBuf.batches) == recBuf.batchDrainIdx {
if recBuf.failing || len(recBuf.batches) == recBuf.batchDrainIdx || recBuf.inflightOnSink != nil && recBuf.inflightOnSink != s {
recBuf.mu.Unlock()
continue
}
Expand All @@ -113,6 +113,9 @@ func (s *sink) createReq(id int64, epoch int16) (*produceRequest, *kmsg.AddParti
continue
}

recBuf.inflightOnSink = s
recBuf.inflight++

recBuf.batchDrainIdx++
recBuf.seq += int32(len(batch.records))
moreToDrain = moreToDrain || recBuf.tryStopLingerForDraining()
Expand Down Expand Up @@ -394,7 +397,21 @@ more:
func (s *sink) doTxnReq(
req *produceRequest,
txnReq *kmsg.AddPartitionsToTxnRequest,
) error {
) (err error) {
// If we return an unretriable error, then we have to reset everything
// to not be in the transaction and begin draining at the start.
//
// These batches must be the first in their recBuf, because we would
// not be trying to add them to a partition if they were not.
defer func() {
if err != nil {
req.batches.eachOwnerLocked(func(batch seqRecBatch) {
batch.owner.addedToTxn = false
batch.owner.resetBatchDrainIdx()
batch.decInflight()
})
}
}()
return s.cl.doWithConcurrentTransactions("AddPartitionsToTxn", func() error {
return s.issueTxnReq(req, txnReq)
})
Expand Down Expand Up @@ -430,16 +447,12 @@ func (s *sink) issueTxnReq(
continue
}

// If we did not add this partition to the txn, then
// this must be the first batch in the recBuf, because
// this is the first time seeing it, which is why we
// are trying to add it to the txn.
//
// Thus, we simply set that this is **not** added, and
// we reset the drain index to re-try.
// We are stripping this retriable-err batch from the request,
// so we must reset that it has been added to the txn.
batch.owner.mu.Lock()
batch.owner.addedToTxn = false
batch.owner.resetBatchDrainIdx()
batch.decInflight()
batch.owner.mu.Unlock()

delete(topicBatches, partition.Partition)
Expand Down Expand Up @@ -569,6 +582,7 @@ func (s *sink) handleReqResp(br *broker, req *produceRequest, resp kmsg.Response
} else if debug {
fmt.Fprintf(b, "%d{skipped}, ", partition)
}
batch.decInflight()
batch.owner.mu.Unlock()
}
if debug {
Expand Down Expand Up @@ -659,6 +673,7 @@ func (s *sink) handleReqRespBatch(
) (retry, didProduce bool) {
batch.owner.mu.Lock()
defer batch.owner.mu.Unlock()
defer batch.decInflight()

nrec := len(batch.records)

Expand Down Expand Up @@ -970,6 +985,19 @@ type recBuf struct {
// This exists to aid in removing the buffer from the sink.
recBufsIdx int

// A concurrent metadata update can move a recBuf from one sink to
	// another while requests are inflight on the original sink. We do not
// want to allow new requests to start on the new sink until they all
// finish on the old, because with some pathological request order
// finishing, we would allow requests to finish out of order:
// handleSeqResps works per sink, not across sinks.
inflightOnSink *sink
// Inflight tracks the number of requests inflight using batches from
// this recBuf. Every time this hits zero, if the batchDrainIdx is not
// at the end, we clear inflightOnSink and trigger the *current* sink
// to drain.
inflight uint8

topicPartitionData // updated in metadata migrateProductionTo (same spot sink is updated)

// seq is used for the seq in each record batch. It is incremented when
Expand Down Expand Up @@ -1136,7 +1164,7 @@ func (recBuf *recBuf) bumpRepeatedLoadErr(err error) {
batch0 := recBuf.batches[0]
batch0.tries++
failErr := batch0.maybeFailErr(&recBuf.cl.cfg)
if (!recBuf.cl.idempotent() || batch0.canFailFromLoadErrs) && (!kerr.IsRetriable(err) || failErr != nil) {
if (!recBuf.cl.idempotent() || batch0.canFailFromLoadErrs) && (failErr != nil || !isRetriableBrokerErr(err) && !kerr.IsRetriable(err)) {
recBuf.failAllRecords(err)
}
}
Expand Down Expand Up @@ -1329,6 +1357,30 @@ func (b *recBatch) isTimedOut(limit time.Duration) bool {
return time.Since(b.records[0].Timestamp) > limit
}

// Decrements the inflight count for this batch.
//
// When the count reaches zero, this potentially re-triggers a drain on the
// *current* sink. A concurrent metadata update could have moved the recBuf to
// a different sink; that sink will not drain this recBuf until all requests
// on the old sink have finished.
//
// This is only ever called in the produce request path (i.e., never from
// failAllRecords): decrementing inflight is always the last thing that
// happens for every request. It does not matter if the records were
// independently failed; from the request issuing perspective, the batch is
// still inflight.
func (b *recBatch) decInflight() {
	owner := b.owner
	owner.inflight--
	if owner.inflight > 0 {
		return
	}
	owner.inflightOnSink = nil
	if len(owner.batches) != owner.batchDrainIdx {
		owner.sink.maybeDrain()
	}
}

////////////////////
// produceRequest //
////////////////////
Expand Down Expand Up @@ -1453,30 +1505,47 @@ func (rbs *seqRecBatches) addSeqBatch(topic string, part int32, batch seqRecBatc
topicBatches[part] = batch
}

// Resets the drain index for any batch that is the first in its record buffer.
// Resets the drain index for any batch that is the first in its record
// buffer, and always decrements the inflight count for every batch.
//
// If idempotency is disabled, a batch that is timed out or has hit the retry
// limit is failed along with everything after it.
func (rbs seqRecBatches) tryResetFailingBatchesWith(cfg *cfg, canFail bool, fn func(seqRecBatch)) {
	// Failing is permitted either explicitly or whenever idempotency is off.
	mayFail := canFail || cfg.disableIdempotency
	rbs.eachOwnerLocked(func(batch seqRecBatch) {
		defer batch.decInflight()

		if !batch.isOwnersFirstBatch() {
			return
		}

		if mayFail {
			if err := batch.maybeFailErr(cfg); err != nil {
				batch.owner.failAllRecords(err)
				return
			}
		}

		batch.owner.resetBatchDrainIdx()
		fn(batch)
	})
}

// each visits every batch in every partition of every topic, with no
// locking; callers that need the owner's lock held use eachOwnerLocked.
//
// NOTE(review): this span contained interleaved deleted-diff residue (the
// old pre-refactor body referencing canFail/cfg, which are not in scope
// here); it is reconstructed as the plain iterator the refactor intended.
func (rbs seqRecBatches) each(fn func(seqRecBatch)) {
	for _, partitions := range rbs {
		for _, batch := range partitions {
			fn(batch)
		}
	}
}

// eachOwnerLocked visits every batch while holding that batch's owner lock,
// releasing it (even on panic, via defer) after fn returns.
func (rbs seqRecBatches) eachOwnerLocked(fn func(seqRecBatch)) {
	rbs.each(func(b seqRecBatch) {
		owner := b.owner
		owner.mu.Lock()
		defer owner.mu.Unlock()
		fn(b)
	})
}

//////////////
// COUNTING // - this section is all about counting how bytes lay out on the wire
//////////////
Expand Down

0 comments on commit f29fb7f

Please sign in to comment.