diff --git a/pkg/kgo/producer.go b/pkg/kgo/producer.go
index 4dfc0acd..02efe9cf 100644
--- a/pkg/kgo/producer.go
+++ b/pkg/kgo/producer.go
@@ -26,6 +26,7 @@ type producer struct {
 	aborting uint32 // 1 means yes
 
 	drains int32 // number of sinks draining
+	issues int32 // number of in flight produce requests
 
 	idMu      sync.Mutex
 	idVersion int16
@@ -55,12 +56,14 @@ func (p *producer) init() {
 	p.notifyCond = sync.NewCond(&p.notifyMu)
 }
 
-func (p *producer) incDrains() {
-	atomic.AddInt32(&p.drains, 1)
-}
+func (p *producer) incDrains() { atomic.AddInt32(&p.drains, 1) }
+func (p *producer) incIssues() { atomic.AddInt32(&p.issues, 1) }
+
+func (p *producer) decDrains() { p.decAbortNotify(&p.drains) }
+func (p *producer) decIssues() { p.decAbortNotify(&p.issues) }
 
-func (p *producer) decDrains() {
-	if atomic.AddInt32(&p.drains, -1) != 0 || atomic.LoadUint32(&p.aborting) == 0 {
+func (p *producer) decAbortNotify(v *int32) {
+	if atomic.AddInt32(v, -1) != 0 || atomic.LoadUint32(&p.aborting) == 0 {
 		return
 	}
 	p.notifyMu.Lock()
diff --git a/pkg/kgo/sink.go b/pkg/kgo/sink.go
index 5ed29a50..8e95304a 100644
--- a/pkg/kgo/sink.go
+++ b/pkg/kgo/sink.go
@@ -38,6 +38,13 @@ type sink struct {
 	needBackoff bool
 	backoffSeq  uint32 // prevents pile on failures
 
+	// To work around KAFKA-12671, before we issue EndTxn, we check to see
+	// that all sinks had a final successful response. If not, then we risk
+	// running into KAFKA-12671 (out of order processing leading to an
+	// orphaned begun "transaction" in ProducerStateManager), so rather
+	// than issuing EndTxn immediately, we wait a little bit.
+	lastRespSuccessful bool
+
 	// consecutiveFailures is incremented every backoff and cleared every
 	// successful response. For simplicity, if we have a good response
 	// following an error response before the error response's backoff
@@ -58,9 +65,10 @@ type seqResp struct {
 
 func (cl *Client) newSink(nodeID int32) *sink {
 	s := &sink{
-		cl:             cl,
-		nodeID:         nodeID,
-		produceVersion: -1,
+		cl:                 cl,
+		nodeID:             nodeID,
+		produceVersion:     -1,
+		lastRespSuccessful: true,
 	}
 	s.inflightSem.Store(make(chan struct{}, 1))
 	return s
@@ -236,7 +244,12 @@ func (s *sink) drain() {
 		s.maybeBackoff()
 
 		sem := s.inflightSem.Load().(chan struct{})
-		sem <- struct{}{}
+		select {
+		case sem <- struct{}{}:
+		case <-s.cl.ctx.Done():
+			s.drainState.hardFinish()
+			return
+		}
 
 		again = s.drainState.maybeFinish(s.produce(sem))
 	}
@@ -297,9 +310,18 @@ func (s *sink) produce(sem <-chan struct{}) bool {
 	req.backoffSeq = s.backoffSeq // safe to read outside mu since we are in drain loop
 
+	// Add that we are issuing and then check if we are aborting: this
+	// order ensures that we do not produce after aborting is set.
+	s.cl.producer.incIssues()
+	if s.cl.producer.isAborting() {
+		s.cl.producer.decIssues()
+		return false
+	}
+
 	produced = true
 	s.doSequenced(req, func(resp kmsg.Response, err error) {
 		s.handleReqResp(req, resp, err)
+		s.cl.producer.decIssues()
 		<-sem
 	})
 	return moreToDrain
@@ -376,8 +398,10 @@ func (s *sink) issueTxnReq(
 ) error {
 	resp, err := txnReq.RequestWith(s.cl.ctx, s.cl)
 	if err != nil {
+		s.lastRespSuccessful = false
 		return err
 	}
+	s.lastRespSuccessful = true
 
 	for _, topic := range resp.Topics {
 		topicBatches, ok := req.batches[topic.Topic]
diff --git a/pkg/kgo/txn.go b/pkg/kgo/txn.go
index ff01a7f6..fb2e79a9 100644
--- a/pkg/kgo/txn.go
+++ b/pkg/kgo/txn.go
@@ -160,6 +160,27 @@ func (s *GroupTransactSession) End(ctx context.Context, commit TransactionEndTry
 			return false, err // same
 		}
 		defer s.cl.ResetProducerID()
+
+		allOk := true
+		s.cl.sinksAndSourcesMu.Lock()
+		for _, sns := range s.cl.sinksAndSources {
+			allOk = allOk && sns.sink.lastRespSuccessful
+		}
+		s.cl.sinksAndSourcesMu.Unlock()
+
+		if !allOk {
+			s.cl.cfg.logger.Log(LogLevelWarn, "Buffered records were aborted, but some sink(s) did not have a final handled produce response. Kafka could still be handling these produce requests or have yet to handle them. We do not want to issue EndTxn before these produce requests are handled, because that would risk beginning a new transaction that we may not finish. Waiting 1s to give Kafka some time... (See KAFKA-12671)")
+			timer := time.NewTimer(time.Second)
+			select {
+			case <-timer.C:
+			case <-s.cl.ctx.Done():
+				timer.Stop()
+				return false, s.cl.ctx.Err()
+			case <-ctx.Done():
+				timer.Stop()
+				return false, ctx.Err()
+			}
+		}
 	}
 
 	wantCommit := bool(commit)
@@ -293,6 +314,8 @@ func (cl *Client) BeginTransaction() error {
 func (cl *Client) AbortBufferedRecords(ctx context.Context) error {
 	atomic.StoreUint32(&cl.producer.aborting, 1)
 	defer atomic.StoreUint32(&cl.producer.aborting, 0)
+	atomic.AddInt32(&cl.producer.flushing, 1) // disallow lingering to start
+	defer atomic.AddInt32(&cl.producer.flushing, -1)
 	// At this point, all drain loops that start will immediately stop,
 	// thus they will not begin any AddPartitionsToTxn request. We must
 	// now wait for any req currently built to be done being issued.
@@ -316,7 +339,7 @@ func (cl *Client) AbortBufferedRecords(ctx context.Context) error {
 		defer cl.producer.notifyMu.Unlock()
 		defer close(done)
 
-		for !quit && atomic.LoadInt32(&cl.producer.drains) > 0 {
+		for !quit && (atomic.LoadInt32(&cl.producer.drains) > 0 || atomic.LoadInt32(&cl.producer.issues) > 0) {
 			cl.producer.notifyCond.Wait()
 		}
 	}()
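
Note (not part of the patch): the abort path above rests on a small counter-plus-`sync.Cond` pattern. Each in-flight produce request bumps the `issues` counter, `decIssues` funnels through `decAbortNotify` to wake an aborter once a counter drops to zero, and `AbortBufferedRecords` waits until both `drains` and `issues` are zero. The standalone sketch below shows that pattern in isolation; the `tracker` type and its `begin`/`end`/`abort` methods are illustrative names only, not part of the kgo API, and the real client layers context cancellation and the `quit` flag on top of this.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// tracker is an illustrative stand-in for the producer's drains/issues
// counters: in-flight work is counted atomically, and an aborter waits on a
// condition variable until the count returns to zero.
type tracker struct {
	aborting uint32 // 1 means an abort is in progress
	inflight int32  // number of in-flight operations

	mu   sync.Mutex
	cond *sync.Cond
}

func newTracker() *tracker {
	t := &tracker{}
	t.cond = sync.NewCond(&t.mu)
	return t
}

// begin registers work and reports false if an abort has started, mirroring
// the increment-then-check-aborting ordering used in the patch.
func (t *tracker) begin() bool {
	atomic.AddInt32(&t.inflight, 1)
	if atomic.LoadUint32(&t.aborting) == 1 {
		t.end()
		return false
	}
	return true
}

// end decrements the counter and, if an abort is waiting and the count just
// hit zero, wakes the waiter (the decAbortNotify pattern).
func (t *tracker) end() {
	if atomic.AddInt32(&t.inflight, -1) != 0 || atomic.LoadUint32(&t.aborting) == 0 {
		return
	}
	t.mu.Lock()
	t.cond.Broadcast()
	t.mu.Unlock()
}

// abort blocks until all registered in-flight work has finished.
func (t *tracker) abort() {
	atomic.StoreUint32(&t.aborting, 1)
	defer atomic.StoreUint32(&t.aborting, 0)

	t.mu.Lock()
	for atomic.LoadInt32(&t.inflight) > 0 {
		t.cond.Wait()
	}
	t.mu.Unlock()
}

func main() {
	t := newTracker()
	var wg sync.WaitGroup
	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			if t.begin() {
				defer t.end()
				// ... issue a produce request here ...
			}
		}()
	}
	t.abort() // returns once no registered work remains in flight
	wg.Wait()
	fmt.Println("all in-flight work drained")
}
```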