Skip to content

Commit

Permalink
feat: Use WAL Manager (#13491)
Browse files Browse the repository at this point in the history
  • Loading branch information
grobinson-grafana authored Jul 11, 2024
1 parent 08615bf commit 8f1d12f
Show file tree
Hide file tree
Showing 4 changed files with 93 additions and 102 deletions.
39 changes: 22 additions & 17 deletions pkg/ingester-rf1/flush.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,16 @@ import (
"crypto/rand"
"fmt"
"net/http"
"strconv"
"time"

"github.com/dustin/go-humanize"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/grafana/dskit/backoff"
"github.com/grafana/dskit/ring"
"github.com/grafana/dskit/runutil"
"github.com/oklog/ulid"
"github.com/prometheus/common/model"
"golang.org/x/net/context"

"github.com/grafana/loki/v3/pkg/storage/wal"
Expand Down Expand Up @@ -77,18 +78,16 @@ func (i *Ingester) FlushHandler(w http.ResponseWriter, _ *http.Request) {
}

type flushOp struct {
from model.Time
userID string
fp model.Fingerprint
immediate bool
it *wal.PendingItem
num int64
}

func (o *flushOp) Key() string {
return fmt.Sprintf("%s-%s-%v", o.userID, o.fp, o.immediate)
return strconv.Itoa(int(o.num))
}

func (o *flushOp) Priority() int64 {
return -int64(o.from)
return -o.num
}

func (i *Ingester) flushLoop(j int) {
Expand All @@ -103,29 +102,35 @@ func (i *Ingester) flushLoop(j int) {
if o == nil {
return
}
op := o.(*flushCtx)
op := o.(*flushOp)

start := time.Now()

// We'll use this to log the size of the segment that was flushed.
n := humanize.Bytes(uint64(op.it.Writer.InputSize()))

err := i.flushOp(l, op)
d := time.Since(start)
if err != nil {
level.Error(l).Log("msg", "failed to flush", "err", err)
// Immediately re-queue another attempt at flushing this segment.
// TODO: Add some backoff or something?
i.flushQueues[j].Enqueue(op)
level.Error(l).Log("msg", "failed to flush", "size", n, "duration", d, "err", err)
} else {
// Close the channel and trigger all waiting listeners to return
// TODO: Figure out how to return an error if we want to?
close(op.flushDone)
level.Debug(l).Log("msg", "flushed", "size", n, "duration", d)
}

op.it.Result.SetDone(err)
if err = i.wal.Put(op.it); err != nil {
level.Error(l).Log("msg", "failed to put back in WAL Manager", "err", err)
}
}
}

func (i *Ingester) flushOp(l log.Logger, flushCtx *flushCtx) error {
func (i *Ingester) flushOp(l log.Logger, op *flushOp) error {
ctx, cancelFunc := context.WithCancel(context.Background())
defer cancelFunc()

b := backoff.New(ctx, i.cfg.FlushOpBackoff)
for b.Ongoing() {
err := i.flushSegment(ctx, flushCtx.segmentWriter)
err := i.flushSegment(ctx, op.it.Writer)
if err == nil {
break
}
Expand Down
95 changes: 25 additions & 70 deletions pkg/ingester-rf1/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,22 +174,6 @@ type Interface interface {
PrepareShutdown(w http.ResponseWriter, r *http.Request)
}

type flushCtx struct {
lock *sync.RWMutex
flushDone chan struct{}
newCtxAvailable chan struct{}
segmentWriter *wal.SegmentWriter
creationTime time.Time
}

func (o *flushCtx) Key() string {
return fmt.Sprintf("%d", o.creationTime.UnixNano())
}

func (o *flushCtx) Priority() int64 {
return -o.creationTime.UnixNano()
}

// Ingester builds chunks for incoming log streams.
type Ingester struct {
services.Service
Expand Down Expand Up @@ -217,10 +201,11 @@ type Ingester struct {

// One queue per flush thread. Fingerprint is used to
// pick a queue.
numOps int64
flushQueues []*util.PriorityQueue
flushQueuesDone sync.WaitGroup

flushCtx *flushCtx
wal *wal.Manager

limiter *Limiter

Expand Down Expand Up @@ -268,7 +253,11 @@ func New(cfg Config, clientConfig client.Config,
targetSizeStats.Set(int64(cfg.TargetChunkSize))
metrics := newIngesterMetrics(registerer, metricsNamespace)

segmentWriter, err := wal.NewWalSegmentWriter()
walManager, err := wal.NewManager(wal.Config{
MaxAge: wal.DefaultMaxAge,
MaxSegments: wal.DefaultMaxSegments,
MaxSegmentSize: wal.DefaultMaxSegmentSize,
})
if err != nil {
return nil, err
}
Expand All @@ -291,12 +280,7 @@ func New(cfg Config, clientConfig client.Config,
writeLogManager: writefailures.NewManager(logger, registerer, writeFailuresCfg, configs, "ingester_rf1"),
customStreamsTracker: customStreamsTracker,
readRing: readRing,
flushCtx: &flushCtx{
lock: &sync.RWMutex{},
flushDone: make(chan struct{}),
newCtxAvailable: make(chan struct{}),
segmentWriter: segmentWriter,
},
wal: walManager,
}

// TODO: change flush on shutdown
Expand Down Expand Up @@ -477,7 +461,6 @@ func (i *Ingester) running(ctx context.Context) error {
func (i *Ingester) stopping(_ error) error {
i.stopIncomingRequests()
var errs util.MultiError
// errs.Add(i.wal.Stop())

//if i.flushOnShutdownSwitch.Get() {
// i.lifecycler.SetFlushOnShutdown(true)
Expand Down Expand Up @@ -567,30 +550,18 @@ func (i *Ingester) loop() {
}

func (i *Ingester) doFlushTick() {
i.flushCtx.lock.Lock()

// i.logger.Log("msg", "starting periodic flush")
// Stop new chunks being written while we swap destinations - we'll never unlock as this flushctx can no longer be used.
currentFlushCtx := i.flushCtx

// APIs become unblocked after resetting flushCtx
segmentWriter, err := wal.NewWalSegmentWriter()
if err != nil {
// TODO: handle this properly
panic(err)
}
i.flushCtx = &flushCtx{
lock: &sync.RWMutex{},
flushDone: make(chan struct{}),
newCtxAvailable: make(chan struct{}),
segmentWriter: segmentWriter,
}
close(currentFlushCtx.newCtxAvailable) // Broadcast to all waiters that they can now fetch a new flushCtx. Small chance of a race but if they re-fetch the old one, they'll just check again immediately.
// Flush the finished context in the background & then notify watching API requests
// TODO: use multiple flush queues if required
// Don't write empty segments if there is nothing to write.
if currentFlushCtx.segmentWriter.InputSize() > 0 {
i.flushQueues[0].Enqueue(currentFlushCtx)
for {
// Keep adding ops to the queue until there are no more.
it, _ := i.wal.NextPending()
if it == nil {
break
}
i.numOps++
flushQueueIndex := i.numOps % int64(i.cfg.ConcurrentFlushes)
i.flushQueues[flushQueueIndex].Enqueue(&flushOp{
num: i.numOps,
it: it,
})
}
}

Expand Down Expand Up @@ -796,27 +767,11 @@ func (i *Ingester) Push(ctx context.Context, req *logproto.PushRequest) (*logpro
return &logproto.PushResponse{}, err
}

// Fetch a flush context and try to acquire the RLock
// The only time the Write Lock is held is when this context is no longer usable and a new one is being created.
// In this case, we need to re-read i.flushCtx in order to fetch the new one as soon as it's available.
// The newCtxAvailable chan is closed as soon as the new one is available to avoid a busy loop.
currentFlushCtx := i.flushCtx
for !currentFlushCtx.lock.TryRLock() {
select {
case <-currentFlushCtx.newCtxAvailable:
case <-ctx.Done():
return &logproto.PushResponse{}, ctx.Err()
}
currentFlushCtx = i.flushCtx
}
err = instance.Push(ctx, req, currentFlushCtx)
currentFlushCtx.lock.RUnlock()
select {
case <-ctx.Done():
return &logproto.PushResponse{}, ctx.Err()
case <-currentFlushCtx.flushDone:
return &logproto.PushResponse{}, err
if err = instance.Push(ctx, i.wal, req); err != nil {
return nil, err
}

return &logproto.PushResponse{}, nil
}

// GetStreamRates returns a response containing all streams and their current rate
Expand Down Expand Up @@ -851,7 +806,7 @@ func (i *Ingester) GetOrCreateInstance(instanceID string) (*instance, error) { /
inst, ok = i.instances[instanceID]
if !ok {
var err error
inst, err = newInstance(&i.cfg, i.periodicConfigs, instanceID, i.limiter, i.tenantConfigs, i.metrics, i.streamRateCalculator, i.writeLogManager, i.customStreamsTracker)
inst, err = newInstance(&i.cfg, i.periodicConfigs, instanceID, i.limiter, i.tenantConfigs, i.metrics, i.streamRateCalculator, i.writeLogManager, i.customStreamsTracker, i.logger)
if err != nil {
return nil, err
}
Expand Down
32 changes: 25 additions & 7 deletions pkg/ingester-rf1/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"net/http"
"sync"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/grafana/dskit/httpgrpc"
"github.com/prometheus/client_golang/prometheus"
Expand All @@ -23,6 +24,7 @@ import (
"github.com/grafana/loki/v3/pkg/logql/syntax"
"github.com/grafana/loki/v3/pkg/runtime"
"github.com/grafana/loki/v3/pkg/storage/config"
"github.com/grafana/loki/v3/pkg/storage/wal"
"github.com/grafana/loki/v3/pkg/util/constants"
util_log "github.com/grafana/loki/v3/pkg/util/log"
"github.com/grafana/loki/v3/pkg/validation"
Expand Down Expand Up @@ -70,6 +72,7 @@ type instance struct {
// tailers map[uint32]*tailer
tailerMtx sync.RWMutex

logger log.Logger
limiter *Limiter
streamCountLimiter *streamCountLimiter
ownedStreamsSvc *ownedStreamService
Expand All @@ -87,10 +90,10 @@ type instance struct {
customStreamsTracker push.UsageTracker
}

func (i *instance) Push(ctx context.Context, req *logproto.PushRequest, flushCtx *flushCtx) error {
func (i *instance) Push(ctx context.Context, w *wal.Manager, req *logproto.PushRequest) error {
rateLimitWholeStream := i.limiter.limits.ShardStreams(i.instanceID).Enabled

var appendErr error
results := make([]*wal.AppendResult, 0, len(req.Streams))
for _, reqStream := range req.Streams {
s, _, err := i.streams.LoadOrStoreNew(reqStream.Labels,
func() (*stream, error) {
Expand All @@ -102,13 +105,27 @@ func (i *instance) Push(ctx context.Context, req *logproto.PushRequest, flushCtx
},
)
if err != nil {
appendErr = err
continue
return err
}
_, res, err := s.Push(ctx, w, reqStream.Entries, rateLimitWholeStream, i.customStreamsTracker)
if err != nil {
return err
}
results = append(results, res)
}

_, appendErr = s.Push(ctx, reqStream.Entries, rateLimitWholeStream, i.customStreamsTracker, flushCtx)
for _, result := range results {
select {
case <-ctx.Done():
return ctx.Err()
case <-result.Done():
if err := result.Err(); err != nil {
return err
}
}
}
return appendErr

return nil
}

func newInstance(
Expand All @@ -121,8 +138,8 @@ func newInstance(
streamRateCalculator *StreamRateCalculator,
writeFailures *writefailures.Manager,
customStreamsTracker push.UsageTracker,
logger log.Logger,
) (*instance, error) {
fmt.Println("new instance for", instanceID)
invertedIndex, err := index.NewMultiInvertedIndex(periodConfigs, uint32(cfg.IndexShards))
if err != nil {
return nil, err
Expand All @@ -141,6 +158,7 @@ func newInstance(
streamsRemovedTotal: streamsRemovedTotal.WithLabelValues(instanceID),
//
//tailers: map[uint32]*tailer{},
logger: logger,
limiter: limiter,
streamCountLimiter: newStreamCountLimiter(instanceID, streams.Len, limiter, ownedStreamsSvc),
ownedStreamsSvc: ownedStreamsSvc,
Expand Down
29 changes: 21 additions & 8 deletions pkg/ingester-rf1/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/grafana/loki/v3/pkg/distributor/writefailures"
"github.com/grafana/loki/v3/pkg/loghttp/push"
"github.com/grafana/loki/v3/pkg/logproto"
"github.com/grafana/loki/v3/pkg/storage/wal"
"github.com/grafana/loki/v3/pkg/util/flagext"
"github.com/grafana/loki/v3/pkg/validation"
)
Expand Down Expand Up @@ -130,21 +131,24 @@ func (s *stream) consumeChunk(_ context.Context, _ *logproto.Chunk) error {

func (s *stream) Push(
ctx context.Context,
wal *wal.Manager,
entries []logproto.Entry,
// Whether nor not to ingest all at once or not. It is a per-tenant configuration.
rateLimitWholeStream bool,

usageTracker push.UsageTracker,
flushCtx *flushCtx,
) (int, error) {
) (int, *wal.AppendResult, error) {
toStore, invalid := s.validateEntries(ctx, entries, rateLimitWholeStream, usageTracker)
if rateLimitWholeStream && hasRateLimitErr(invalid) {
return 0, errorForFailedEntries(s, invalid, len(entries))
return 0, nil, errorForFailedEntries(s, invalid, len(entries))
}

bytesAdded := s.storeEntries(ctx, toStore, usageTracker, flushCtx)
bytesAdded, res, err := s.storeEntries(ctx, wal, toStore, usageTracker)
if err != nil {
return 0, nil, err
}

return bytesAdded, errorForFailedEntries(s, invalid, len(entries))
return bytesAdded, res, errorForFailedEntries(s, invalid, len(entries))
}

func errorForFailedEntries(s *stream, failedEntriesWithError []entryWithError, totalEntries int) error {
Expand Down Expand Up @@ -195,7 +199,7 @@ func hasRateLimitErr(errs []entryWithError) bool {
return ok
}

func (s *stream) storeEntries(ctx context.Context, entries []*logproto.Entry, usageTracker push.UsageTracker, flushCtx *flushCtx) int {
func (s *stream) storeEntries(ctx context.Context, w *wal.Manager, entries []*logproto.Entry, usageTracker push.UsageTracker) (int, *wal.AppendResult, error) {
if sp := opentracing.SpanFromContext(ctx); sp != nil {
sp.LogKV("event", "stream started to store entries", "labels", s.labelsString)
defer sp.LogKV("event", "stream finished to store entries")
Expand All @@ -213,9 +217,18 @@ func (s *stream) storeEntries(ctx context.Context, entries []*logproto.Entry, us

bytesAdded += len(entries[i].Line)
}
flushCtx.segmentWriter.Append(s.tenant, s.labels.String(), s.labels, entries)

res, err := w.Append(wal.AppendRequest{
TenantID: s.tenant,
Labels: s.labels,
LabelsStr: s.labels.String(),
Entries: entries,
})
if err != nil {
return 0, nil, err
}
s.reportMetrics(ctx, outOfOrderSamples, outOfOrderBytes, 0, 0, usageTracker)
return bytesAdded
return bytesAdded, res, nil
}

func (s *stream) validateEntries(ctx context.Context, entries []logproto.Entry, rateLimitWholeStream bool, usageTracker push.UsageTracker) ([]*logproto.Entry, []entryWithError) {
Expand Down

0 comments on commit 8f1d12f

Please sign in to comment.