producer: serialize promises
Having promises potentially be concurrent was originally thought to be
an optimization, but extensive benchmarking shows it may not be much of
an optimization at all.

Previously, records that failed independently of producing could have
their promises called at any time, potentially concurrently with other
promises. Produced records would always have their promises called at
the end of handling a produce response. Technically, this meant that if
a broker had the maximum number of produce requests in flight, starting
a new produce request would block on finishing the promises for a
previous one if those promises were slow.

Now, we send all promises to a dedicated loop. This keeps our prior
guarantee of strict ordering and divorces promises from produce
requests.

I expect the common case for promises is to serialize the records
anyway, which implies locking within the promise. Hopefully, that
locking can now be removed. It is possible that promises that did no
work or only atomic work could be slower now, but extensive testing
with franz-go's bench utility (and changing the code to auto-succeed
produce requests) shows no difference -- if anything, serializing may
be slightly faster.
twmb committed Mar 1, 2022
1 parent e3ef142 commit 31f3f5f
Showing 8 changed files with 179 additions and 109 deletions.
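The core of the change is a push/drain pattern: every batch of promises is pushed onto a ring, and whichever pusher finds the ring empty becomes the single goroutine that drains it, calling promises strictly in order. The commit's ringBatchPromise type itself lives in a file not shown below, so the following is only a minimal sketch of the same contract, using a mutex-backed slice in place of the real ring buffer; the type and function names here are illustrative, not the library's.

```go
// Sketch only: a stand-in for the commit's ringBatchPromise, which lives in a
// file not shown in this diff. A mutex-backed slice replaces the real ring
// buffer; the push/dropPeek contract is what matters.
package main

import (
	"fmt"
	"sync"
)

type promise func()

type serializer struct {
	mu    sync.Mutex
	queue []promise
}

// push appends p and reports whether the queue was empty, in which case the
// caller must start the (single) drain goroutine.
func (s *serializer) push(p promise) bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.queue = append(s.queue, p)
	return len(s.queue) == 1
}

// dropPeek removes the just-finished head and peeks the next promise, if any.
func (s *serializer) dropPeek() (promise, bool) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.queue = s.queue[1:]
	if len(s.queue) == 0 {
		return nil, false
	}
	return s.queue[0], true
}

// enqueue mirrors promiseBatch: only the goroutine that saw an empty queue
// drains it, so promises run one at a time, in the order they were enqueued.
func (s *serializer) enqueue(p promise) {
	if first := s.push(p); first {
		go func() {
			for more := true; more; p, more = s.dropPeek() {
				p()
			}
		}()
	}
}

func main() {
	var (
		s  serializer
		wg sync.WaitGroup
	)
	for i := 0; i < 5; i++ {
		i := i
		wg.Add(1)
		s.enqueue(func() { fmt.Println("promise", i); wg.Done() })
	}
	wg.Wait() // prints 0 through 4, strictly in order
}
```

push reports whether the queue was empty, and dropPeek removes the finished head while peeking the next entry, which is exactly the pair that promiseBatch and finishPromises rely on in the producer.go diff below.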
2 changes: 2 additions & 0 deletions pkg/kgo/errors.go
@@ -149,6 +149,8 @@ var (
// Returned when trying to produce a record outside of a transaction.
errNotInTransaction = errors.New("cannot produce record transactionally if not in a transaction")

errNoTopic = errors.New("cannot produce record with no topic and no default topic")

// Returned for all buffered produce records when a user purges topics.
errPurged = errors.New("topic purged while buffered")

11 changes: 5 additions & 6 deletions pkg/kgo/group_test.go
@@ -6,7 +6,6 @@ import (
"fmt"
"os"
"strconv"
"sync"
"sync/atomic"
"testing"
"time"
@@ -37,10 +36,12 @@ func TestGroupETL(t *testing.T) {
////////////////////

go func() {
cl, _ := NewClient(WithLogger(BasicLogger(os.Stderr, testLogLevel, nil)))
cl, _ := NewClient(
WithLogger(BasicLogger(os.Stderr, testLogLevel, nil)),
MaxBufferedRecords(10000),
)
defer cl.Close()

var offsetsMu sync.Mutex
offsets := make(map[int32]int64)

defer func() {
@@ -66,16 +67,13 @@ }
}

// ensure the offsets for this partition are contiguous
offsetsMu.Lock()
current, ok := offsets[r.Partition]
if ok && r.Offset != current+1 {
errs <- fmt.Errorf("partition produced offsets out of order, got %d != exp %d", r.Offset, current+1)
} else if !ok && r.Offset != 0 {
errs <- fmt.Errorf("expected first produced record to partition to have offset 0, got %d", r.Offset)
}
offsets[r.Partition] = r.Offset

offsetsMu.Unlock()
},
)
}
@@ -122,6 +120,7 @@ func (c *testConsumer) etl(etlsBeforeQuit int) {
ConsumerGroup(c.group),
ConsumeTopics(c.consumeFrom),
Balancers(c.balancer),
MaxBufferedRecords(10000),

// Even with autocommitting, autocommitting does not commit
// *the latest* when being revoked. We always want to commit
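The test now caps buffering via MaxBufferedRecords(10000). As the updated Produce documentation in producer.go below notes, once that many records are buffered Produce blocks (unless manual flushing is configured), while TryProduce fails the promise with ErrMaxBuffered instead. A rough sketch of that interaction follows; the broker address and topic are placeholders.

```go
package main

import (
	"context"
	"errors"
	"fmt"

	"github.com/twmb/franz-go/pkg/kgo"
)

func main() {
	// Placeholder broker and topic; MaxBufferedRecords mirrors the test's value.
	cl, err := kgo.NewClient(
		kgo.SeedBrokers("localhost:9092"),
		kgo.DefaultProduceTopic("events"),
		kgo.MaxBufferedRecords(10000),
	)
	if err != nil {
		panic(err)
	}
	defer cl.Close()

	ctx := context.Background()

	// Produce blocks once 10,000 records are buffered; TryProduce instead
	// fails the promise immediately with ErrMaxBuffered.
	cl.TryProduce(ctx, kgo.StringRecord("hello"), func(_ *kgo.Record, err error) {
		if errors.Is(err, kgo.ErrMaxBuffered) {
			fmt.Println("buffer full, backing off")
		}
	})

	if err := cl.Flush(ctx); err != nil {
		fmt.Println("flush:", err)
	}
}
```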
163 changes: 87 additions & 76 deletions pkg/kgo/producer.go
@@ -52,6 +52,8 @@ type producer struct {
notifyMu sync.Mutex
notifyCond *sync.Cond

batchPromises ringBatchPromise

txnMu sync.Mutex
inTxn bool
}
@@ -75,7 +77,7 @@ func (p *producer) init(cl *Client) {
p.cl = cl
p.topics = newTopicsPartitions()
p.unknownTopics = make(map[string]*unknownTopicProduces)
p.waitBuffer = make(chan struct{}, 32)
p.waitBuffer = make(chan struct{}, math.MaxInt64)
p.idVersion = -1
p.id.Store(&producerID{
id: -1,
@@ -114,7 +116,10 @@ func (p *producer) purgeTopics(topics []string) {
if unknown, exists := p.unknownTopics[topic]; exists {
delete(p.unknownTopics, topic)
close(unknown.wait)
p.failUnknownTopicRecords(unknown, errPurged)
p.promiseBatch(batchPromise{
recs: unknown.buffered,
err: errPurged,
})
}
}
p.unknownTopicsMu.Unlock()
@@ -203,12 +208,9 @@ func (rs ProduceResults) First() (*Record, error) {
func (cl *Client) ProduceSync(ctx context.Context, rs ...*Record) ProduceResults {
var (
wg sync.WaitGroup
mu sync.Mutex
results = make(ProduceResults, 0, len(rs))
promise = func(r *Record, err error) {
mu.Lock()
results = append(results, ProduceResult{r, err})
mu.Unlock()
wg.Done()
}
)
@@ -297,15 +299,14 @@ func (cl *Client) TryProduce(
// calling an optional `promise` with the record and a potential error when
// Kafka replies. For a synchronous produce, see ProduceSync. Records are
// produced in order per partition if the record is produced successfully.
// Records that fail to be produced (topic load failures, unbufferable records,
// etc.) may have their promised called at any time, and may be called
// concurrent with other promises. Successfully produced records will have
// their attributes, offset, and partition set before the promise is called.
// Successfully produced records will have their attributes, offset, and
// partition set before the promise is called. All promises are called
// serially (and should be relatively fast).
//
// If the topic field is empty, the client will use the DefaultProduceTopic; if
// that is also empty, the record will be failed immediately. If the record is
// too large to fit in a batch on its own in a produce request, the record will
// be failed with immediately kerr.MessageTooLarge.
// that is also empty, the record is failed immediately. If the record is too
// large to fit in a batch on its own in a produce request, the record will be
// failed immediately with kerr.MessageTooLarge.
//
// If the client is configured to automatically flush and currently has the
// configured maximum amount of records buffered, Produce will block. The
@@ -342,24 +343,7 @@ func (cl *Client) produce(
promise = noPromise
}

if r.Topic == "" {
def := cl.cfg.defaultProduceTopic
if def == "" {
go promise(r, errors.New("cannot produce to a record that does not have a topic set"))
return
}
r.Topic = def
}

p := &cl.producer

if cl.cfg.txnID != nil && atomic.LoadUint32(&p.producingTxn) != 1 {
go promise(r, errNotInTransaction) // see comment just below for why we 'go' this
return
}

// Our record is now "buffered", and past this point will fall into
// finishRecordPromise, where we track it is finished.
if p.hooks != nil {
for _, h := range p.hooks.buffered {
h.OnProduceRecordBuffered(r)
@@ -371,18 +355,9 @@ func (cl *Client) produce(
// need to un-count our buffering of this record. We also need
// to drain a slot from the waitBuffer chan, which could be
// sent to right when we are erroring.
//
// We issue the waitBuffer drain in a goroutine because we do
// not want to block sending to it.
//
// We issue the promise finishing in a goroutine because we do
// not want to block Produce on executing the promise. The user
// could be consuming from a channel that is sent to in the
// promise only *after* Produce returns; not executing the
// promise in a goroutine would lead to a deadlock.
drainBuffered := func(err error) {
go func() { <-p.waitBuffer }()
go cl.finishRecordPromise(promisedRec{ctx, promise, r}, err)
p.promiseRecord(promisedRec{ctx, promise, r}, err)
<-p.waitBuffer
}
if !block || cl.cfg.manualFlushing {
drainBuffered(ErrMaxBuffered)
@@ -399,9 +374,66 @@ }
}
}

// Neither of the errors below should be hit in applications.
if r.Topic == "" {
def := cl.cfg.defaultProduceTopic
if def == "" {
p.promiseRecord(promisedRec{ctx, promise, r}, errNoTopic)
return
}
r.Topic = def
}
if cl.cfg.txnID != nil && atomic.LoadUint32(&p.producingTxn) != 1 {
p.promiseRecord(promisedRec{ctx, promise, r}, errNotInTransaction)
return
}

cl.partitionRecord(promisedRec{ctx, promise, r})
}

type batchPromise struct {
baseOffset int64
pid int64
epoch int16
attrs RecordAttrs
partition int32
recs []promisedRec
err error
}

func (p *producer) promiseBatch(b batchPromise) {
if first := p.batchPromises.push(b); first {
go p.finishPromises(b)
}
}

func (p *producer) promiseRecord(pr promisedRec, err error) {
p.promiseBatch(batchPromise{recs: []promisedRec{pr}, err: err})
}

func (p *producer) finishPromises(b batchPromise) {
cl := p.cl
var more bool
start:
for i, pr := range b.recs {
pr.Offset = b.baseOffset + int64(i)
pr.Partition = b.partition
pr.ProducerID = b.pid
pr.ProducerEpoch = b.epoch
pr.Attrs = b.attrs
cl.finishRecordPromise(pr, b.err)
b.recs[i] = promisedRec{}
}
if cap(b.recs) > 4 {
cl.prsPool.put(b.recs)
}

b, more = p.batchPromises.dropPeek()
if more {
goto start
}
}

func (cl *Client) finishRecordPromise(pr promisedRec, err error) {
p := &cl.producer

@@ -418,7 +450,7 @@ func (cl *Client) finishRecordPromise(pr promisedRec, err error) {

buffered := atomic.AddInt64(&p.bufferedRecords, -1)
if buffered >= cl.cfg.maxBufferedRecords {
go func() { p.waitBuffer <- struct{}{} }()
p.waitBuffer <- struct{}{}
} else if buffered == 0 && atomic.LoadInt32(&p.flushing) > 0 {
p.notifyMu.Lock()
p.notifyMu.Unlock() // nolint:gocritic,staticcheck // We use the lock as a barrier, unlocking immediately is safe.
@@ -441,7 +473,7 @@ func (cl *Client) partitionRecord(pr promisedRec) {
// partitions can call this directly.
func (cl *Client) doPartitionRecord(parts *topicPartitions, partsData *topicPartitionsData, pr promisedRec) {
if partsData.loadErr != nil && !kerr.IsRetriable(partsData.loadErr) {
cl.finishRecordPromise(pr, partsData.loadErr)
cl.producer.promiseRecord(pr, partsData.loadErr)
return
}

Expand All @@ -456,7 +488,7 @@ func (cl *Client) doPartitionRecord(parts *topicPartitions, partsData *topicPart
mapping = partsData.partitions
}
if len(mapping) == 0 {
cl.finishRecordPromise(pr, errors.New("unable to partition record due to no usable partitions"))
cl.producer.promiseRecord(pr, errors.New("unable to partition record due to no usable partitions"))
return
}

@@ -472,7 +504,7 @@ func (cl *Client) doPartitionRecord(parts *topicPartitions, partsData *topicPart
pick = parts.partitioner.Partition(pr.Record, len(mapping))
}
if pick < 0 || pick >= len(mapping) {
cl.finishRecordPromise(pr, fmt.Errorf("invalid record partitioning choice of %d from %d available", pick, len(mapping)))
cl.producer.promiseRecord(pr, fmt.Errorf("invalid record partitioning choice of %d from %d available", pick, len(mapping)))
return
}

@@ -492,7 +524,7 @@ func (cl *Client) doPartitionRecord(parts *topicPartitions, partsData *topicPart
}

if pick < 0 || pick >= len(mapping) {
cl.finishRecordPromise(pr, fmt.Errorf("invalid record partitioning choice of %d from %d available", pick, len(mapping)))
cl.producer.promiseRecord(pr, fmt.Errorf("invalid record partitioning choice of %d from %d available", pick, len(mapping)))
return
}
partition = mapping[pick]
@@ -796,30 +828,10 @@ func (cl *Client) waitUnknownTopic(
cl.cfg.logger.Log(LogLevelInfo, "new topic metadata wait failed, done retrying, failing all records", "topic", topic, "err", err)

delete(p.unknownTopics, topic)
p.failUnknownTopicRecords(unknown, err)
}

// Called under the unknown mu, this finishes promises for an unknown topic.
//
// We do not delete from the producer's topics due to potential concurrent
// metadata updating issues: if the metadata has an active request loading for
// a topic we are actively deleting now, and that request finally loads the
// topic successfully, it will create recBuf pointers that will not be cleaned
// up.
//
// We could work around this using the same blockingMetadataFn type logic that
// we use when unsetting a consumer, but it's more finnicky for a producer
// because we want to knife out a single topic.
//
// Leaving a topic buffered even if we failed it as unknown should be of no
// consequence because clients should not really be producing to loads of
// unknown topics.
func (p *producer) failUnknownTopicRecords(unknown *unknownTopicProduces, err error) {
go func() {
for _, pr := range unknown.buffered {
p.cl.finishRecordPromise(pr, err)
}
}()
p.promiseBatch(batchPromise{
recs: unknown.buffered,
err: err,
})
}

// Flush hangs waiting for all buffered records to be flushed, stopping all
@@ -941,11 +953,10 @@ func (cl *Client) failBufferedRecords(err error) {
toFail = append(toFail, unknown.buffered)
}

go func() {
for _, fail := range toFail {
for _, pr := range fail {
cl.finishRecordPromise(pr, err)
}
}
}()
for _, fail := range toFail {
p.promiseBatch(batchPromise{
recs: fail,
err: err,
})
}
}
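Because promises are now invoked serially, user code that aggregates results inside a promise no longer needs its own lock, which is the same simplification made to ProduceSync and to the group test above. A hedged usage sketch, with a placeholder broker and topic:

```go
package main

import (
	"context"
	"fmt"
	"sync"

	"github.com/twmb/franz-go/pkg/kgo"
)

func main() {
	// Placeholder broker and topic, purely for illustration.
	cl, err := kgo.NewClient(
		kgo.SeedBrokers("localhost:9092"),
		kgo.DefaultProduceTopic("events"),
	)
	if err != nil {
		panic(err)
	}
	defer cl.Close()

	ctx := context.Background()

	// Promises are called one at a time, so a WaitGroup for completion is all
	// the synchronization needed; no mutex guards the results slice.
	var (
		wg      sync.WaitGroup
		results []*kgo.Record
	)
	for i := 0; i < 100; i++ {
		wg.Add(1)
		cl.Produce(ctx, kgo.StringRecord(fmt.Sprintf("msg-%d", i)), func(r *kgo.Record, err error) {
			defer wg.Done()
			if err == nil {
				results = append(results, r) // safe: promises never run concurrently
			}
		})
	}
	wg.Wait()
	fmt.Println("produced", len(results), "records")
}
```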
3 changes: 1 addition & 2 deletions pkg/kgo/record_and_fetch.go
@@ -135,8 +135,7 @@ type Record struct {
// For producing, this is left unset. This will be set by the client as
// appropriate. If you are producing with no acks, this will just be
// the offset used in the produce request and does not mirror the
// offset actually stored within Kafka. The offset will not be valid
// unless the record was successfully produced.
// offset actually stored within Kafka.
Offset int64
}

Expand Down
Loading

0 comments on commit 31f3f5f

Please sign in to comment.