Skip to content

Commit

Permalink
Added last read/wrote timestamp metrics to loki.write (#5699)
Browse files Browse the repository at this point in the history
  • Loading branch information
thepalbi authored Nov 7, 2023
1 parent 1dbb46c commit b9787dc
Show file tree
Hide file tree
Showing 8 changed files with 106 additions and 48 deletions.
29 changes: 10 additions & 19 deletions component/common/loki/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (

"github.com/grafana/agent/component/common/loki"
"github.com/grafana/agent/pkg/build"
"github.com/grafana/agent/pkg/util"
lokiutil "github.com/grafana/loki/pkg/util"
)

Expand Down Expand Up @@ -116,30 +117,20 @@ func NewMetrics(reg prometheus.Registerer) *Metrics {
}

if reg != nil {
m.encodedBytes = mustRegisterOrGet(reg, m.encodedBytes).(*prometheus.CounterVec)
m.sentBytes = mustRegisterOrGet(reg, m.sentBytes).(*prometheus.CounterVec)
m.droppedBytes = mustRegisterOrGet(reg, m.droppedBytes).(*prometheus.CounterVec)
m.sentEntries = mustRegisterOrGet(reg, m.sentEntries).(*prometheus.CounterVec)
m.droppedEntries = mustRegisterOrGet(reg, m.droppedEntries).(*prometheus.CounterVec)
m.mutatedEntries = mustRegisterOrGet(reg, m.mutatedEntries).(*prometheus.CounterVec)
m.mutatedBytes = mustRegisterOrGet(reg, m.mutatedBytes).(*prometheus.CounterVec)
m.requestDuration = mustRegisterOrGet(reg, m.requestDuration).(*prometheus.HistogramVec)
m.batchRetries = mustRegisterOrGet(reg, m.batchRetries).(*prometheus.CounterVec)
m.encodedBytes = util.MustRegisterOrGet(reg, m.encodedBytes).(*prometheus.CounterVec)
m.sentBytes = util.MustRegisterOrGet(reg, m.sentBytes).(*prometheus.CounterVec)
m.droppedBytes = util.MustRegisterOrGet(reg, m.droppedBytes).(*prometheus.CounterVec)
m.sentEntries = util.MustRegisterOrGet(reg, m.sentEntries).(*prometheus.CounterVec)
m.droppedEntries = util.MustRegisterOrGet(reg, m.droppedEntries).(*prometheus.CounterVec)
m.mutatedEntries = util.MustRegisterOrGet(reg, m.mutatedEntries).(*prometheus.CounterVec)
m.mutatedBytes = util.MustRegisterOrGet(reg, m.mutatedBytes).(*prometheus.CounterVec)
m.requestDuration = util.MustRegisterOrGet(reg, m.requestDuration).(*prometheus.HistogramVec)
m.batchRetries = util.MustRegisterOrGet(reg, m.batchRetries).(*prometheus.CounterVec)
}

return &m
}

func mustRegisterOrGet(reg prometheus.Registerer, c prometheus.Collector) prometheus.Collector {
if err := reg.Register(c); err != nil {
if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
return are.ExistingCollector
}
panic(err)
}
return c
}

// Client pushes entries to Loki and can be stopped
type Client interface {
loki.EntryHandler
Expand Down
3 changes: 2 additions & 1 deletion component/common/loki/client/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ func NewManager(metrics *Metrics, logger log.Logger, limits limit.Config, reg pr

walWatcherMetrics := wal.NewWatcherMetrics(reg)
walMarkerMetrics := internal.NewMarkerMetrics(reg)
queueClientMetrics := NewQueueClientMetrics(reg)

if len(clientCfgs) == 0 {
return nil, fmt.Errorf("at least one client config must be provided")
Expand Down Expand Up @@ -98,7 +99,7 @@ func NewManager(metrics *Metrics, logger log.Logger, limits limit.Config, reg pr
}
markerHandler := internal.NewMarkerHandler(markerFileHandler, walCfg.MaxSegmentAge, logger, walMarkerMetrics.WithCurriedId(clientName))

queue, err := NewQueue(metrics, cfg, limits.MaxStreams, limits.MaxLineSize.Val(), limits.MaxLineSizeTruncate, logger, markerHandler)
queue, err := NewQueue(metrics, queueClientMetrics.CurryWithId(clientName), cfg, limits.MaxStreams, limits.MaxLineSize.Val(), limits.MaxLineSizeTruncate, logger, markerHandler)
if err != nil {
return nil, fmt.Errorf("error starting queue client: %w", err)
}
Expand Down
37 changes: 37 additions & 0 deletions component/common/loki/client/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package client

import (
"github.com/grafana/agent/pkg/util"
"github.com/prometheus/client_golang/prometheus"
)

type QueueClientMetrics struct {
lastReadTimestamp *prometheus.GaugeVec
}

func NewQueueClientMetrics(reg prometheus.Registerer) *QueueClientMetrics {
m := &QueueClientMetrics{
lastReadTimestamp: prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "loki_write",
Name: "last_read_timestamp",
Help: "Latest timestamp read from the WAL",
},
[]string{"id"},
),
}

if reg != nil {
m.lastReadTimestamp = util.MustRegisterOrGet(reg, m.lastReadTimestamp).(*prometheus.GaugeVec)
}

return m
}

func (m *QueueClientMetrics) CurryWithId(id string) *QueueClientMetrics {
return &QueueClientMetrics{
lastReadTimestamp: m.lastReadTimestamp.MustCurryWith(map[string]string{
"id": id,
}),
}
}
25 changes: 18 additions & 7 deletions component/common/loki/client/queue_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,10 +150,11 @@ func (q *queue) closeNow() {
// which allows it to be injected in the wal.Watcher as a destination where to write read series and entries. As the watcher
// reads from the WAL, batches are created and dispatched onto a send queue when ready to be sent.
type queueClient struct {
metrics *Metrics
logger log.Logger
cfg Config
client *http.Client
metrics *Metrics
qcMetrics *QueueClientMetrics
logger log.Logger
cfg Config
client *http.Client

batches map[string]*batch
batchesMtx sync.Mutex
Expand All @@ -180,14 +181,14 @@ type queueClient struct {
}

// NewQueue creates a new queueClient.
func NewQueue(metrics *Metrics, cfg Config, maxStreams, maxLineSize int, maxLineSizeTruncate bool, logger log.Logger, markerHandler MarkerHandler) (StoppableWriteTo, error) {
func NewQueue(metrics *Metrics, queueClientMetrics *QueueClientMetrics, cfg Config, maxStreams, maxLineSize int, maxLineSizeTruncate bool, logger log.Logger, markerHandler MarkerHandler) (StoppableWriteTo, error) {
if cfg.StreamLagLabels.String() != "" {
return nil, fmt.Errorf("client config stream_lag_labels is deprecated and the associated metric has been removed, stream_lag_labels: %+v", cfg.StreamLagLabels.String())
}
return newQueueClient(metrics, cfg, maxStreams, maxLineSize, maxLineSizeTruncate, logger, markerHandler)
return newQueueClient(metrics, queueClientMetrics, cfg, maxStreams, maxLineSize, maxLineSizeTruncate, logger, markerHandler)
}

func newQueueClient(metrics *Metrics, cfg Config, maxStreams, maxLineSize int, maxLineSizeTruncate bool, logger log.Logger, markerHandler MarkerHandler) (*queueClient, error) {
func newQueueClient(metrics *Metrics, qcMetrics *QueueClientMetrics, cfg Config, maxStreams, maxLineSize int, maxLineSizeTruncate bool, logger log.Logger, markerHandler MarkerHandler) (*queueClient, error) {
if cfg.URL.URL == nil {
return nil, errors.New("client needs target URL")
}
Expand All @@ -198,6 +199,7 @@ func newQueueClient(metrics *Metrics, cfg Config, maxStreams, maxLineSize int, m
logger: log.With(logger, "component", "client", "host", cfg.URL.Host),
cfg: cfg,
metrics: metrics,
qcMetrics: qcMetrics,
drainTimeout: cfg.Queue.DrainTimeout,
quit: make(chan struct{}),

Expand Down Expand Up @@ -283,16 +285,25 @@ func (c *queueClient) AppendEntries(entries wal.RefEntries, segment int) error {
c.seriesLock.RLock()
l, ok := c.series[entries.Ref]
c.seriesLock.RUnlock()
var maxSeenTimestamp int64 = -1
if ok {
for _, e := range entries.Entries {
c.appendSingleEntry(segment, l, e)
if e.Timestamp.Unix() > maxSeenTimestamp {
maxSeenTimestamp = e.Timestamp.Unix()
}
}
// count all enqueued appended entries as received from WAL
c.markerHandler.UpdateReceivedData(segment, len(entries.Entries))
} else {
// TODO(thepalbi): Add metric here
level.Debug(c.logger).Log("msg", "series for entry not found")
}

// It's safe to assume that upon an AppendEntries call, there will always be at least
// one entry.
c.qcMetrics.lastReadTimestamp.WithLabelValues().Set(float64(maxSeenTimestamp))

return nil
}

Expand Down
6 changes: 2 additions & 4 deletions component/common/loki/client/queue_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,8 +135,7 @@ func TestQueueClient(t *testing.T) {

logger := log.NewLogfmtLogger(os.Stdout)

m := NewMetrics(reg)
qc, err := NewQueue(m, cfg, 0, 0, false, logger, nilMarkerHandler{})
qc, err := NewQueue(NewMetrics(reg), NewQueueClientMetrics(reg).CurryWithId("test"), cfg, 0, 0, false, logger, nilMarkerHandler{})
require.NoError(t, err)

//labels := model.LabelSet{"app": "test"}
Expand Down Expand Up @@ -281,8 +280,7 @@ func runQueueClientBenchCase(b *testing.B, bc testCase, mhFactory func(t *testin

logger := log.NewLogfmtLogger(os.Stdout)

m := NewMetrics(reg)
qc, err := NewQueue(m, cfg, 0, 0, false, logger, mhFactory(b))
qc, err := NewQueue(NewMetrics(reg), NewQueueClientMetrics(reg).CurryWithId("test"), cfg, 0, 0, false, logger, mhFactory(b))
require.NoError(b, err)

//labels := model.LabelSet{"app": "test"}
Expand Down
27 changes: 10 additions & 17 deletions component/common/loki/wal/watcher_metrics.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package wal

import "github.com/prometheus/client_golang/prometheus"
import (
"github.com/grafana/agent/pkg/util"
"github.com/prometheus/client_golang/prometheus"
)

type WatcherMetrics struct {
recordsRead *prometheus.CounterVec
Expand Down Expand Up @@ -80,23 +83,13 @@ func NewWatcherMetrics(reg prometheus.Registerer) *WatcherMetrics {
}

if reg != nil {
m.recordsRead = mustRegisterOrGet(reg, m.recordsRead).(*prometheus.CounterVec)
m.recordDecodeFails = mustRegisterOrGet(reg, m.recordDecodeFails).(*prometheus.CounterVec)
m.droppedWriteNotifications = mustRegisterOrGet(reg, m.droppedWriteNotifications).(*prometheus.CounterVec)
m.segmentRead = mustRegisterOrGet(reg, m.segmentRead).(*prometheus.CounterVec)
m.currentSegment = mustRegisterOrGet(reg, m.currentSegment).(*prometheus.GaugeVec)
m.watchersRunning = mustRegisterOrGet(reg, m.watchersRunning).(*prometheus.GaugeVec)
m.recordsRead = util.MustRegisterOrGet(reg, m.recordsRead).(*prometheus.CounterVec)
m.recordDecodeFails = util.MustRegisterOrGet(reg, m.recordDecodeFails).(*prometheus.CounterVec)
m.droppedWriteNotifications = util.MustRegisterOrGet(reg, m.droppedWriteNotifications).(*prometheus.CounterVec)
m.segmentRead = util.MustRegisterOrGet(reg, m.segmentRead).(*prometheus.CounterVec)
m.currentSegment = util.MustRegisterOrGet(reg, m.currentSegment).(*prometheus.GaugeVec)
m.watchersRunning = util.MustRegisterOrGet(reg, m.watchersRunning).(*prometheus.GaugeVec)
}

return m
}

func mustRegisterOrGet(reg prometheus.Registerer, c prometheus.Collector) prometheus.Collector {
if err := reg.Register(c); err != nil {
if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
return are.ExistingCollector
}
panic(err)
}
return c
}
11 changes: 11 additions & 0 deletions component/common/loki/wal/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ type Writer struct {

reclaimedOldSegmentsSpaceCounter *prometheus.CounterVec
lastReclaimedSegment *prometheus.GaugeVec
lastWrittenTimestamp *prometheus.GaugeVec

closeCleaner chan struct{}
}
Expand Down Expand Up @@ -96,10 +97,17 @@ func NewWriter(walCfg Config, logger log.Logger, reg prometheus.Registerer) (*Wr
Name: "last_reclaimed_segment",
Help: "Last reclaimed segment number",
}, []string{})
wrt.lastWrittenTimestamp = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: "loki_write",
Subsystem: "wal_writer",
Name: "last_written_timestamp",
Help: "Latest timestamp that was written to the WAL",
}, []string{})

if reg != nil {
_ = reg.Register(wrt.reclaimedOldSegmentsSpaceCounter)
_ = reg.Register(wrt.lastReclaimedSegment)
_ = reg.Register(wrt.lastWrittenTimestamp)
}

wrt.start(walCfg.MaxSegmentAge)
Expand All @@ -118,6 +126,9 @@ func (wrt *Writer) start(maxSegmentAge time.Duration) {
continue
}

// emit metric with latest written timestamp, to be able to track delay from writer to watcher
wrt.lastWrittenTimestamp.WithLabelValues().Set(float64(e.Timestamp.Unix()))

wrt.writeSubscribersLock.RLock()
for _, s := range wrt.writeSubscribers {
s.NotifyWrite()
Expand Down
16 changes: 16 additions & 0 deletions pkg/util/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package util

import "github.com/prometheus/client_golang/prometheus"

// MustRegisterOrGet will attempt to register the supplied collector into the register. If it's already
// registered, it will return that one.
// In case that the register procedure fails with something other than already registered, this will panic.
func MustRegisterOrGet(reg prometheus.Registerer, c prometheus.Collector) prometheus.Collector {
if err := reg.Register(c); err != nil {
if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
return are.ExistingCollector
}
panic(err)
}
return c
}

0 comments on commit b9787dc

Please sign in to comment.