diff --git a/pkg/kgo/config.go b/pkg/kgo/config.go
index 6edab744..f4597cd5 100644
--- a/pkg/kgo/config.go
+++ b/pkg/kgo/config.go
@@ -123,16 +123,8 @@ func (cfg *cfg) validate() error {
 	if cfg.disableIdempotency && cfg.txnID != nil {
 		return errors.New("cannot both disable idempotent writes and use transactional IDs")
 	}
-	if !cfg.disableIdempotency {
-		if cfg.acks.val != -1 {
-			return errors.New("idempotency requires acks=all")
-		}
-		if cfg.produceRetries != math.MaxInt64 {
-			return errors.New("idempotency requires ProduceRetries to be unlimited")
-		}
-		if cfg.recordTimeout != 0 {
-			return errors.New("idempotency requires RecordTimeout to be unlimited")
-		}
+	if !cfg.disableIdempotency && cfg.acks.val != -1 {
+		return errors.New("idempotency requires acks=all")
 	}
 
 	for _, limit := range []struct {
@@ -687,8 +679,15 @@ func ProduceRequestTimeout(limit time.Duration) ProducerOpt {
 }
 
 // ProduceRetries sets the number of tries for producing records, overriding
-// the unlimited default. This option can only be set if DisableIdempotency is
-// also set.
+// the unlimited default.
+//
+// If idempotency is enabled (as it is by default), this option is only
+// enforced if it is safe to do so without messing up sequence numbers. It is
+// safe to enforce if a record was never issued in a request to Kafka, or if it
+// was requested and received a response.
+//
+// This option is different from RequestRetries to allow finer grained control
+// of when to fail when producing records.
 func ProduceRetries(n int) Opt {
 	return clientOpt{func(cfg *cfg) { cfg.produceRetries = int64(n) }}
 }
@@ -743,8 +742,12 @@ func ManualFlushing() ProducerOpt {
 }
 
 // RecordTimeout sets a rough time of how long a record can sit around in a
-// batch before timing out, overriding the ulimited default. This option can
-// only be set if DisableIdempotency is also set.
+// batch before timing out, overriding the unlimited default.
+//
+// If idempotency is enabled (as it is by default), this option is only
+// enforced if it is safe to do so without messing up sequence numbers. It is
+// safe to enforce if a record was never issued in a request to Kafka, or if it
+// was requested and received a response.
 //
 // The timeout for all records in a batch inherit the timeout of the first
 // record in that batch. That is, once the first record's timeout expires, all
diff --git a/pkg/kgo/sink.go b/pkg/kgo/sink.go
index 902ff224..e43e993c 100644
--- a/pkg/kgo/sink.go
+++ b/pkg/kgo/sink.go
@@ -85,7 +85,7 @@ func (s *sink) createReq(producerID int64, producerEpoch int16) (*produceRequest
 		producerID:    producerID,
 		producerEpoch: producerEpoch,
 
-		idempotent: !s.cl.cfg.disableIdempotency,
+		idempotent: s.cl.idempotent(),
 
 		compressor: s.cl.compressor,
 
@@ -461,7 +461,7 @@ func (s *sink) issueTxnReq(
 // Resets the drain indices for any first-batch.
 func (s *sink) requeueUnattemptedReq(req *produceRequest, err error) {
 	var maybeDrain bool
-	req.batches.tryResetFailingBatchesWith(&s.cl.cfg, func(seqRecBatch) {
+	req.batches.tryResetFailingBatchesWith(&s.cl.cfg, false, func(seqRecBatch) {
 		maybeDrain = true
 	})
 	if maybeDrain {
@@ -500,7 +500,7 @@ func (s *sink) handleReqClientErr(req *produceRequest, err error) {
 	case err == errChosenBrokerDead:
 		// A dead broker means the broker may have migrated, so we
 		// retry to force a metadata reload.
-		s.handleRetryBatches(req.batches)
+		s.handleRetryBatches(req.batches, req.backoffSeq, false, false)
 
 	case err == errClientClosing:
 		s.cl.failBufferedRecords(errClientClosing)
@@ -619,10 +619,10 @@ func (s *sink) handleReqResp(req *produceRequest, resp kmsg.Response, err error)
 	if len(req.batches) > 0 {
 		s.cl.cfg.logger.Log(LogLevelError, "Kafka did not reply to all topics / partitions in the produce request! reenqueuing missing partitions", "broker", s.nodeID)
-		s.handleRetryBatches(req.batches)
+		s.handleRetryBatches(req.batches, 0, true, false)
 	}
 	if len(reqRetry) > 0 {
-		s.handleRetryBatches(reqRetry)
+		s.handleRetryBatches(reqRetry, 0, true, true)
 	}
 }
 
@@ -668,6 +668,10 @@ func (s *sink) handleReqRespBatch(
 		return false
 	}
 
+	// Since we have received a response and we are the first batch, we can
+	// at this point re-enable failing from load errors.
+	batch.canFailFromLoadErrs = true
+
 	err := kerr.ErrorForCode(errorCode)
 	switch {
 	case kerr.IsRetriable(err) &&
@@ -784,7 +788,7 @@ func (s *sink) handleReqRespBatch(
 			"partition", partition,
 			"err", err,
 			"err_is_retriable", kerr.IsRetriable(err),
-			"max_retries_reached", batch.tries == s.cl.cfg.produceRetries,
+			"max_retries_reached", batch.tries >= s.cl.cfg.produceRetries,
 		)
 	} else {
 	}
@@ -847,14 +851,31 @@ func (cl *Client) finishBatch(batch *recBatch, producerID int64, producerEpoch i
 
 // handleRetryBatches sets any first-buf-batch to failing and triggers a
 // metadata that will eventually clear the failing state and re-drain.
-func (s *sink) handleRetryBatches(retry seqRecBatches) {
+func (s *sink) handleRetryBatches(
+	retry seqRecBatches,
+	backoffSeq uint32,
+	updateMeta bool, // if we should maybe update the metadata
+	canFail bool, // if records can fail if they are at limits
+) {
 	var needsMetaUpdate bool
-	retry.tryResetFailingBatchesWith(&s.cl.cfg, func(batch seqRecBatch) {
-		batch.owner.failing = true
-		needsMetaUpdate = true
+	retry.tryResetFailingBatchesWith(&s.cl.cfg, canFail, func(batch seqRecBatch) {
+		if updateMeta {
+			batch.owner.failing = true
+			needsMetaUpdate = true
+		}
 	})
+
+	// If we are retrying without a metadata update, then we definitely
+	// want to backoff a little bit: our chosen broker died, let's not
+	// spin-loop re-requesting.
+	//
+	// If we do want to metadata update, we only do so if any batch was the
+	// first batch in its buf / not concurrently failed.
 	if needsMetaUpdate {
-		s.cl.triggerUpdateMetadata(false)
+		s.cl.triggerUpdateMetadata(true)
+	} else if !updateMeta {
+		s.maybeTriggerBackoff(backoffSeq)
+		s.maybeDrain()
 	}
 }
 
@@ -1088,12 +1109,13 @@ func (recBuf *recBuf) bumpRepeatedLoadErr(err error) {
 	if len(recBuf.batches) == 0 {
 		return
 	}
+	recBuf.cl.cfg.logger.Log(LogLevelWarn, "produce partition load error, unable to produce on this partition", "broker", recBuf.sink.nodeID, "topic", recBuf.topic, "partition", recBuf.partition, "err", err)
 	batch0 := recBuf.batches[0]
 	batch0.tries++
-	if !recBuf.cl.idempotent() && (batch0.tries > recBuf.cl.cfg.produceRetries || !kerr.IsRetriable(err)) {
+	if (!recBuf.cl.idempotent() || batch0.canFailFromLoadErrs) &&
+		(batch0.isTimedOut(recBuf.cl.cfg.recordTimeout) || batch0.tries > recBuf.cl.cfg.produceRetries || !kerr.IsRetriable(err)) {
 		recBuf.failAllRecords(err)
 	}
-	recBuf.cl.cfg.logger.Log(LogLevelWarn, "produce partition load error, unable to produce on this partition", "broker", recBuf.sink.nodeID, "topic", recBuf.topic, "partition", recBuf.partition, "err", err)
 }
 
 // failAllRecords fails all buffered records in this recBuf.
@@ -1164,6 +1186,14 @@ type recBatch struct {
 
 	tries int64 // if this was sent before and is thus now immutable
 
+	// We can only fail a batch if we have never issued it, or we have
+	// issued it and have received a response. If we do not receive a
+	// response, we cannot know whether we actually wrote bytes that Kafka
+	// processed or not. So, we set this to false every time we issue a
+	// request with this batch, and then reset it to true whenever we
+	// process a response.
+	canFailFromLoadErrs bool
+
 	wireLength   int32 // tracks total size this batch would currently encode as, including length prefix
 	v1wireLength int32 // same as wireLength, but for message set v1
 
@@ -1217,6 +1247,8 @@ func (recBuf *recBuf) newRecordBatch() *recBatch {
 		owner:      recBuf,
 		records:    make([]promisedNumberedRecord, 0, 10),
 		wireLength: recordBatchOverhead,
+
+		canFailFromLoadErrs: true, // until we send this batch, we can fail it
 	}
 }
 
@@ -1300,6 +1332,7 @@ func (r *produceRequest) tryAddBatch(produceVersion int32, recBuf *recBuf, batch
 	}
 
 	batch.tries++
+	batch.canFailFromLoadErrs = false
 	r.wireLength += batchWireLength
 	r.batches.addBatch(
 		recBuf.topic,
@@ -1346,12 +1379,12 @@ func (rbs *seqRecBatches) addSeqBatch(topic string, part int32, batch seqRecBatc
 //
 // If idempotency is disabled, if a batch is timed out or hit the retry limit,
 // we fail it and anything after it.
-func (rbs seqRecBatches) tryResetFailingBatchesWith(cfg *cfg, fn func(seqRecBatch)) {
+func (rbs seqRecBatches) tryResetFailingBatchesWith(cfg *cfg, canFail bool, fn func(seqRecBatch)) {
 	for _, partitions := range rbs {
 		for _, batch := range partitions {
 			batch.owner.mu.Lock()
 			if batch.isOwnersFirstBatch() {
-				if cfg.disableIdempotency {
+				if canFail || cfg.disableIdempotency {
 					var err error
 					if batch.isTimedOut(cfg.recordTimeout) {
 						err = errRecordTimeout
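For context, the relaxed validate() above means a client may now combine ProduceRetries and RecordTimeout with the idempotent producer, which the old validation rejected outright. Below is a minimal configuration sketch, not part of this patch: it assumes the franz-go import path and the existing kgo.NewClient / kgo.SeedBrokers API, and the broker address and limits are placeholder values.

package main

import (
	"time"

	"github.com/twmb/franz-go/pkg/kgo"
)

func main() {
	// With this change, bounded retries and a record timeout no longer
	// require DisableIdempotency; they are enforced only when doing so
	// cannot corrupt sequence numbers (a batch that was never issued, or
	// one that was issued and received a response).
	cl, err := kgo.NewClient(
		kgo.SeedBrokers("localhost:9092"),  // placeholder seed broker
		kgo.ProduceRetries(5),              // placeholder retry limit
		kgo.RecordTimeout(30*time.Second),  // placeholder record timeout
	)
	if err != nil {
		panic(err)
	}
	defer cl.Close()
}

The same "never issued, or issued and answered" rule is what the new canFailFromLoadErrs field tracks per batch in sink.go.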