diff --git a/pkg/kgo/consumer.go b/pkg/kgo/consumer.go
index ba7ba988..32dfada9 100644
--- a/pkg/kgo/consumer.go
+++ b/pkg/kgo/consumer.go
@@ -82,6 +82,8 @@ func (o Offset) At(at int64) Offset {
 type consumer struct {
 	cl *Client
 
+	bufferedRecords int64
+
 	// mu is grabbed when
 	//  - polling fetches, for quickly draining sources / updating group uncommitted
 	//  - calling assignPartitions (group / direct updates)
@@ -119,6 +121,19 @@ type consumer struct {
 	fakeReadyForDraining []Fetch
 }
 
+// BufferedFetchRecords returns the number of records currently buffered from
+// fetching within the client.
+//
+// This can be used as a gauge to determine how far behind your application
+// is in processing records the client has fetched. Note that it is perfectly
+// normal to see a spike of buffered records, which would correspond to a
+// fetch response being processed just before a call to this function. It is
+// only problematic for you if this function is consistently returning large
+// values.
+func (cl *Client) BufferedFetchRecords() int64 {
+	return atomic.LoadInt64(&cl.consumer.bufferedRecords)
+}
+
 type usedCursors map[*cursor]struct{}
 
 func (u *usedCursors) use(c *cursor) {
diff --git a/pkg/kgo/producer.go b/pkg/kgo/producer.go
index 2368db73..b4a9a748 100644
--- a/pkg/kgo/producer.go
+++ b/pkg/kgo/producer.go
@@ -47,6 +47,16 @@ type producer struct {
 	inTxn bool
 }
 
+// BufferedProduceRecords returns the number of records currently buffered for
+// producing within the client.
+//
+// This can be used as a gauge to determine how far behind the client is in
+// flushing records produced by your client (which can help determine network /
+// cluster health).
+func (cl *Client) BufferedProduceRecords() int64 {
+	return atomic.LoadInt64(&cl.producer.bufferedRecords)
+}
+
 type unknownTopicProduces struct {
 	buffered []promisedRec
 	wait     chan error
diff --git a/pkg/kgo/source.go b/pkg/kgo/source.go
index cb0ba9a9..3acaf85d 100644
--- a/pkg/kgo/source.go
+++ b/pkg/kgo/source.go
@@ -279,6 +279,23 @@ type bufferedFetch struct {
 	usedOffsets usedOffsets // what the offsets will be next if this fetch is used
 }
 
+// hook adds to or subtracts from the consumer's buffered-record gauge as a
+// fetch is buffered or drained.
+func (s *source) hook(f *Fetch, buffered bool) {
+	var nrecs int
+	for i := range f.Topics {
+		t := &f.Topics[i]
+		for j := range t.Partitions {
+			nrecs += len(t.Partitions[j].Records)
+		}
+	}
+	if buffered {
+		atomic.AddInt64(&s.cl.consumer.bufferedRecords, int64(nrecs))
+	} else {
+		atomic.AddInt64(&s.cl.consumer.bufferedRecords, -int64(nrecs))
+	}
+}
+
 // takeBuffered drains a buffered fetch and updates offsets.
 func (s *source) takeBuffered() Fetch {
 	return s.takeBufferedFn(func(usedOffsets usedOffsets) {
@@ -358,6 +373,8 @@ func (s *source) takeNBuffered(n int) (Fetch, int, bool) {
 		}
 	}
 
+	s.hook(&r, false) // unbuffered
+
 	drained := len(bf.Topics) == 0
 	if drained {
 		s.takeBuffered()
@@ -371,6 +388,9 @@ func (s *source) takeBufferedFn(offsetFn func(usedOffsets)) Fetch {
 	offsetFn(r.usedOffsets)
 	r.doneFetch <- struct{}{}
 	close(s.sem)
+
+	s.hook(&r.fetch, false) // unbuffered
+
 	return r.fetch
 }
 
@@ -664,6 +684,7 @@ func (s *source) fetch(consumerSession *consumerSession, doneFetch chan<- struct{}) {
 			usedOffsets: req.usedOffsets,
 		}
 		s.sem = make(chan struct{})
+		s.hook(&fetch, true) // buffered
 		s.cl.consumer.addSourceReadyForDraining(s)
 	}
 	return
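For illustration, a minimal sketch of how the two new gauges might be sampled from a metrics goroutine. The broker address, topic name, and sampling interval are placeholders, and the setup assumes the client's existing `SeedBrokers` and `ConsumeTopics` options:

```go
package main

import (
	"context"
	"log"
	"time"

	"github.com/twmb/franz-go/pkg/kgo"
)

func main() {
	cl, err := kgo.NewClient(
		kgo.SeedBrokers("localhost:9092"), // placeholder broker
		kgo.ConsumeTopics("events"),       // placeholder topic
	)
	if err != nil {
		log.Fatal(err)
	}
	defer cl.Close()

	// Sample both gauges periodically; consistently large values suggest the
	// application is falling behind (fetch side) or the cluster is not
	// absorbing produced records (produce side).
	go func() {
		for range time.Tick(10 * time.Second) {
			log.Printf("buffered: fetch=%d produce=%d",
				cl.BufferedFetchRecords(), cl.BufferedProduceRecords())
		}
	}()

	for {
		fetches := cl.PollFetches(context.Background())
		iter := fetches.RecordIter()
		for !iter.Done() {
			_ = iter.Next() // process the record here
		}
	}
}
```

Per the doc comment on BufferedFetchRecords, a momentary spike right after a fetch response lands is normal; any alerting built on these gauges should key off sustained growth rather than instantaneous values.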