producer: allow records to fail when idempotency is enabled
This removes the always-retry restriction when producing with
idempotency. If we successfully receive a response, we can then safely
fail records under certain circumstances.
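
The rule can be summarized with a small, purely illustrative helper (not part of this commit's diff):

```go
package sketch

// canFailBatch sketches the safety rule this commit adopts; the name and
// signature are illustrative, not the library's internals.
func canFailBatch(everIssued, gotResponse bool) bool {
	// Never issued: no bytes reached Kafka, so sequence numbers are safe.
	// Issued and answered: we know the batch's fate, so failing is safe.
	// Issued but unanswered: Kafka may have persisted the bytes, so
	// failing now could desync idempotent sequence numbers.
	return !everIssued || gotResponse
}
```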
twmb committed Apr 25, 2021
1 parent 0798e31 commit 314de8e
Showing 2 changed files with 65 additions and 29 deletions.
31 changes: 17 additions & 14 deletions pkg/kgo/config.go
@@ -123,16 +123,8 @@ func (cfg *cfg) validate() error {
 	if cfg.disableIdempotency && cfg.txnID != nil {
 		return errors.New("cannot both disable idempotent writes and use transactional IDs")
 	}
-	if !cfg.disableIdempotency {
-		if cfg.acks.val != -1 {
-			return errors.New("idempotency requires acks=all")
-		}
-		if cfg.produceRetries != math.MaxInt64 {
-			return errors.New("idempotency requires ProduceRetries to be unlimited")
-		}
-		if cfg.recordTimeout != 0 {
-			return errors.New("idempotency requires RecordTimeout to be unlimited")
-		}
-	}
+	if !cfg.disableIdempotency && cfg.acks.val != -1 {
+		return errors.New("idempotency requires acks=all")
+	}
 
 	for _, limit := range []struct {
@@ -687,8 +679,15 @@ func ProduceRequestTimeout(limit time.Duration) ProducerOpt {
 }
 
 // ProduceRetries sets the number of tries for producing records, overriding
-// the unlimited default. This option can only be set if DisableIdempotency is
-// also set.
+// the unlimited default.
+//
+// If idempotency is enabled (as it is by default), this option is only
+// enforced if it is safe to do so without messing up sequence numbers. It is
+// safe to enforce if a record was never issued in a request to Kafka, or if it
+// was requested and received a response.
+//
+// This option is different from RequestRetries to allow finer grained control
+// of when to fail when producing records.
 func ProduceRetries(n int) Opt {
 	return clientOpt{func(cfg *cfg) { cfg.produceRetries = int64(n) }}
 }
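
A hedged usage sketch of what this change permits, assuming the github.com/twmb/franz-go/pkg/kgo import path and an illustrative local broker: bounded retries and record lifetimes can now be configured while idempotency stays enabled.

```go
package main

import (
	"context"
	"time"

	"github.com/twmb/franz-go/pkg/kgo"
)

func main() {
	// Before this commit, these two options required DisableIdempotency;
	// now they are valid while idempotency remains on.
	cl, err := kgo.NewClient(
		kgo.SeedBrokers("localhost:9092"), // illustrative broker address
		kgo.ProduceRetries(5),             // illustrative retry bound
		kgo.RecordTimeout(30*time.Second), // illustrative record lifetime
	)
	if err != nil {
		panic(err)
	}
	defer cl.Close()

	cl.Produce(context.Background(), kgo.StringRecord("hello"), func(_ *kgo.Record, err error) {
		// err is non-nil once the batch is at its limits and it is
		// safe to fail without corrupting sequence numbers.
	})
}
```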
@@ -743,8 +742,12 @@ func ManualFlushing() ProducerOpt {
 }
 
 // RecordTimeout sets a rough time of how long a record can sit around in a
-// batch before timing out, overriding the unlimited default. This option can
-// only be set if DisableIdempotency is also set.
+// batch before timing out, overriding the unlimited default.
+//
+// If idempotency is enabled (as it is by default), this option is only
+// enforced if it is safe to do so without messing up sequence numbers. It is
+// safe to enforce if a record was never issued in a request to Kafka, or if it
+// was requested and received a response.
 //
 // The timeout for all records in a batch inherits the timeout of the first
 // record in that batch. That is, once the first record's timeout expires, all
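
As the (truncated) doc comment above describes, a batch's first record starts the clock for every record in it. A minimal sketch of that check under illustrative type names; the real check is batch.isTimedOut, which appears later in this diff:

```go
package sketch

import "time"

// record and batch are illustrative stand-ins for the library's internals.
type record struct{ buffered time.Time }
type batch struct{ records []record }

// timedOut mirrors the described rule: the first record's age times out the
// whole batch, and a zero limit means records never time out.
func (b *batch) timedOut(limit time.Duration) bool {
	if limit == 0 || len(b.records) == 0 {
		return false
	}
	return time.Since(b.records[0].buffered) > limit
}
```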
63 changes: 48 additions & 15 deletions pkg/kgo/sink.go
@@ -85,7 +85,7 @@ func (s *sink) createReq(producerID int64, producerEpoch int16) (*produceRequest
 
 		producerID:    producerID,
 		producerEpoch: producerEpoch,
-		idempotent:    !s.cl.cfg.disableIdempotency,
+		idempotent:    s.cl.idempotent(),
 
 		compressor: s.cl.compressor,
 
@@ -461,7 +461,7 @@ func (s *sink) issueTxnReq(
 // Resets the drain indices for any first-batch.
 func (s *sink) requeueUnattemptedReq(req *produceRequest, err error) {
 	var maybeDrain bool
-	req.batches.tryResetFailingBatchesWith(&s.cl.cfg, func(seqRecBatch) {
+	req.batches.tryResetFailingBatchesWith(&s.cl.cfg, false, func(seqRecBatch) {
 		maybeDrain = true
 	})
 	if maybeDrain {
@@ -500,7 +500,7 @@ func (s *sink) handleReqClientErr(req *produceRequest, err error) {
 	case err == errChosenBrokerDead:
 		// A dead broker means the broker may have migrated, so we
 		// retry to force a metadata reload.
-		s.handleRetryBatches(req.batches)
+		s.handleRetryBatches(req.batches, req.backoffSeq, false, false)
 
 	case err == errClientClosing:
 		s.cl.failBufferedRecords(errClientClosing)
@@ -619,10 +619,10 @@ func (s *sink) handleReqResp(req *produceRequest, resp kmsg.Response, err error)
 
 	if len(req.batches) > 0 {
 		s.cl.cfg.logger.Log(LogLevelError, "Kafka did not reply to all topics / partitions in the produce request! reenqueuing missing partitions", "broker", s.nodeID)
-		s.handleRetryBatches(req.batches)
+		s.handleRetryBatches(req.batches, 0, true, false)
 	}
 	if len(reqRetry) > 0 {
-		s.handleRetryBatches(reqRetry)
+		s.handleRetryBatches(reqRetry, 0, true, true)
 	}
 }
 
@@ -668,6 +668,10 @@ func (s *sink) handleReqRespBatch(
 		return false
 	}
 
+	// Since we have received a response and we are the first batch, we can
+	// at this point re-enable failing from load errors.
+	batch.canFailFromLoadErrs = true
+
 	err := kerr.ErrorForCode(errorCode)
 	switch {
 	case kerr.IsRetriable(err) &&
@@ -784,7 +788,7 @@ func (s *sink) handleReqRespBatch(
 				"partition", partition,
 				"err", err,
 				"err_is_retriable", kerr.IsRetriable(err),
-				"max_retries_reached", batch.tries == s.cl.cfg.produceRetries,
+				"max_retries_reached", batch.tries >= s.cl.cfg.produceRetries,
 			)
 		} else {
 	}
@@ -847,14 +851,31 @@ func (cl *Client) finishBatch(batch *recBatch, producerID int64, producerEpoch i
 
 // handleRetryBatches sets any first-buf-batch to failing and triggers a
 // metadata that will eventually clear the failing state and re-drain.
-func (s *sink) handleRetryBatches(retry seqRecBatches) {
+func (s *sink) handleRetryBatches(
+	retry seqRecBatches,
+	backoffSeq uint32,
+	updateMeta bool, // if we should maybe update the metadata
+	canFail bool, // if records can fail if they are at limits
+) {
 	var needsMetaUpdate bool
-	retry.tryResetFailingBatchesWith(&s.cl.cfg, func(batch seqRecBatch) {
-		batch.owner.failing = true
-		needsMetaUpdate = true
+	retry.tryResetFailingBatchesWith(&s.cl.cfg, canFail, func(batch seqRecBatch) {
+		if updateMeta {
+			batch.owner.failing = true
+			needsMetaUpdate = true
+		}
 	})
+
+	// If we are retrying without a metadata update, then we definitely
+	// want to backoff a little bit: our chosen broker died, let's not
+	// spin-loop re-requesting.
+	//
+	// If we do want to update metadata, we only do so if any batch was the
+	// first batch in its buf / not concurrently failed.
 	if needsMetaUpdate {
-		s.cl.triggerUpdateMetadata(false)
+		s.cl.triggerUpdateMetadata(true)
+	} else if !updateMeta {
+		s.maybeTriggerBackoff(backoffSeq)
+		s.maybeDrain()
 	}
 }
 
@@ -1088,12 +1109,13 @@ func (recBuf *recBuf) bumpRepeatedLoadErr(err error) {
 	if len(recBuf.batches) == 0 {
 		return
 	}
+	recBuf.cl.cfg.logger.Log(LogLevelWarn, "produce partition load error, unable to produce on this partition", "broker", recBuf.sink.nodeID, "topic", recBuf.topic, "partition", recBuf.partition, "err", err)
 	batch0 := recBuf.batches[0]
 	batch0.tries++
-	if !recBuf.cl.idempotent() && (batch0.tries > recBuf.cl.cfg.produceRetries || !kerr.IsRetriable(err)) {
+	if (!recBuf.cl.idempotent() || batch0.canFailFromLoadErrs) &&
+		(batch0.isTimedOut(recBuf.cl.cfg.recordTimeout) || batch0.tries > recBuf.cl.cfg.produceRetries || !kerr.IsRetriable(err)) {
 		recBuf.failAllRecords(err)
 	}
-	recBuf.cl.cfg.logger.Log(LogLevelWarn, "produce partition load error, unable to produce on this partition", "broker", recBuf.sink.nodeID, "topic", recBuf.topic, "partition", recBuf.partition, "err", err)
 }
 
 // failAllRecords fails all buffered records in this recBuf.
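
Reading the new condition in bumpRepeatedLoadErr as two halves may help; this decomposition is illustrative, not the library's code:

```go
package sketch

// mayFailOnLoadErr decomposes the condition above: records fail only when
// failing is allowed (idempotency off, or the batch can safely fail) and the
// batch has hit a limit. Parameters are illustrative stand-ins for the
// client and batch state.
func mayFailOnLoadErr(idempotent, canFailFromLoadErrs, timedOut, overRetries, retriableErr bool) bool {
	allowedToFail := !idempotent || canFailFromLoadErrs
	atLimits := timedOut || overRetries || !retriableErr
	return allowedToFail && atLimits
}
```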
@@ -1164,6 +1186,14 @@ type recBatch struct {
 
 	tries int64 // if this was sent before and is thus now immutable
 
+	// We can only fail a batch if we have never issued it, or we have
+	// issued it and have received a response. If we do not receive a
+	// response, we cannot know whether we actually wrote bytes that Kafka
+	// processed or not. So, we set this to false every time we issue a
+	// request with this batch, and then reset it to true whenever we
+	// process a response.
+	canFailFromLoadErrs bool
+
 	wireLength   int32 // tracks total size this batch would currently encode as, including length prefix
 	v1wireLength int32 // same as wireLength, but for message set v1
 
@@ -1217,6 +1247,8 @@ func (recBuf *recBuf) newRecordBatch() *recBatch {
 		owner:      recBuf,
 		records:    make([]promisedNumberedRecord, 0, 10),
 		wireLength: recordBatchOverhead,
+
+		canFailFromLoadErrs: true, // until we send this batch, we can fail it
 	}
 }
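
Pulling together the three places this diff touches canFailFromLoadErrs (newRecordBatch above, handleReqRespBatch earlier, and tryAddBatch just below), a hedged sketch of the flag's lifecycle with illustrative names:

```go
package sketch

// flagBatch models the lifecycle of canFailFromLoadErrs: true at creation,
// false once issued, and true again once a response is handled.
type flagBatch struct{ canFailFromLoadErrs bool }

func newFlagBatch() *flagBatch {
	// Never sent: failing cannot desync sequence numbers.
	return &flagBatch{canFailFromLoadErrs: true}
}

func (b *flagBatch) issue() {
	// In flight: Kafka may or may not have persisted the bytes, so a
	// load-error failure here could corrupt sequence numbering.
	b.canFailFromLoadErrs = false
}

func (b *flagBatch) handleResponse() {
	// A definitive response tells us the batch's fate; failing on
	// repeated partition load errors is safe again.
	b.canFailFromLoadErrs = true
}
```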

@@ -1300,6 +1332,7 @@ func (r *produceRequest) tryAddBatch(produceVersion int32, recBuf *recBuf, batch
 	}
 
 	batch.tries++
+	batch.canFailFromLoadErrs = false
 	r.wireLength += batchWireLength
 	r.batches.addBatch(
 		recBuf.topic,
@@ -1346,12 +1379,12 @@ func (rbs *seqRecBatches) addSeqBatch(topic string, part int32, batch seqRecBatc
 //
 // If idempotency is disabled, if a batch is timed out or hit the retry limit,
 // we fail it and anything after it.
-func (rbs seqRecBatches) tryResetFailingBatchesWith(cfg *cfg, fn func(seqRecBatch)) {
+func (rbs seqRecBatches) tryResetFailingBatchesWith(cfg *cfg, canFail bool, fn func(seqRecBatch)) {
 	for _, partitions := range rbs {
 		for _, batch := range partitions {
 			batch.owner.mu.Lock()
 			if batch.isOwnersFirstBatch() {
-				if cfg.disableIdempotency {
+				if canFail || cfg.disableIdempotency {
 					var err error
 					if batch.isTimedOut(cfg.recordTimeout) {
 						err = errRecordTimeout
