Skip to content

Commit

Permalink
producer: avoid deadlock when quickly recreating a topic
Browse files Browse the repository at this point in the history
Closes #403.
  • Loading branch information
twmb committed Mar 21, 2023
1 parent 1f9d3aa commit 769e02f
Showing 1 changed file with 37 additions and 24 deletions.
61 changes: 37 additions & 24 deletions pkg/kgo/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -682,34 +682,47 @@ func (cl *Client) resetAllProducerSequences() {
func (cl *Client) failProducerID(id int64, epoch int16, err error) {
p := &cl.producer

p.idMu.Lock()
defer p.idMu.Unlock()

current := p.id.Load().(*producerID)
if current.id != id || current.epoch != epoch {
cl.cfg.logger.Log(LogLevelInfo, "ignoring a fail producer id request due to current id being different",
"current_id", current.id,
"current_epoch", current.epoch,
"current_err", current.err,
"fail_id", id,
"fail_epoch", epoch,
"fail_err", err,
)
return // failed an old id
}

// If this is not UnknownProducerID, then we cannot recover production.
// We do not lock the idMu when failing a producer ID, for two reasons.
//
// If this is UnknownProducerID without a txnID, then we are here from
// stopOnDataLoss in sink.go (see large comment there).
// 1) With how we store below, we do not need to. We only fail if the
// ID we are failing has not changed and if the ID we are failing has
// not failed already. Failing outside the lock is the same as failing
// within the lock.
//
// If this is UnknownProducerID with a txnID, then EndTransaction will
// recover us.
p.id.Store(&producerID{
// 2) Locking would cause a deadlock, because producerID locks
// idMu=>recBuf.Mu, whereas we are failing while locked within a recBuf in
// sink.go.
new := &producerID{
id: id,
epoch: epoch,
err: err,
})
}
for {
current := p.id.Load().(*producerID)
if current.id != id || current.epoch != epoch {
cl.cfg.logger.Log(LogLevelInfo, "ignoring a fail producer id request due to current id being different",
"current_id", current.id,
"current_epoch", current.epoch,
"current_err", current.err,
"fail_id", id,
"fail_epoch", epoch,
"fail_err", err,
)
return
}
if current.err != nil {
cl.cfg.logger.Log(LogLevelInfo, "ignoring a fail producer id because our producer id has already been failed",
"current_id", current.id,
"current_epoch", current.epoch,
"current_err", current.err,
"fail_err", err,
)
return
}
if p.id.CompareAndSwap(current, new) {
return
}
}
}

// doInitProducerID inits the idempotent ID and potentially the transactional
Expand Down Expand Up @@ -795,7 +808,7 @@ func (cl *Client) partitionsForTopicProduce(pr promisedRec) (*topicPartitions, *

p.topics.storeTopics([]string{topic})
cl.addUnknownTopicRecord(pr)
cl.triggerUpdateMetadataNow("forced load because we are producing to a new topic for the first time")
cl.triggerUpdateMetadataNow("forced load because we are producing to a topic for the first time")
return nil, nil
}
}
Expand Down

0 comments on commit 769e02f

Please sign in to comment.