diff --git a/pkg/kgo/client.go b/pkg/kgo/client.go index 322e9f10..1ee9497b 100644 --- a/pkg/kgo/client.go +++ b/pkg/kgo/client.go @@ -298,6 +298,8 @@ func (cl *Client) OptValues(opt any) []any { return []any{cfg.maxRecordBatchBytes} case namefn(MaxBufferedRecords): return []any{cfg.maxBufferedRecords} + case namefn(MaxBufferedBytes): + return []any{cfg.maxBufferedBytes} case namefn(RecordPartitioner): return []any{cfg.partitioner} case namefn(ProduceRequestTimeout): diff --git a/pkg/kgo/config.go b/pkg/kgo/config.go index 1281628f..dd7a33e2 100644 --- a/pkg/kgo/config.go +++ b/pkg/kgo/config.go @@ -114,6 +114,7 @@ type cfg struct { defaultProduceTopic string maxRecordBatchBytes int32 maxBufferedRecords int64 + maxBufferedBytes int64 produceTimeout time.Duration recordRetries int64 maxUnknownFailures int64 @@ -293,6 +294,7 @@ func (cfg *cfg) validate() error { // Some random producer settings. {name: "max buffered records", v: cfg.maxBufferedRecords, allowed: 1, badcmp: i64lt}, + {name: "max buffered bytes", v: cfg.maxBufferedBytes, allowed: 0, badcmp: i64lt}, {name: "linger", v: int64(cfg.linger), allowed: int64(time.Minute), badcmp: i64gt, durs: true}, {name: "produce timeout", v: int64(cfg.produceTimeout), allowed: int64(100 * time.Millisecond), badcmp: i64lt, durs: true}, {name: "record timeout", v: int64(cfg.recordTimeout), allowed: int64(time.Second), badcmp: func(l, r int64) (bool, string) { @@ -948,6 +950,23 @@ func MaxBufferedRecords(n int) ProducerOpt { return producerOpt{func(cfg *cfg) { cfg.maxBufferedRecords = int64(n) }} } +// MaxBufferedBytes sets the max amount of bytes that the client will buffer +// while producing, blocking produces until records are finished if this limit +// is reached. This overrides the unlimited default. +// +// Note that this option does _not_ apply for consuming: the client cannot +// limit bytes buffered for consuming because of decompression. You can roughly +// control consuming memory by using [MaxConcurrentFetches], [FetchMaxBytes], +// and [FetchMaxPartitionBytes]. +// +// If you produce a record that is larger than n, the record is immediately +// failed with kerr.MessageTooLarge. +// +// Note that this limit applies after [MaxBufferedRecords]. +func MaxBufferedBytes(n int) ProducerOpt { + return producerOpt{func(cfg *cfg) { cfg.maxBufferedBytes = int64(n) }} +} + // RecordPartitioner uses the given partitioner to partition records, overriding // the default UniformBytesPartitioner(64KiB, true, true, nil). func RecordPartitioner(partitioner Partitioner) ProducerOpt { @@ -1047,8 +1066,8 @@ func ProducerLinger(linger time.Duration) ProducerOpt { // ManualFlushing disables auto-flushing when producing. While you can still // set lingering, it would be useless to do so. // -// With manual flushing, producing while MaxBufferedRecords have already been -// produced and not flushed will return ErrMaxBuffered. +// With manual flushing, producing while MaxBufferedRecords or MaxBufferedBytes +// have already been produced and not flushed will return ErrMaxBuffered. func ManualFlushing() ProducerOpt { return producerOpt{func(cfg *cfg) { cfg.manualFlushing = true }} } diff --git a/pkg/kgo/group_test.go b/pkg/kgo/group_test.go index 2f712285..6e06da23 100644 --- a/pkg/kgo/group_test.go +++ b/pkg/kgo/group_test.go @@ -39,6 +39,7 @@ func TestGroupETL(t *testing.T) { getSeedBrokers(), WithLogger(BasicLogger(os.Stderr, testLogLevel, nil)), MaxBufferedRecords(10000), + MaxBufferedBytes(50000), UnknownTopicRetries(-1), // see txn_test comment ) defer cl.Close() @@ -124,6 +125,7 @@ func (c *testConsumer) etl(etlsBeforeQuit int) { ConsumeTopics(c.consumeFrom), Balancers(c.balancer), MaxBufferedRecords(10000), + MaxBufferedBytes(50000), ConsumePreferringLagFn(PreferLagAt(1)), BlockRebalanceOnPoll(), diff --git a/pkg/kgo/producer.go b/pkg/kgo/producer.go index 3805d192..2ceb171b 100644 --- a/pkg/kgo/producer.go +++ b/pkg/kgo/producer.go @@ -15,6 +15,7 @@ import ( type producer struct { bufferedRecords atomicI64 + bufferedBytes atomicI64 inflight atomicI64 // high 16: # waiters, low 48: # inflight cl *Client @@ -313,8 +314,9 @@ func (f *FirstErrPromise) Err() error { } // TryProduce is similar to Produce, but rather than blocking if the client -// currently has MaxBufferedRecords buffered, this fails immediately with -// ErrMaxBuffered. See the Produce documentation for more details. +// currently has MaxBufferedRecords or MaxBufferedBytes buffered, this fails +// immediately with ErrMaxBuffered. See the Produce documentation for more +// details. func (cl *Client) TryProduce( ctx context.Context, r *Record, @@ -387,6 +389,21 @@ func (cl *Client) produce( } } + var ( + userSize = r.userSize() + bufRecs = p.bufferedRecords.Add(1) + bufBytes = p.bufferedBytes.Add(userSize) + overMaxRecs = bufRecs > cl.cfg.maxBufferedRecords + overMaxBytes bool + ) + if cl.cfg.maxBufferedBytes > 0 { + if userSize > cl.cfg.maxBufferedBytes { + p.promiseRecord(promisedRec{ctx, promise, r}, kerr.MessageTooLarge) + return + } + overMaxBytes = bufBytes > cl.cfg.maxBufferedBytes + } + if r.Topic == "" { p.promiseRecord(promisedRec{ctx, promise, r}, errNoTopic) return @@ -396,7 +413,11 @@ func (cl *Client) produce( return } - if p.bufferedRecords.Add(1) > cl.cfg.maxBufferedRecords { + if overMaxRecs || overMaxBytes { + cl.cfg.logger.Log(LogLevelDebug, "blocking Produce because we are either over max buffered records or max buffered bytes", + "over_max_records", overMaxRecs, + "over_max_bytes", overMaxBytes, + ) // If the client ctx cancels or the produce ctx cancels, we // need to un-count our buffering of this record. We also need // to drain a slot from the waitBuffer chan, which could be @@ -411,11 +432,14 @@ func (cl *Client) produce( } select { case <-p.waitBuffer: + cl.cfg.logger.Log(LogLevelDebug, "Produce block signaled, continuing to produce") case <-cl.ctx.Done(): drainBuffered(ErrClientClosed) + cl.cfg.logger.Log(LogLevelDebug, "client ctx canceled while blocked in Produce, returning") return case <-ctx.Done(): drainBuffered(ctx.Err()) + cl.cfg.logger.Log(LogLevelDebug, "produce ctx canceled while blocked in Produce, returning") return } } @@ -478,15 +502,21 @@ func (cl *Client) finishRecordPromise(pr promisedRec, err error) { } } + // Capture user size before potential modification by the promise. + userSize := pr.userSize() + nowBufBytes := p.bufferedBytes.Add(-userSize) + nowBufRecs := p.bufferedRecords.Add(-1) + wasOverMaxRecs := nowBufRecs >= cl.cfg.maxBufferedRecords + wasOverMaxBytes := cl.cfg.maxBufferedBytes > 0 && nowBufBytes+userSize > cl.cfg.maxBufferedBytes + // We call the promise before finishing the record; this allows users // of Flush to know that all buffered records are completely done // before Flush returns. pr.promise(pr.Record, err) - buffered := p.bufferedRecords.Add(-1) - if buffered >= cl.cfg.maxBufferedRecords { + if wasOverMaxRecs || wasOverMaxBytes { p.waitBuffer <- struct{}{} - } else if buffered == 0 && p.flushing.Load() > 0 { + } else if nowBufRecs == 0 && p.flushing.Load() > 0 { p.mu.Lock() p.mu.Unlock() //nolint:gocritic,staticcheck // We use the lock as a barrier, unlocking immediately is safe. p.c.Broadcast() diff --git a/pkg/kgo/record_and_fetch.go b/pkg/kgo/record_and_fetch.go index 7edff28b..4f1ebe6f 100644 --- a/pkg/kgo/record_and_fetch.go +++ b/pkg/kgo/record_and_fetch.go @@ -151,6 +151,14 @@ type Record struct { Context context.Context } +func (r *Record) userSize() int64 { + s := len(r.Key) + len(r.Value) + for _, h := range r.Headers { + s += len(h.Key) + len(h.Value) + } + return int64(s) +} + // When buffering records, we calculate the length and tsDelta ahead of time // (also because number width affects encoding length). We repurpose the Offset // field to save space.