Skip to content

Commit

Permalink
client: add Buffered{Produce,Fetch}Records
Browse files Browse the repository at this point in the history
These two new methods can help users determine:
- for producing, the health of the connection to Kafka / the Kafka
cluster's ability to handle producing
- for consuming, the health of a user's application

These new methods require a new hook, which is introduced in the next
commit.
  • Loading branch information
twmb committed Jul 15, 2021
1 parent ce269ee commit 3256518
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 0 deletions.
15 changes: 15 additions & 0 deletions pkg/kgo/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,8 @@ func (o Offset) At(at int64) Offset {
type consumer struct {
cl *Client

bufferedRecords int64

// mu is grabbed when
// - polling fetches, for quickly draining sources / updating group uncommitted
// - calling assignPartitions (group / direct updates)
Expand Down Expand Up @@ -119,6 +121,19 @@ type consumer struct {
fakeReadyForDraining []Fetch
}

// BufferedFetchRecords returns the number of records currently buffered from
// fetching within the client.
//
// This can be used as a gauge to determine how behind your application is for
// processing records the client has fetched. Note that it is perfectly normal
// to see a spike of buffered records, which would correspond to a fetch
// response being processed just before a call to this function. It is only
// problematic for you if this function is consistently returning large
// values.
func (cl *Client) BufferedFetchRecords() int64 {
return atomic.LoadInt64(&cl.consumer.bufferedRecords)
}

type usedCursors map[*cursor]struct{}

func (u *usedCursors) use(c *cursor) {
Expand Down
10 changes: 10 additions & 0 deletions pkg/kgo/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,16 @@ type producer struct {
inTxn bool
}

// BufferedProduceRecords returns the number of records currently buffered for
// producing within the client.
//
// Use this as a gauge of how many produced records are still awaiting a
// flush; a consistently large value can help point at network or cluster
// health problems.
func (cl *Client) BufferedProduceRecords() int64 {
	p := &cl.producer
	return atomic.LoadInt64(&p.bufferedRecords)
}

type unknownTopicProduces struct {
buffered []promisedRec
wait chan error
Expand Down
21 changes: 21 additions & 0 deletions pkg/kgo/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,21 @@ type bufferedFetch struct {
usedOffsets usedOffsets // what the offsets will be next if this fetch is used
}

func (s *source) hook(f *Fetch, buffered) {
var nrecs int
for i := range f.Topics {
t := &f.Topics[i]
for j := range t.Partitions {
nrecs += len(t.Partitions[j].Records)
}
}
if buffered {
atomic.AddInt64(&s.cl.consumer.bufferedRecords, int64(nrecs))
} else {
atomic.AddInt64(&s.cl.consumer.bufferedRecords, -int64(nrecs))
}
}

// takeBuffered drains a buffered fetch and updates offsets.
func (s *source) takeBuffered() Fetch {
return s.takeBufferedFn(func(usedOffsets usedOffsets) {
Expand Down Expand Up @@ -358,6 +373,8 @@ func (s *source) takeNBuffered(n int) (Fetch, int, bool) {
}
}

s.hook(&r, false) // unbuffered

drained := len(bf.Topics) == 0
if drained {
s.takeBuffered()
Expand All @@ -371,6 +388,9 @@ func (s *source) takeBufferedFn(offsetFn func(usedOffsets)) Fetch {
offsetFn(r.usedOffsets)
r.doneFetch <- struct{}{}
close(s.sem)

s.hook(&r.fetch, false) // unbuffered

return r.fetch
}

Expand Down Expand Up @@ -664,6 +684,7 @@ func (s *source) fetch(consumerSession *consumerSession, doneFetch chan<- struct
usedOffsets: req.usedOffsets,
}
s.sem = make(chan struct{})
s.hook(&fetch, true) // buffered
s.cl.consumer.addSourceReadyForDraining(s)
}
return
Expand Down

0 comments on commit 3256518

Please sign in to comment.