producer: guts overhaul
This is a redux of the guts of the producer, with a focus on much more
rigor around sequence numbers, particularly around when, if ever, it is
safe to reset them.

The version before this did not reset sequence numbers properly: if
sequence numbers were reset in the idempotent producer, we did not bump
the epoch, so we would get OOOSN (OutOfOrderSequenceNumber) errors.
Previously, AbortBufferedRecords did not work at all.

The new code fails batches and their corresponding partitions less often
than before, preferring instead to retry batches. Further, to be
absolutely sure we get sequence numbers correct, we now require
unlimited retries and no record timeouts for idempotent (and, by proxy,
transactional) production.
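
A minimal sketch of the rule this enforces, using hypothetical types and
field names (the real kgo internals differ): a sequence number reset is only
safe when paired with a producer epoch bump, because brokers track sequences
per (producer ID, epoch) pair.

package sketch

type recBuf struct{ seq int32 } // illustrative per-partition buffer

type producerID struct {
	id    int64
	epoch int16
}

func (p *producerID) resetSequences(bufs []*recBuf) {
	// Without this bump, a broker that already saw sequence N for this
	// (id, epoch) would reject a restarted sequence 0 with OOOSN.
	p.epoch++
	for _, b := range bufs {
		b.seq = 0 // safe only because the epoch changed
	}
}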

---

This refactors most of the accounting aspects of buffering records,
breaking up large functions that did too much so they are more easily
understood. I believe this makes some areas clearer.

---

This adds support for the upcoming flexible versions aspect of producing
in Kafka 2.8.0+.
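
A hedged illustration of what changes on the wire (assuming flexible
versions use Kafka's compact encodings, as the version >= 9 branch in the
test diff below suggests): length prefixes become unsigned varints of
len+1 rather than fixed int32s.

package main

import (
	"encoding/binary"
	"fmt"
)

// appendBatchLength sketches both encodings of a record batch's length.
func appendBatchLength(dst []byte, batchLen int, flexible bool) []byte {
	if !flexible {
		var fixed [4]byte
		binary.BigEndian.PutUint32(fixed[:], uint32(batchLen))
		return append(dst, fixed[:]...)
	}
	var tmp [binary.MaxVarintLen64]byte
	n := binary.PutUvarint(tmp[:], uint64(batchLen)+1) // +1: 0 encodes null
	return append(dst, tmp[:n]...)
}

func main() {
	fmt.Println(appendBatchLength(nil, 300, false)) // [0 0 1 44]
	fmt.Println(appendBatchLength(nil, 300, true))  // [173 2], uvarint(301)
}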

---

This refactors the leader / leaderEpoch aspects of metadata updates to be
simpler and easier to understand, which removes the need for two large
doc comments on recBuf and on cursor. The leader and leaderEpoch are not
currently used on recBuf, but we are now prepared for when they will be.
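
A sketch of the shape this settles on, using the names visible in the
metadata.go diff below (field types assumed): leader and leaderEpoch live in
one small comparable struct, so a metadata merge can decide with a single ==
whether the old records buffer and cursor may be carried over or must be
migrated.

package sketch

type topicPartitionData struct {
	leader      int32 // broker ID of the partition leader
	leaderEpoch int32 // epoch of that leadership, -1 if unsupported
}

func sameLeadership(prev, cur topicPartitionData) bool {
	return prev == cur // struct equality compares both fields
}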

---

This adds aborting buffered records when ending a transaction, which
previously did not work because AbortBufferedRecords broke sequence
numbers. Now, we reset the producer ID, which should be safe and has
been integration tested.
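
A hedged sketch of that flow, with illustrative names: rather than rewinding
sequence numbers (the old, broken approach), the client forgets its producer
ID entirely, so the next produce re-initializes with a fresh ID and epoch
under which sequences legitimately restart at zero.

package sketch

import "errors"

var errRecordAborted = errors.New("record aborted") // illustrative error

type producerState struct {
	id    int64
	epoch int16
}

func (p *producerState) abortBufferedRecords(failAll func(error)) {
	failAll(errRecordAborted) // fail every buffered record back to its promise
	p.id, p.epoch = -1, -1    // sentinel: InitProducerID runs before the next batch
}
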
twmb committed Apr 13, 2021
1 parent cc64edc commit ebc8ee2
Showing 13 changed files with 913 additions and 831 deletions.
8 changes: 1 addition & 7 deletions pkg/kgo/atomic_maybe_work.go
@@ -13,13 +13,7 @@ func (b *atomicBool) set(v bool) {
}
}

func (b *atomicBool) get() bool {
v := atomic.LoadUint32((*uint32)(b))
if v == 1 {
return true
}
return false
}
func (b *atomicBool) get() bool { return atomic.LoadUint32((*uint32)(b)) == 1 }

const (
stateUnstarted = iota
14 changes: 13 additions & 1 deletion pkg/kgo/broker.go
@@ -723,7 +723,7 @@ func (cxn *brokerCxn) writeRequest(ctx context.Context, enqueuedForWritingAt tim
return 0, ctx.Err()
case <-cxn.cl.ctx.Done():
after.Stop()
return 0, ctx.Err()
return 0, errClientClosing
case <-cxn.deadCh:
after.Stop()
return 0, ErrConnDead
@@ -777,9 +777,15 @@ func (cxn *brokerCxn) writeConn(ctx context.Context, buf []byte, timeout time.Du
case <-cxn.cl.ctx.Done():
cxn.conn.SetWriteDeadline(time.Now())
<-writeDone
if writeErr != nil {
writeErr = errClientClosing
}
case <-ctx.Done():
cxn.conn.SetWriteDeadline(time.Now())
<-writeDone
if writeErr != nil && ctx.Err() != nil {
writeErr = ctx.Err()
}
}
return
}
@@ -824,9 +830,15 @@ func (cxn *brokerCxn) readConn(ctx context.Context, timeout time.Duration, enque
case <-cxn.cl.ctx.Done():
cxn.conn.SetReadDeadline(time.Now())
<-readDone
if err != nil {
err = errClientClosing
}
case <-ctx.Done():
cxn.conn.SetReadDeadline(time.Now())
<-readDone
if err != nil && ctx.Err() != nil {
err = ctx.Err()
}
}
return
}
11 changes: 4 additions & 7 deletions pkg/kgo/client.go
@@ -101,6 +101,8 @@ type Client struct {
metadone chan struct{}
}

func (cl *Client) idempotent() bool { return !cl.cfg.disableIdempotency }

type sinkAndSource struct {
sink *sink
source *source
@@ -427,12 +429,7 @@ func (cl *Client) Close() {
sns.source.maybeConsume() // same
}

// We must manually fail all partitions that never had a sink.
for _, partitions := range cl.loadTopics() {
for _, partition := range partitions.load().partitions {
partition.records.failAllRecords(ErrBrokerDead)
}
}
cl.failBufferedRecords(errClientClosing)
}

// Request issues a request to Kafka, waiting for and returning the response.
@@ -506,7 +503,7 @@ func (cl *Client) retriableBrokerFn(fn func() (*broker, error)) *retriable {
}

func (cl *Client) shouldRetry(tries int, err error) bool {
return err == ErrConnDead && tries < cl.cfg.brokerConnDeadRetries || (kerr.IsRetriable(err) || isRetriableBrokerErr(err)) && tries < cl.cfg.retries
return err == ErrConnDead && tries < cl.cfg.brokerConnDeadRetries || (kerr.IsRetriable(err) || isRetriableBrokerErr(err)) && int64(tries) < cl.cfg.retries
}

type retriable struct {
51 changes: 31 additions & 20 deletions pkg/kgo/config.go
@@ -59,7 +59,7 @@ type cfg struct {
minVersions *kversion.Versions

retryBackoff func(int) time.Duration
retries int
retries int64
retryTimeout func(int16) time.Duration
brokerConnDeadRetries int

@@ -85,6 +85,7 @@ type cfg struct {
maxRecordBatchBytes int32
maxBufferedRecords int64
produceTimeout time.Duration
produceRetries int64
linger time.Duration
recordTimeout time.Duration
manualFlushing bool
@@ -125,8 +126,11 @@ func (cfg *cfg) validate() error {
if cfg.acks.val != -1 {
return errors.New("idempotency requires acks=all")
}
if cfg.retries == 0 {
return errors.New("idempotency requires RequestRetries to be greater than 0")
if cfg.produceRetries != math.MaxInt64 {
return errors.New("idempotency requires ProduceRetries to be unlimited")
}
if cfg.recordTimeout != 0 {
return errors.New("idempotency requires RecordTimeout to be unlimited")
}
}
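
A usage sketch, not part of this diff, of what the new validation allows
(assuming this package's NewClient and SeedBrokers; the broker address is
illustrative):

package main

import (
	"time"

	"github.com/twmb/franz-go/pkg/kgo"
)

func main() {
	// Default: idempotency on, so produce retries and record timeouts
	// must stay unlimited, which they are by default.
	cl, err := kgo.NewClient(kgo.SeedBrokers("localhost:9092"))
	_, _ = cl, err

	// Bounding either knob requires DisableIdempotency; otherwise
	// validate() rejects the configuration at client construction.
	cl, err = kgo.NewClient(
		kgo.SeedBrokers("localhost:9092"),
		kgo.DisableIdempotency(),
		kgo.ProduceRetries(5),
		kgo.RecordTimeout(10*time.Second),
	)
	_, _ = cl, err
}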

@@ -284,7 +288,7 @@ func defaultCfg() cfg {
return backoff
}
}(),
retries: math.MaxInt32, // effectively unbounded
retries: math.MaxInt64, // effectively unbounded
retryTimeout: func(key int16) time.Duration {
if key == 26 { // EndTxn key
return 5 * time.Minute
@@ -305,6 +309,7 @@ func defaultCfg() cfg {
maxRecordBatchBytes: 1000000, // Kafka max.message.bytes default is 1000012
maxBufferedRecords: math.MaxInt64,
produceTimeout: 30 * time.Second,
produceRetries: math.MaxInt64, // effectively unbounded
partitioner: StickyKeyPartitioner(nil), // default to how Kafka partitions

maxWait: 5000,
@@ -443,11 +448,10 @@ func RetryBackoff(backoff func(int) time.Duration) Opt {
}

// RequestRetries sets the number of tries that retriable requests are allowed,
// overriding the unlimited default.
//
// This setting applies to all types of requests.
// overriding the unlimited default. This option does not apply to produce
// requests.
func RequestRetries(n int) Opt {
return clientOpt{func(cfg *cfg) { cfg.retries = n }}
return clientOpt{func(cfg *cfg) { cfg.retries = int64(n) }}
}

// RetryTimeout sets the upper limit on how long we allow requests to retry,
@@ -661,6 +665,13 @@ func ProduceRequestTimeout(limit time.Duration) ProducerOpt {
return producerOpt{func(cfg *cfg) { cfg.produceTimeout = limit }}
}

// ProduceRetries sets the number of tries for producing records, overriding
// the unlimited default. This option can only be set if DisableIdempotency is
// also set.
func ProduceRetries(n int) Opt {
return clientOpt{func(cfg *cfg) { cfg.produceRetries = int64(n) }}
}

// StopOnDataLoss sets the client to stop producing if data loss is detected,
// overriding the default false.
//
@@ -710,19 +721,19 @@ func ManualFlushing() ProducerOpt {
return producerOpt{func(cfg *cfg) { cfg.manualFlushing = true }}
}

// RecordTimeout sets a rough time of how long a record can sit
// around in a batch before timing out.
// RecordTimeout sets a rough time of how long a record can sit around in a
// batch before timing out, overriding the unlimited default. This option can
// only be set if DisableIdempotency is also set.
//
// Note that the timeout for all records in a batch inherit the timeout of the
// first record in that batch. That is, once the first record's timeout
// expires, all records in the batch are expired. This generally is a non-issue
// unless using this option with lingering. In that case, simply add the linger
// to the record timeout to avoid problems.
// The timeout for all records in a batch inherit the timeout of the first
// record in that batch. That is, once the first record's timeout expires, all
// records in the batch are expired. This generally is a non-issue unless using
// this option with lingering. In that case, simply add the linger to the
// record timeout to avoid problems.
//
// Also note that the timeout is only evaluated after a produce response, and
// only for batches that need to be retried. Thus, a sink backoff may delay
// record timeout slightly. As with lingering, this also should generally be a
// non-issue.
// The timeout is only evaluated after a produce response, and only for batches
// that need to be retried. Thus, a sink backoff may delay record timeout
// slightly. As with lingering, this also should generally be a non-issue.
func RecordTimeout(timeout time.Duration) ProducerOpt {
return producerOpt{func(cfg *cfg) { cfg.recordTimeout = timeout }}
}
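
Not part of this diff: following the linger note above, a hedged fragment
(assuming this package's Linger option) of the suggested adjustment:

	// With a 5s linger, the first record in a batch can wait the full linger
	// before the batch is first attempted, so fold the linger into the timeout.
	kgo.Linger(5*time.Second),
	kgo.RecordTimeout(5*time.Second + 10*time.Second), // linger + intended 10s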
@@ -780,7 +791,7 @@ func FetchMaxWait(wait time.Duration) ConsumerOpt {

// FetchMaxBytes sets the maximum amount of bytes a broker will try to
// send during a fetch, overriding the default 50MiB. Note that brokers may not
// obey this limit if it has messages larger than this limit. Also note that
// obey this limit if it has records larger than this limit. Also note that
// this client sends a fetch to each broker concurrently, meaning the client
// will buffer up to <brokers * max bytes> worth of memory.
//
2 changes: 2 additions & 0 deletions pkg/kgo/errors.go
@@ -106,6 +106,8 @@ var (
// ErrCommitWithFatalID is returned when trying to commit in
// EndTransaction with a producer ID that has failed.
ErrCommitWithFatalID = errors.New("cannot commit with a fatal producer id; retry with an abort")

errClientClosing = errors.New("client closing")
)

// ErrDataLoss is returned for Kafka >=2.1.0 when data loss is detected and the
62 changes: 26 additions & 36 deletions pkg/kgo/metadata.go
@@ -247,6 +247,12 @@ func (cl *Client) fetchTopicMetadata(reqTopics []string) (map[string]*topicParti
continue
}

// This 249 limit is in Kafka itself; we copy it here to rely on it while producing.
if len(topicMeta.Topic) > 249 {
parts.loadErr = fmt.Errorf("invalid long topic name of (len %d) greater than max allowed 249", len(topicMeta.Topic))
continue
}

// Kafka partitions are strictly increasing from 0. We enforce
// that here; if any partition is missing, we consider this
// topic a load failure.
@@ -272,9 +278,11 @@
}

p := &topicPartition{
loadErr: kerr.ErrorForCode(partMeta.ErrorCode),
leader: partMeta.Leader,
leaderEpoch: leaderEpoch,
loadErr: kerr.ErrorForCode(partMeta.ErrorCode),
topicPartitionData: topicPartitionData{
leader: partMeta.Leader,
leaderEpoch: leaderEpoch,
},

records: &recBuf{
cl: cl,
@@ -293,15 +301,13 @@ func (cl *Client) fetchTopicMetadata(reqTopics []string) (map[string]*topicParti
keepControl: cl.cfg.keepControl,
cursorsIdx: -1,

leader: partMeta.Leader,
leaderEpoch: leaderEpoch,

cursorOffset: cursorOffset{
offset: -1, // required to not consume until needed
lastConsumedEpoch: -1, // required sentinel
},
},
}

// Any partition that has a load error uses the first
// seed broker as a leader. This ensures that every
// record buffer and every cursor can use a sink or
@@ -310,6 +316,9 @@ func (cl *Client) fetchTopicMetadata(reqTopics []string) (map[string]*topicParti
p.leader = unknownSeedID(0)
}

p.cursor.topicPartitionData = p.topicPartitionData
p.records.topicPartitionData = p.topicPartitionData

cl.sinksAndSourcesMu.Lock()
sns, exists := cl.sinksAndSources[p.leader]
if !exists {
@@ -356,17 +365,10 @@ func (cl *Client) mergeTopicPartitions(
// produced, we bump the respective error or fail everything. There is
// nothing to be done in a consumer.
if r.loadErr != nil {
retriable := kerr.IsRetriable(r.loadErr)
if retriable {
for _, topicPartition := range lv.partitions {
topicPartition.records.bumpRepeatedLoadErr(lv.loadErr)
}
} else {
for _, topicPartition := range lv.partitions {
topicPartition.records.failAllRecords(lv.loadErr)
}
for _, topicPartition := range lv.partitions {
topicPartition.records.bumpRepeatedLoadErr(lv.loadErr)
}
return retriable
return true
}

// Before the atomic update, we keep the latest partitions / writable
@@ -433,30 +435,18 @@ func (cl *Client) mergeTopicPartitions(
continue
}

// If the new sink is the same as the old, we simply copy over
// the records pointer and maybe begin producing again.
// If the tp data is the same, we simply copy over the records
// and cursor pointers.
//
// We always clear the failing state; migration does this itself.
if newTP.records.sink == oldTP.records.sink {
// If the tp data equals the old, then the sink / source is the
// same, because the sink/source is from the tp leader.
if newTP.topicPartitionData == oldTP.topicPartitionData {
newTP.records = oldTP.records
newTP.records.clearFailing()
} else {
oldTP.migrateProductionTo(newTP)
}

// The cursor source could be different because we could be
// fetching from a preferred replica.
if newTP.cursor.leader == oldTP.cursor.leader &&
newTP.cursor.leaderEpoch == oldTP.cursor.leaderEpoch {

newTP.cursor = oldTP.cursor

// Unlike above, there is no failing state for a
// cursor. If a cursor has a fetch error, we buffer
// that information for a poll, and then we continue to
// re-fetch that error.
newTP.records.clearFailing() // always clear failing state for producing after meta update
newTP.cursor = oldTP.cursor // unlike records, there is no failing state for a cursor

} else {
oldTP.migrateProductionTo(newTP) // migration clears failing state
oldTP.migrateCursorTo(
newTP,
&cl.consumer,
26 changes: 15 additions & 11 deletions pkg/kgo/produce_request_test.go
@@ -131,26 +131,29 @@ func TestRecBatchAppendTo(t *testing.T) {
},
}

// Define field-fixing functions.

version := int16(2)
version := int16(99)
ourBatch.wireLength = 4 + int32(len(kbatch.AppendTo(nil))) // length prefix; required for flexible versioning

// After compression, we fix the length & crc on kbatch.
fixFields := func() {
rawBatch := kbatch.AppendTo(nil)
kbatch.Length = int32(len(rawBatch[8+4:])) // skip first offset (int64) and length
kbatch.CRC = int32(crc32.Checksum(rawBatch[8+4+4+1+4:], crc32c)) // skip thru crc

rawBatch = ourBatch.appendTo(nil, version, 12, 11, true, nil)
ourBatch.wireLength = int32(len(rawBatch)) // fix length PRE compression
}

var compressor *compressor
var checkNum int
check := func() {
exp := kbatch.AppendTo(nil)
gotFull := ourBatch.appendTo(nil, version, 12, 11, true, compressor)
gotFull := ourBatch.appendTo(nil, version, 12, 11, true, true, compressor)
lengthPrefix := 4
ourBatchSize := (&kbin.Reader{Src: gotFull}).Int32()
got := gotFull[4:]
if version >= 9 {
r := &kbin.Reader{Src: gotFull}
ourBatchSize = int32(r.Uvarint()) - 1
lengthPrefix = len(gotFull) - len(r.Src)
}
got := gotFull[lengthPrefix:]
if ourBatchSize != int32(len(got)) {
t.Errorf("check %d: incorrect record prefixing written length %d != actual %d", checkNum, ourBatchSize, len(got))
}
@@ -171,7 +174,7 @@ func TestRecBatchAppendTo(t *testing.T) {
compressor, _ = newCompressor(CompressionCodec{codec: 2}) // snappy
{
kbatch.Attributes |= 0x0002 // snappy
kbatch.Records, _ = compressor.compress(sliceWriters.Get().(*sliceWriter), kbatch.Records, 99)
kbatch.Records, _ = compressor.compress(sliceWriters.Get().(*sliceWriter), kbatch.Records, version)
}

fixFields()
@@ -180,7 +183,7 @@ func TestRecBatchAppendTo(t *testing.T) {
// ***As a produce request***
txid := "tx"
kmsgReq := kmsg.ProduceRequest{
Version: 8, // TODO bump to 99
Version: version,
TransactionID: &txid,
Acks: -1,
TimeoutMillis: 1000,
Expand All @@ -193,12 +196,13 @@ func TestRecBatchAppendTo(t *testing.T) {
}},
}
ourReq := produceRequest{
version: 8,
version: version,
txnID: &txid,
acks: -1,
timeout: 1000,
producerID: 12,
producerEpoch: 11,
idempotent: true,
compressor: compressor,
}
ourReq.batches.addSeqBatch("topic", 1, ourBatch)