Skip to content

Commit

Permalink
feat: instrument failed chunk encoding/decoding (#13684)
Browse files Browse the repository at this point in the history
  • Loading branch information
trevorwhitney authored Jul 26, 2024
1 parent 5d5e3cc commit 5a87ccb
Show file tree
Hide file tree
Showing 5 changed files with 91 additions and 13 deletions.
13 changes: 9 additions & 4 deletions pkg/ingester/flush.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package ingester

import (
"bytes"
"errors"
"fmt"
"net/http"
"sync"
Expand Down Expand Up @@ -62,7 +63,7 @@ func (i *Ingester) Flush() {
}

// TransferOut implements ring.FlushTransferer
// Noop implemenetation because ingesters have a WAL now that does not require transferring chunks any more.
// Noop implementation because ingesters have a WAL now that does not require transferring chunks any more.
// We return ErrTransferDisabled to indicate that we don't support transfers, and therefore we may flush on shutdown if configured to do so.
func (i *Ingester) TransferOut(_ context.Context) error {
return ring.ErrTransferDisabled
Expand Down Expand Up @@ -179,7 +180,6 @@ func (i *Ingester) flushLoop(j int) {

m := util_log.WithUserID(op.userID, l)
err := i.flushOp(m, op)

if err != nil {
level.Error(m).Log("msg", "failed to flush", "err", err)
}
Expand Down Expand Up @@ -410,10 +410,15 @@ func (i *Ingester) encodeChunk(ctx context.Context, ch *chunk.Chunk, desc *chunk
}
start := time.Now()
chunkBytesSize := desc.chunk.BytesSize() + 4*1024 // size + 4kB should be enough room for cortex header
if err := ch.EncodeTo(bytes.NewBuffer(make([]byte, 0, chunkBytesSize))); err != nil {
return fmt.Errorf("chunk encoding: %w", err)
if err := ch.EncodeTo(bytes.NewBuffer(make([]byte, 0, chunkBytesSize)), i.logger); err != nil {
if !errors.Is(err, chunk.ErrChunkDecode) {
return fmt.Errorf("chunk encoding: %w", err)
}

i.metrics.chunkDecodeFailures.WithLabelValues(ch.UserID).Inc()
}
i.metrics.chunkEncodeTime.Observe(time.Since(start).Seconds())
i.metrics.chunksEncoded.WithLabelValues(ch.UserID).Inc()
return nil
}

Expand Down
30 changes: 24 additions & 6 deletions pkg/ingester/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ type ingesterMetrics struct {
chunksFlushFailures prometheus.Counter
chunksFlushedPerReason *prometheus.CounterVec
chunkLifespan prometheus.Histogram
chunksEncoded *prometheus.CounterVec
chunkDecodeFailures *prometheus.CounterVec
flushedChunksStats *analytics.Counter
flushedChunksBytesStats *analytics.Statistics
flushedChunksLinesStats *analytics.Statistics
Expand Down Expand Up @@ -252,12 +254,28 @@ func newIngesterMetrics(r prometheus.Registerer, metricsNamespace string) *inges
// 1h -> 8hr
Buckets: prometheus.LinearBuckets(1, 1, 8),
}),
flushedChunksStats: analytics.NewCounter("ingester_flushed_chunks"),
flushedChunksBytesStats: analytics.NewStatistics("ingester_flushed_chunks_bytes"),
flushedChunksLinesStats: analytics.NewStatistics("ingester_flushed_chunks_lines"),
flushedChunksAgeStats: analytics.NewStatistics("ingester_flushed_chunks_age_seconds"),
flushedChunksLifespanStats: analytics.NewStatistics("ingester_flushed_chunks_lifespan_seconds"),
flushedChunksUtilizationStats: analytics.NewStatistics("ingester_flushed_chunks_utilization"),
chunksEncoded: promauto.With(r).NewCounterVec(prometheus.CounterOpts{
Namespace: constants.Loki,
Name: "ingester_chunks_encoded_total",
Help: "The total number of chunks encoded in the ingester.",
}, []string{"user"}),
chunkDecodeFailures: promauto.With(r).NewCounterVec(prometheus.CounterOpts{
Namespace: constants.Loki,
Name: "ingester_chunk_decode_failures_total",
Help: "The number of freshly encoded chunks that failed to decode.",
}, []string{"user"}),
flushedChunksStats: analytics.NewCounter("ingester_flushed_chunks"),
flushedChunksBytesStats: analytics.NewStatistics("ingester_flushed_chunks_bytes"),
flushedChunksLinesStats: analytics.NewStatistics("ingester_flushed_chunks_lines"),
flushedChunksAgeStats: analytics.NewStatistics(
"ingester_flushed_chunks_age_seconds",
),
flushedChunksLifespanStats: analytics.NewStatistics(
"ingester_flushed_chunks_lifespan_seconds",
),
flushedChunksUtilizationStats: analytics.NewStatistics(
"ingester_flushed_chunks_utilization",
),
chunksCreatedTotal: promauto.With(r).NewCounter(prometheus.CounterOpts{
Namespace: constants.Loki,
Name: "ingester_chunks_created_total",
Expand Down
35 changes: 33 additions & 2 deletions pkg/storage/chunk/chunk.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package chunk
import (
"bytes"
"encoding/binary"
"fmt"
"hash/crc32"
"reflect"
"strconv"
Expand All @@ -12,13 +13,16 @@ import (

errs "errors"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/golang/snappy"
jsoniter "github.com/json-iterator/go"
"github.com/pkg/errors"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"

"github.com/grafana/loki/v3/pkg/logproto"
util_log "github.com/grafana/loki/v3/pkg/util/log"
)

var (
Expand All @@ -27,6 +31,7 @@ var (
ErrMetadataLength = errs.New("chunk metadata wrong length")
ErrDataLength = errs.New("chunk data wrong length")
ErrSliceOutOfRange = errs.New("chunk can't be sliced out of its data range")
ErrChunkDecode = errs.New("error decoding freshly created chunk")
)

var castagnoliTable = crc32.MakeTable(crc32.Castagnoli)
Expand Down Expand Up @@ -227,11 +232,11 @@ var writerPool = sync.Pool{

// Encode writes the chunk into a buffer, and calculates the checksum.
func (c *Chunk) Encode() error {
return c.EncodeTo(nil)
return c.EncodeTo(nil, util_log.Logger)
}

// EncodeTo is like Encode but you can provide your own buffer to use.
func (c *Chunk) EncodeTo(buf *bytes.Buffer) error {
func (c *Chunk) EncodeTo(buf *bytes.Buffer, log log.Logger) error {
if buf == nil {
buf = bytes.NewBuffer(nil)
}
Expand Down Expand Up @@ -275,6 +280,32 @@ func (c *Chunk) EncodeTo(buf *bytes.Buffer) error {
// Now work out the checksum
c.encoded = buf.Bytes()
c.Checksum = crc32.Checksum(c.encoded, castagnoliTable)

newCh := Chunk{
ChunkRef: logproto.ChunkRef{
UserID: c.UserID,
Fingerprint: c.Fingerprint,
From: c.From,
Through: c.Through,
Checksum: c.Checksum,
},
}

if err := newCh.Decode(NewDecodeContext(), c.encoded); err != nil {
externalKey := fmt.Sprintf(
"%s/%x/%x:%x:%x",
c.UserID,
c.Fingerprint,
int64(c.From),
int64(c.Through),
c.Checksum,
)
level.Error(log).
Log("msg", "error decoding freshly created chunk", "err", err, "key", externalKey)

return ErrChunkDecode
}

return nil
}

Expand Down
17 changes: 17 additions & 0 deletions pkg/storage/chunk/client/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ type ChunkClientMetrics struct {
chunksSizePutPerUser *prometheus.CounterVec
chunksFetchedPerUser *prometheus.CounterVec
chunksSizeFetchedPerUser *prometheus.CounterVec
chunkDecodeFailures *prometheus.CounterVec
}

func NewChunkClientMetrics(reg prometheus.Registerer) ChunkClientMetrics {
Expand All @@ -53,6 +54,11 @@ func NewChunkClientMetrics(reg prometheus.Registerer) ChunkClientMetrics {
Name: "chunk_store_fetched_chunk_bytes_total",
Help: "Total bytes fetched in chunks per user.",
}, []string{"user"}),
chunkDecodeFailures: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
Namespace: constants.Loki,
Name: "chunk_store_decode_failures_total",
Help: "Total chunk decoding failures.",
}, []string{"user"}),
}
}

Expand Down Expand Up @@ -85,6 +91,17 @@ func (c MetricsChunkClient) PutChunks(ctx context.Context, chunks []chunk.Chunk)
func (c MetricsChunkClient) GetChunks(ctx context.Context, chunks []chunk.Chunk) ([]chunk.Chunk, error) {
chks, err := c.Client.GetChunks(ctx, chunks)
if err != nil {
// Get chunks fetches chunks in parallel, and returns any error. As a result we don't know which chunk failed,
// so we increment the metric for all tenants with chunks in the request. I think in practice we're only ever
// fetching chunks for a single tenant at a time anyway?
affectedUsers := map[string]struct{}{}
for _, chk := range chks {
affectedUsers[chk.UserID] = struct{}{}
}
for user := range affectedUsers {
c.metrics.chunkDecodeFailures.WithLabelValues(user).Inc()
}

return chks, err
}

Expand Down
9 changes: 8 additions & 1 deletion pkg/storage/chunk/client/object_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,14 @@ func (o *client) getChunk(ctx context.Context, decodeContext *chunk.DecodeContex
}

if err := c.Decode(decodeContext, buf.Bytes()); err != nil {
return chunk.Chunk{}, errors.WithStack(fmt.Errorf("failed to decode chunk '%s': %w", key, err))
return chunk.Chunk{}, errors.WithStack(
fmt.Errorf(
"failed to decode chunk '%s' for tenant `%s`: %w",
key,
c.ChunkRef.UserID,
err,
),
)
}
return c, nil
}
Expand Down

0 comments on commit 5a87ccb

Please sign in to comment.