Skip to content

Commit

Permalink
kprom: clarify seed ids, add two new metrics
Browse files Browse the repository at this point in the history
Seed IDs previously used largely negative numbers. This changes that to
more clearly indicate what a seed is.

This also adds the two new buffered fetch / produce records metrics,
which requires using the new HookNewClient.
  • Loading branch information
twmb committed Jul 15, 2021
1 parent ffc94ea commit 1bb70a5
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 10 deletions.
5 changes: 4 additions & 1 deletion plugin/kprom/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,12 @@ metrics being counter vecs:
#{ns}_read_bytes_total{node_id="#{node}"}
#{ns}_produce_bytes_total{node_id="#{node}",topic="#{topic}"}
#{ns}_fetch_bytes_total{node_id="#{node}",topic="#{topic}"}
#{ns}_buffered_produce_records_total
#{ns}_buffered_fetch_records_total
```

Note that seed brokers use broker IDs starting at math.MinInt32.
Note that seed brokers use broker IDs prefixed with "seed_", with the number
corresponding to which seed it is.

To use,

Expand Down
51 changes: 42 additions & 9 deletions plugin/kprom/kprom.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
// #{ns}_read_bytes_total{node_id="#{node}"}
// #{ns}_produce_bytes_total{node_id="#{node}",topic="#{topic}"}
// #{ns}_fetch_bytes_total{node_id="#{node}",topic="#{topic}"}
// #{ns}_buffered_produce_records_total
// #{ns}_buffered_fetch_records_total
//
// This can be used in a client like so:
//
Expand All @@ -23,10 +25,12 @@
// By default, metrics are installed under the a new prometheus registry, but
// this can be overridden with the Registry option.
//
// Note that seed brokers use broker IDs starting at math.MinInt32.
// Note that seed brokers use broker IDs prefixed with "seed_", with the number
// corresponding to which seed it is.
package kprom

import (
"math"
"net"
"net/http"
"strconv"
Expand Down Expand Up @@ -64,6 +68,9 @@ type Metrics struct {

produceBytes *prometheus.CounterVec
fetchBytes *prometheus.CounterVec

bufferedProduceRecords int64
bufferedFetchRecords int64
}

// Registry returns the prometheus registry that metrics were added to.
Expand All @@ -80,6 +87,8 @@ func (m *Metrics) Handler() http.Handler {
}

type cfg struct {
namespace string

reg prometheus.Registerer
gatherer prometheus.Gatherer

Expand Down Expand Up @@ -141,8 +150,9 @@ func HandlerOpts(opts promhttp.HandlerOpts) Opt {
func NewMetrics(namespace string, opts ...Opt) *Metrics {
var regGatherer RegistererGatherer = prometheus.NewRegistry()
cfg := cfg{
reg: regGatherer,
gatherer: regGatherer,
namespace: namespace,
reg: regGatherer,
gatherer: regGatherer,
}
for _, opt := range opts {
opt.apply(&cfg)
Expand Down Expand Up @@ -222,8 +232,31 @@ func NewMetrics(namespace string, opts ...Opt) *Metrics {
}
}

func (m *Metrics) OnNewClient(cl *kgo.Client) {
factory := promauto.With(m.cfg.reg)

factory.NewGaugeFunc(prometheus.GaugeOpts{
Namespace: m.cfg.namespace,
Name: "buffered_produce_records_total",
Help: "Total number of records buffered within the client ready to be produced.",
}, func() float64 { return float64(cl.BufferedProduceRecords()) })

factory.NewGaugeFunc(prometheus.GaugeOpts{
Namespace: m.cfg.namespace,
Name: "buffered_fetch_records_total",
Help: "Total number of records buffered within the client ready to be consumed.",
}, func() float64 { return float64(cl.BufferedFetchRecords()) })
}

func strnode(node int32) string {
if node < 0 {
return "seed_" + strconv.Itoa(int(node)-math.MinInt32)
}
return strconv.Itoa(int(node))
}

func (m *Metrics) OnBrokerConnect(meta kgo.BrokerMetadata, _ time.Duration, _ net.Conn, err error) {
node := strconv.Itoa(int(meta.NodeID))
node := strnode(meta.NodeID)
if err != nil {
m.connectErrs.WithLabelValues(node).Inc()
return
Expand All @@ -232,12 +265,12 @@ func (m *Metrics) OnBrokerConnect(meta kgo.BrokerMetadata, _ time.Duration, _ ne
}

func (m *Metrics) OnBrokerDisconnect(meta kgo.BrokerMetadata, _ net.Conn) {
node := strconv.Itoa(int(meta.NodeID))
node := strnode(meta.NodeID)
m.disconnects.WithLabelValues(node).Inc()
}

func (m *Metrics) OnBrokerWrite(meta kgo.BrokerMetadata, _ int16, bytesWritten int, _, _ time.Duration, err error) {
node := strconv.Itoa(int(meta.NodeID))
node := strnode(meta.NodeID)
if err != nil {
m.writeErrs.WithLabelValues(node).Inc()
return
Expand All @@ -246,7 +279,7 @@ func (m *Metrics) OnBrokerWrite(meta kgo.BrokerMetadata, _ int16, bytesWritten i
}

func (m *Metrics) OnBrokerRead(meta kgo.BrokerMetadata, _ int16, bytesRead int, _, _ time.Duration, err error) {
node := strconv.Itoa(int(meta.NodeID))
node := strnode(meta.NodeID)
if err != nil {
m.readErrs.WithLabelValues(node).Inc()
return
Expand All @@ -255,11 +288,11 @@ func (m *Metrics) OnBrokerRead(meta kgo.BrokerMetadata, _ int16, bytesRead int,
}

func (m *Metrics) OnProduceBatchWritten(meta kgo.BrokerMetadata, topic string, _ int32, pbm kgo.ProduceBatchMetrics) {
node := strconv.Itoa(int(meta.NodeID))
node := strnode(meta.NodeID)
m.produceBytes.WithLabelValues(node, topic).Add(float64(pbm.UncompressedBytes))
}

func (m *Metrics) OnFetchBatchRead(meta kgo.BrokerMetadata, topic string, _ int32, fbm kgo.FetchBatchMetrics) {
node := strconv.Itoa(int(meta.NodeID))
node := strnode(meta.NodeID)
m.fetchBytes.WithLabelValues(node, topic).Add(float64(fbm.UncompressedBytes))
}

0 comments on commit 1bb70a5

Please sign in to comment.