diff --git a/plugin/kprom/README.md b/plugin/kprom/README.md index e9fe552c..ecce729c 100644 --- a/plugin/kprom/README.md +++ b/plugin/kprom/README.md @@ -18,9 +18,12 @@ metrics being counter vecs: #{ns}_read_bytes_total{node_id="#{node}"} #{ns}_produce_bytes_total{node_id="#{node}",topic="#{topic}"} #{ns}_fetch_bytes_total{node_id="#{node}",topic="#{topic}"} +#{ns}_buffered_produce_records_total +#{ns}_buffered_fetch_records_total ``` -Note that seed brokers use broker IDs starting at math.MinInt32. +Note that seed brokers use broker IDs prefixed with "seed_", with the number +corresponding to which seed it is. To use, diff --git a/plugin/kprom/kprom.go b/plugin/kprom/kprom.go index 3140d067..251f0e3c 100644 --- a/plugin/kprom/kprom.go +++ b/plugin/kprom/kprom.go @@ -11,6 +11,8 @@ // #{ns}_read_bytes_total{node_id="#{node}"} // #{ns}_produce_bytes_total{node_id="#{node}",topic="#{topic}"} // #{ns}_fetch_bytes_total{node_id="#{node}",topic="#{topic}"} +// #{ns}_buffered_produce_records_total +// #{ns}_buffered_fetch_records_total // // This can be used in a client like so: // @@ -23,10 +25,12 @@ // By default, metrics are installed under the a new prometheus registry, but // this can be overridden with the Registry option. // -// Note that seed brokers use broker IDs starting at math.MinInt32. +// Note that seed brokers use broker IDs prefixed with "seed_", with the number +// corresponding to which seed it is. package kprom import ( + "math" "net" "net/http" "strconv" @@ -64,6 +68,9 @@ type Metrics struct { produceBytes *prometheus.CounterVec fetchBytes *prometheus.CounterVec + + bufferedProduceRecords int64 + bufferedFetchRecords int64 } // Registry returns the prometheus registry that metrics were added to. @@ -80,6 +87,8 @@ func (m *Metrics) Handler() http.Handler { } type cfg struct { + namespace string + reg prometheus.Registerer gatherer prometheus.Gatherer @@ -141,8 +150,9 @@ func HandlerOpts(opts promhttp.HandlerOpts) Opt { func NewMetrics(namespace string, opts ...Opt) *Metrics { var regGatherer RegistererGatherer = prometheus.NewRegistry() cfg := cfg{ - reg: regGatherer, - gatherer: regGatherer, + namespace: namespace, + reg: regGatherer, + gatherer: regGatherer, } for _, opt := range opts { opt.apply(&cfg) @@ -222,8 +232,31 @@ func NewMetrics(namespace string, opts ...Opt) *Metrics { } } +func (m *Metrics) OnNewClient(cl *kgo.Client) { + factory := promauto.With(m.cfg.reg) + + factory.NewGaugeFunc(prometheus.GaugeOpts{ + Namespace: m.cfg.namespace, + Name: "buffered_produce_records_total", + Help: "Total number of records buffered within the client ready to be produced.", + }, func() float64 { return float64(cl.BufferedProduceRecords()) }) + + factory.NewGaugeFunc(prometheus.GaugeOpts{ + Namespace: m.cfg.namespace, + Name: "buffered_fetch_records_total", + Help: "Total number of records buffered within the client ready to be consumed.", + }, func() float64 { return float64(cl.BufferedFetchRecords()) }) +} + +func strnode(node int32) string { + if node < 0 { + return "seed_" + strconv.Itoa(int(node)-math.MinInt32) + } + return strconv.Itoa(int(node)) +} + func (m *Metrics) OnBrokerConnect(meta kgo.BrokerMetadata, _ time.Duration, _ net.Conn, err error) { - node := strconv.Itoa(int(meta.NodeID)) + node := strnode(meta.NodeID) if err != nil { m.connectErrs.WithLabelValues(node).Inc() return @@ -232,12 +265,12 @@ func (m *Metrics) OnBrokerConnect(meta kgo.BrokerMetadata, _ time.Duration, _ ne } func (m *Metrics) OnBrokerDisconnect(meta kgo.BrokerMetadata, _ net.Conn) { - node := strconv.Itoa(int(meta.NodeID)) + node := strnode(meta.NodeID) m.disconnects.WithLabelValues(node).Inc() } func (m *Metrics) OnBrokerWrite(meta kgo.BrokerMetadata, _ int16, bytesWritten int, _, _ time.Duration, err error) { - node := strconv.Itoa(int(meta.NodeID)) + node := strnode(meta.NodeID) if err != nil { m.writeErrs.WithLabelValues(node).Inc() return @@ -246,7 +279,7 @@ func (m *Metrics) OnBrokerWrite(meta kgo.BrokerMetadata, _ int16, bytesWritten i } func (m *Metrics) OnBrokerRead(meta kgo.BrokerMetadata, _ int16, bytesRead int, _, _ time.Duration, err error) { - node := strconv.Itoa(int(meta.NodeID)) + node := strnode(meta.NodeID) if err != nil { m.readErrs.WithLabelValues(node).Inc() return @@ -255,11 +288,11 @@ func (m *Metrics) OnBrokerRead(meta kgo.BrokerMetadata, _ int16, bytesRead int, } func (m *Metrics) OnProduceBatchWritten(meta kgo.BrokerMetadata, topic string, _ int32, pbm kgo.ProduceBatchMetrics) { - node := strconv.Itoa(int(meta.NodeID)) + node := strnode(meta.NodeID) m.produceBytes.WithLabelValues(node, topic).Add(float64(pbm.UncompressedBytes)) } func (m *Metrics) OnFetchBatchRead(meta kgo.BrokerMetadata, topic string, _ int32, fbm kgo.FetchBatchMetrics) { - node := strconv.Itoa(int(meta.NodeID)) + node := strnode(meta.NodeID) m.fetchBytes.WithLabelValues(node, topic).Add(float64(fbm.UncompressedBytes)) }