From 769e02f2b2b5ab967a355ead9513ca71e6538bed Mon Sep 17 00:00:00 2001 From: Travis Bischel Date: Mon, 20 Mar 2023 20:06:50 -0600 Subject: [PATCH] producer: avoid deadlock when quickly recreating a topic Closes #403. --- pkg/kgo/producer.go | 61 +++++++++++++++++++++++++++------------------ 1 file changed, 37 insertions(+), 24 deletions(-) diff --git a/pkg/kgo/producer.go b/pkg/kgo/producer.go index 7119b9c4..3805d192 100644 --- a/pkg/kgo/producer.go +++ b/pkg/kgo/producer.go @@ -682,34 +682,47 @@ func (cl *Client) resetAllProducerSequences() { func (cl *Client) failProducerID(id int64, epoch int16, err error) { p := &cl.producer - p.idMu.Lock() - defer p.idMu.Unlock() - - current := p.id.Load().(*producerID) - if current.id != id || current.epoch != epoch { - cl.cfg.logger.Log(LogLevelInfo, "ignoring a fail producer id request due to current id being different", - "current_id", current.id, - "current_epoch", current.epoch, - "current_err", current.err, - "fail_id", id, - "fail_epoch", epoch, - "fail_err", err, - ) - return // failed an old id - } - - // If this is not UnknownProducerID, then we cannot recover production. + // We do not lock the idMu when failing a producer ID, for two reasons. // - // If this is UnknownProducerID without a txnID, then we are here from - // stopOnDataLoss in sink.go (see large comment there). + // 1) With how we store below, we do not need to. We only fail if the + // ID we are failing has not changed and if the ID we are failing has + // not failed already. Failing outside the lock is the same as failing + // within the lock. // - // If this is UnknownProducerID with a txnID, then EndTransaction will - // recover us. - p.id.Store(&producerID{ + // 2) Locking would cause a deadlock, because producerID locks + // idMu=>recBuf.Mu, whereas we are failing while locked within a recBuf in + // sink.go. 
+ new := &producerID{ id: id, epoch: epoch, err: err, - }) + } + for { + current := p.id.Load().(*producerID) + if current.id != id || current.epoch != epoch { + cl.cfg.logger.Log(LogLevelInfo, "ignoring a fail producer id request due to current id being different", + "current_id", current.id, + "current_epoch", current.epoch, + "current_err", current.err, + "fail_id", id, + "fail_epoch", epoch, + "fail_err", err, + ) + return + } + if current.err != nil { + cl.cfg.logger.Log(LogLevelInfo, "ignoring a fail producer id because our producer id has already been failed", + "current_id", current.id, + "current_epoch", current.epoch, + "current_err", current.err, + "fail_err", err, + ) + return + } + if p.id.CompareAndSwap(current, new) { + return + } + } } // doInitProducerID inits the idempotent ID and potentially the transactional @@ -795,7 +808,7 @@ func (cl *Client) partitionsForTopicProduce(pr promisedRec) (*topicPartitions, * p.topics.storeTopics([]string{topic}) cl.addUnknownTopicRecord(pr) - cl.triggerUpdateMetadataNow("forced load because we are producing to a new topic for the first time") + cl.triggerUpdateMetadataNow("forced load because we are producing to a topic for the first time") return nil, nil } }