ingester: Refactor ingester metrics from globals into the ingesterMetrics struct #6275

Merged
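The change, in essence: every metric that was a package-level promauto global in flush.go becomes a field on the ingesterMetrics struct, constructed against an injected prometheus.Registerer. A minimal sketch of the pattern (simplified to a single gauge; metricsSketch is a shortened stand-in name, not the real struct):

```go
package ingester

import (
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
)

// Before: a package-level global, registered with the default registerer as a
// side effect of package initialization, so constructing it twice (e.g. across
// tests) panics on duplicate registration:
//
//	var memoryChunks = promauto.NewGauge(prometheus.GaugeOpts{
//		Namespace: "loki",
//		Name:      "ingester_memory_chunks",
//		Help:      "The total number of chunks in memory.",
//	})

// After: a struct field built against whatever Registerer the caller supplies.
// promauto.With(nil) is valid and simply skips registration.
type metricsSketch struct {
	memoryChunks prometheus.Gauge
}

func newMetricsSketch(r prometheus.Registerer) *metricsSketch {
	return &metricsSketch{
		memoryChunks: promauto.With(r).NewGauge(prometheus.GaugeOpts{
			Namespace: "loki",
			Name:      "ingester_memory_chunks",
			Help:      "The total number of chunks in memory.",
		}),
	}
}
```

Call sites change mechanically, e.g. memoryChunks.Inc() becomes i.metrics.memoryChunks.Inc(), and tests pass NilMetrics (see instance_test.go below).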
113 changes: 18 additions & 95 deletions pkg/ingester/flush.go
@@ -9,7 +9,6 @@ import (

"github.com/go-kit/log/level"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
"github.com/weaveworks/common/user"
@@ -19,87 +18,11 @@ import (

"github.com/grafana/loki/pkg/chunkenc"
"github.com/grafana/loki/pkg/storage/chunk"
"github.com/grafana/loki/pkg/usagestats"
"github.com/grafana/loki/pkg/util"
loki_util "github.com/grafana/loki/pkg/util"
util_log "github.com/grafana/loki/pkg/util/log"
)

- var (
- chunkUtilization = promauto.NewHistogram(prometheus.HistogramOpts{
- Namespace: "loki",
- Name: "ingester_chunk_utilization",
- Help: "Distribution of stored chunk utilization (when stored).",
- Buckets: prometheus.LinearBuckets(0, 0.2, 6),
- })
- memoryChunks = promauto.NewGauge(prometheus.GaugeOpts{
- Namespace: "loki",
- Name: "ingester_memory_chunks",
- Help: "The total number of chunks in memory.",
- })
- chunkEntries = promauto.NewHistogram(prometheus.HistogramOpts{
- Namespace: "loki",
- Name: "ingester_chunk_entries",
- Help: "Distribution of stored lines per chunk (when stored).",
- Buckets: prometheus.ExponentialBuckets(200, 2, 9), // biggest bucket is 200*2^(9-1) = 51200
- })
- chunkSize = promauto.NewHistogram(prometheus.HistogramOpts{
- Namespace: "loki",
- Name: "ingester_chunk_size_bytes",
- Help: "Distribution of stored chunk sizes (when stored).",
- Buckets: prometheus.ExponentialBuckets(20000, 2, 10), // biggest bucket is 20000*2^(10-1) = 10,240,000 (~10.2MB)
- })
- chunkCompressionRatio = promauto.NewHistogram(prometheus.HistogramOpts{
- Namespace: "loki",
- Name: "ingester_chunk_compression_ratio",
- Help: "Compression ratio of chunks (when stored).",
- Buckets: prometheus.LinearBuckets(.75, 2, 10),
- })
- chunksPerTenant = promauto.NewCounterVec(prometheus.CounterOpts{
- Namespace: "loki",
- Name: "ingester_chunks_stored_total",
- Help: "Total stored chunks per tenant.",
- }, []string{"tenant"})
- chunkSizePerTenant = promauto.NewCounterVec(prometheus.CounterOpts{
- Namespace: "loki",
- Name: "ingester_chunk_stored_bytes_total",
- Help: "Total bytes stored in chunks per tenant.",
- }, []string{"tenant"})
- chunkAge = promauto.NewHistogram(prometheus.HistogramOpts{
- Namespace: "loki",
- Name: "ingester_chunk_age_seconds",
- Help: "Distribution of chunk ages (when stored).",
- // with default settings chunks should flush between 5 min and 12 hours
- // so buckets at 1min, 5min, 10min, 30min, 1hr, 2hr, 4hr, 10hr, 12hr, 16hr
- Buckets: []float64{60, 300, 600, 1800, 3600, 7200, 14400, 36000, 43200, 57600},
- })
- chunkEncodeTime = promauto.NewHistogram(prometheus.HistogramOpts{
- Namespace: "loki",
- Name: "ingester_chunk_encode_time_seconds",
- Help: "Distribution of chunk encode times.",
- // 10ms to 10s.
- Buckets: prometheus.ExponentialBuckets(0.01, 4, 6),
- })
- chunksFlushedPerReason = promauto.NewCounterVec(prometheus.CounterOpts{
- Namespace: "loki",
- Name: "ingester_chunks_flushed_total",
- Help: "Total flushed chunks per reason.",
- }, []string{"reason"})
- chunkLifespan = promauto.NewHistogram(prometheus.HistogramOpts{
- Namespace: "loki",
- Name: "ingester_chunk_bounds_hours",
- Help: "Distribution of chunk end-start durations.",
- // 1h -> 8hr
- Buckets: prometheus.LinearBuckets(1, 1, 8),
- })
- flushedChunksStats = usagestats.NewCounter("ingester_flushed_chunks")
- flushedChunksBytesStats = usagestats.NewStatistics("ingester_flushed_chunks_bytes")
- flushedChunksLinesStats = usagestats.NewStatistics("ingester_flushed_chunks_lines")
- flushedChunksAgeStats = usagestats.NewStatistics("ingester_flushed_chunks_age_seconds")
- flushedChunksLifespanStats = usagestats.NewStatistics("ingester_flushed_chunks_lifespan_seconds")
- flushedChunksUtilizationStats = usagestats.NewStatistics("ingester_flushed_chunks_utilization")
- )

const (
// Backoff for retrying 'immediate' flushes. Only counts for queue
// position, not wallclock time.
@@ -324,7 +247,7 @@ func (i *Ingester) removeFlushedChunks(instance *instance, stream *stream, mayRe
stream.chunks[0].chunk = nil // erase reference so the chunk can be garbage-collected
stream.chunks = stream.chunks[1:]
}
- memoryChunks.Sub(float64(prevNumChunks - len(stream.chunks)))
+ i.metrics.memoryChunks.Sub(float64(prevNumChunks - len(stream.chunks)))

// Signal how much data has been flushed to lessen any WAL replay pressure.
i.replayController.Sub(int64(subtracted))
@@ -360,8 +283,8 @@ func (i *Ingester) flushChunks(ctx context.Context, fp model.Fingerprint, labelP
labelsBuilder.Set(nameLabel, logsValue)
metric := labelsBuilder.Labels()

- sizePerTenant := chunkSizePerTenant.WithLabelValues(userID)
- countPerTenant := chunksPerTenant.WithLabelValues(userID)
+ sizePerTenant := i.metrics.chunkSizePerTenant.WithLabelValues(userID)
+ countPerTenant := i.metrics.chunksPerTenant.WithLabelValues(userID)

for j, c := range cs {
if err := i.closeChunk(c, chunkMtx); err != nil {
@@ -430,7 +353,7 @@ func (i *Ingester) encodeChunk(ctx context.Context, ch *chunk.Chunk, desc *chunk
if err := ch.EncodeTo(bytes.NewBuffer(make([]byte, 0, chunkBytesSize))); err != nil {
return fmt.Errorf("chunk encoding: %w", err)
}
- chunkEncodeTime.Observe(time.Since(start).Seconds())
+ i.metrics.chunkEncodeTime.Observe(time.Since(start).Seconds())
return nil
}

@@ -443,7 +366,7 @@ func (i *Ingester) flushChunk(ctx context.Context, ch *chunk.Chunk) error {
if err := i.store.Put(ctx, []chunk.Chunk{*ch}); err != nil {
return fmt.Errorf("store put chunk: %w", err)
}
- flushedChunksStats.Inc(1)
+ i.metrics.flushedChunksStats.Inc(1)
return nil
}

@@ -455,30 +378,30 @@ func (i *Ingester) reportFlushedChunkStatistics(ch *chunk.Chunk, desc *chunkDesc
return
}

- chunksFlushedPerReason.WithLabelValues(reason).Add(1)
+ i.metrics.chunksFlushedPerReason.WithLabelValues(reason).Add(1)

compressedSize := float64(len(byt))
uncompressedSize, ok := chunkenc.UncompressedSize(ch.Data)

if ok && compressedSize > 0 {
- chunkCompressionRatio.Observe(float64(uncompressedSize) / compressedSize)
+ i.metrics.chunkCompressionRatio.Observe(float64(uncompressedSize) / compressedSize)
}

utilization := ch.Data.Utilization()
- chunkUtilization.Observe(utilization)
+ i.metrics.chunkUtilization.Observe(utilization)
numEntries := desc.chunk.Size()
- chunkEntries.Observe(float64(numEntries))
- chunkSize.Observe(compressedSize)
+ i.metrics.chunkEntries.Observe(float64(numEntries))
+ i.metrics.chunkSize.Observe(compressedSize)
sizePerTenant.Add(compressedSize)
countPerTenant.Inc()

boundsFrom, boundsTo := desc.chunk.Bounds()
- chunkAge.Observe(time.Since(boundsFrom).Seconds())
- chunkLifespan.Observe(boundsTo.Sub(boundsFrom).Hours())
-
- flushedChunksBytesStats.Record(compressedSize)
- flushedChunksLinesStats.Record(float64(numEntries))
- flushedChunksUtilizationStats.Record(utilization)
- flushedChunksAgeStats.Record(time.Since(boundsFrom).Seconds())
- flushedChunksLifespanStats.Record(boundsTo.Sub(boundsFrom).Hours())
+ i.metrics.chunkAge.Observe(time.Since(boundsFrom).Seconds())
+ i.metrics.chunkLifespan.Observe(boundsTo.Sub(boundsFrom).Hours())
+
+ i.metrics.flushedChunksBytesStats.Record(compressedSize)
+ i.metrics.flushedChunksLinesStats.Record(float64(numEntries))
+ i.metrics.flushedChunksUtilizationStats.Record(utilization)
+ i.metrics.flushedChunksAgeStats.Record(time.Since(boundsFrom).Seconds())
+ i.metrics.flushedChunksLifespanStats.Record(boundsTo.Sub(boundsFrom).Hours())
}
2 changes: 1 addition & 1 deletion pkg/ingester/instance.go
@@ -136,7 +136,7 @@ func (i *instance) consumeChunk(ctx context.Context, ls labels.Labels, chunk *lo

err := s.consumeChunk(ctx, chunk)
if err == nil {
- memoryChunks.Inc()
+ i.metrics.memoryChunks.Inc()
}

return err
2 changes: 1 addition & 1 deletion pkg/ingester/instance_test.go
@@ -43,7 +43,7 @@ func TestLabelsCollisions(t *testing.T) {
require.NoError(t, err)
limiter := NewLimiter(limits, NilMetrics, &ringCountMock{count: 1}, 1)

- i := newInstance(defaultConfig(), "test", limiter, loki_runtime.DefaultTenantConfigs(), noopWAL{}, nil, &OnceSwitch{}, nil)
+ i := newInstance(defaultConfig(), "test", limiter, loki_runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, &OnceSwitch{}, nil)

// avoid entries from the future.
tt := time.Now().Add(-5 * time.Minute)
119 changes: 119 additions & 0 deletions pkg/ingester/metrics.go
@@ -4,6 +4,7 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"

"github.com/grafana/loki/pkg/usagestats"
"github.com/grafana/loki/pkg/validation"
)

@@ -35,6 +36,29 @@ type ingesterMetrics struct {
limiterEnabled prometheus.Gauge

autoForgetUnhealthyIngestersTotal prometheus.Counter

+ chunkUtilization prometheus.Histogram
+ memoryChunks prometheus.Gauge
+ chunkEntries prometheus.Histogram
+ chunkSize prometheus.Histogram
+ chunkCompressionRatio prometheus.Histogram
+ chunksPerTenant *prometheus.CounterVec
+ chunkSizePerTenant *prometheus.CounterVec
+ chunkAge prometheus.Histogram
+ chunkEncodeTime prometheus.Histogram
+ chunksFlushedPerReason *prometheus.CounterVec
+ chunkLifespan prometheus.Histogram
+ flushedChunksStats *usagestats.Counter
+ flushedChunksBytesStats *usagestats.Statistics
+ flushedChunksLinesStats *usagestats.Statistics
+ flushedChunksAgeStats *usagestats.Statistics
+ flushedChunksLifespanStats *usagestats.Statistics
+ flushedChunksUtilizationStats *usagestats.Statistics
+
+ chunksCreatedTotal prometheus.Counter
+ samplesPerChunk prometheus.Histogram
+ blocksPerChunk prometheus.Histogram
+ chunkCreatedStats *usagestats.Counter
}

// setRecoveryBytesInUse bounds the bytes reports to >= 0.
@@ -148,5 +172,100 @@ func newIngesterMetrics(r prometheus.Registerer) *ingesterMetrics {
Name: "loki_ingester_autoforget_unhealthy_ingesters_total",
Help: "Total number of ingesters automatically forgotten",
}),
+ chunkUtilization: promauto.With(r).NewHistogram(prometheus.HistogramOpts{
+ Namespace: "loki",
+ Name: "ingester_chunk_utilization",
+ Help: "Distribution of stored chunk utilization (when stored).",
+ Buckets: prometheus.LinearBuckets(0, 0.2, 6),
+ }),
+ memoryChunks: promauto.With(r).NewGauge(prometheus.GaugeOpts{
+ Namespace: "loki",
+ Name: "ingester_memory_chunks",
+ Help: "The total number of chunks in memory.",
+ }),
+ chunkEntries: promauto.With(r).NewHistogram(prometheus.HistogramOpts{
+ Namespace: "loki",
+ Name: "ingester_chunk_entries",
+ Help: "Distribution of stored lines per chunk (when stored).",
+ Buckets: prometheus.ExponentialBuckets(200, 2, 9), // biggest bucket is 200*2^(9-1) = 51200
+ }),
+ chunkSize: promauto.With(r).NewHistogram(prometheus.HistogramOpts{
+ Namespace: "loki",
+ Name: "ingester_chunk_size_bytes",
+ Help: "Distribution of stored chunk sizes (when stored).",
+ Buckets: prometheus.ExponentialBuckets(20000, 2, 10), // biggest bucket is 20000*2^(10-1) = 10,240,000 (~10.2MB)
+ }),
+ chunkCompressionRatio: promauto.With(r).NewHistogram(prometheus.HistogramOpts{
+ Namespace: "loki",
+ Name: "ingester_chunk_compression_ratio",
+ Help: "Compression ratio of chunks (when stored).",
+ Buckets: prometheus.LinearBuckets(.75, 2, 10),
+ }),
+ chunksPerTenant: promauto.With(r).NewCounterVec(prometheus.CounterOpts{
+ Namespace: "loki",
+ Name: "ingester_chunks_stored_total",
+ Help: "Total stored chunks per tenant.",
+ }, []string{"tenant"}),
+ chunkSizePerTenant: promauto.With(r).NewCounterVec(prometheus.CounterOpts{
+ Namespace: "loki",
+ Name: "ingester_chunk_stored_bytes_total",
+ Help: "Total bytes stored in chunks per tenant.",
+ }, []string{"tenant"}),
+ chunkAge: promauto.With(r).NewHistogram(prometheus.HistogramOpts{
+ Namespace: "loki",
+ Name: "ingester_chunk_age_seconds",
+ Help: "Distribution of chunk ages (when stored).",
+ // with default settings chunks should flush between 5 min and 12 hours
+ // so buckets at 1min, 5min, 10min, 30min, 1hr, 2hr, 4hr, 10hr, 12hr, 16hr
+ Buckets: []float64{60, 300, 600, 1800, 3600, 7200, 14400, 36000, 43200, 57600},
+ }),
+ chunkEncodeTime: promauto.With(r).NewHistogram(prometheus.HistogramOpts{
+ Namespace: "loki",
+ Name: "ingester_chunk_encode_time_seconds",
+ Help: "Distribution of chunk encode times.",
+ // 10ms to 10s.
+ Buckets: prometheus.ExponentialBuckets(0.01, 4, 6),
+ }),
+ chunksFlushedPerReason: promauto.With(r).NewCounterVec(prometheus.CounterOpts{
+ Namespace: "loki",
+ Name: "ingester_chunks_flushed_total",
+ Help: "Total flushed chunks per reason.",
+ }, []string{"reason"}),
+ chunkLifespan: promauto.With(r).NewHistogram(prometheus.HistogramOpts{
+ Namespace: "loki",
+ Name: "ingester_chunk_bounds_hours",
+ Help: "Distribution of chunk end-start durations.",
+ // 1h -> 8hr
+ Buckets: prometheus.LinearBuckets(1, 1, 8),
+ }),
+ flushedChunksStats: usagestats.NewCounter("ingester_flushed_chunks"),
+ flushedChunksBytesStats: usagestats.NewStatistics("ingester_flushed_chunks_bytes"),
+ flushedChunksLinesStats: usagestats.NewStatistics("ingester_flushed_chunks_lines"),
+ flushedChunksAgeStats: usagestats.NewStatistics("ingester_flushed_chunks_age_seconds"),
+ flushedChunksLifespanStats: usagestats.NewStatistics("ingester_flushed_chunks_lifespan_seconds"),
+ flushedChunksUtilizationStats: usagestats.NewStatistics("ingester_flushed_chunks_utilization"),
+ chunksCreatedTotal: promauto.With(r).NewCounter(prometheus.CounterOpts{
+ Namespace: "loki",
+ Name: "ingester_chunks_created_total",
+ Help: "The total number of chunks created in the ingester.",
+ }),
+ samplesPerChunk: promauto.With(r).NewHistogram(prometheus.HistogramOpts{
+ Namespace: "loki",
+ Subsystem: "ingester",
+ Name: "samples_per_chunk",
+ Help: "The number of samples in a chunk.",
+
+ Buckets: prometheus.LinearBuckets(4096, 2048, 6),
+ }),
+ blocksPerChunk: promauto.With(r).NewHistogram(prometheus.HistogramOpts{
+ Namespace: "loki",
+ Subsystem: "ingester",
+ Name: "blocks_per_chunk",
+ Help: "The number of blocks in a chunk.",
+
+ Buckets: prometheus.ExponentialBuckets(5, 2, 6),
+ }),
+
+ chunkCreatedStats: usagestats.NewCounter("ingester_chunk_created"),
}
}
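One practical consequence of the constructor above: since registration is tied to the caller-supplied registerer, independent metric sets can coexist. A hypothetical test sketch (not part of this PR; the test name and assertion are invented for illustration):

```go
package ingester

import (
	"testing"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/testutil"
)

// Two metric sets built against separate registries coexist without the
// "duplicate metrics collector registration attempted" panic that
// re-registering package-level globals would cause, and their gauges are
// independent of each other.
func TestIngesterMetricsAreIsolated(t *testing.T) {
	m1 := newIngesterMetrics(prometheus.NewRegistry())
	m2 := newIngesterMetrics(prometheus.NewRegistry()) // safe second construction

	m1.memoryChunks.Inc()
	if got := testutil.ToFloat64(m2.memoryChunks); got != 0 {
		t.Fatalf("expected isolated gauges, got %v", got)
	}
}
```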
2 changes: 1 addition & 1 deletion pkg/ingester/recovery.go
@@ -98,7 +98,7 @@ func (r *ingesterRecoverer) Series(series *Series) error {
if err != nil {
return err
}
- memoryChunks.Add(float64(len(series.Chunks)))
+ r.ing.metrics.memoryChunks.Add(float64(len(series.Chunks)))
r.ing.metrics.recoveredChunksTotal.Add(float64(len(series.Chunks)))
r.ing.metrics.recoveredEntriesTotal.Add(float64(entriesAdded))
r.ing.replayController.Add(int64(bytesAdded))