feat: Use WAL Manager #13491

Merged · 4 commits · Jul 11, 2024
Changes from 2 commits
35 changes: 20 additions & 15 deletions pkg/ingester-rf1/flush.go
@@ -4,15 +4,16 @@
"crypto/rand"
"fmt"
"net/http"
"strconv"
"time"

"github.com/dustin/go-humanize"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/grafana/dskit/backoff"
"github.com/grafana/dskit/ring"
"github.com/grafana/dskit/runutil"
"github.com/oklog/ulid"
"github.com/prometheus/common/model"
"golang.org/x/net/context"

"github.com/grafana/loki/v3/pkg/storage/wal"
@@ -77,18 +78,16 @@
}

type flushOp struct {
from model.Time
userID string
fp model.Fingerprint
immediate bool
it *wal.PendingItem
num int64
}

func (o *flushOp) Key() string {
return fmt.Sprintf("%s-%s-%v", o.userID, o.fp, o.immediate)
return strconv.Itoa(int(o.num))
}

func (o *flushOp) Priority() int64 {
return -int64(o.from)
return -o.num
}

func (i *Ingester) flushLoop(j int) {
@@ -103,29 +102,35 @@
if o == nil {
return
}
op := o.(*flushCtx)
op := o.(*flushOp)

start := time.Now()

// We'll use this to log the size of the segment that was flushed.
n := humanize.Bytes(uint64(op.it.Writer.InputSize()))

err := i.flushOp(l, op)
d := time.Since(start)
if err != nil {
level.Error(l).Log("msg", "failed to flush", "err", err)
level.Error(l).Log("msg", "failed to flush", "size", n, "duration", d, "err", err)
// Immediately re-queue another attempt at flushing this segment.
// TODO: Add some backoff or something?
i.flushQueues[j].Enqueue(op)
} else {
// Close the channel and trigger all waiting listeners to return
// TODO: Figure out how to return an error if we want to?
close(op.flushDone)
level.Debug(l).Log("msg", "flushed", "size", n, "duration", d)
}

op.it.Result.SetDone(err)
i.wal.Put(op.it)

[CI check failure on line 123 in pkg/ingester-rf1/flush.go] GitHub Actions / check / golangciLint: Error return value of `i.wal.Put` is not checked (errcheck)
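A minimal sketch of one way to satisfy errcheck here, until the error return is removed in the follow-up mentioned below (hypothetical, not part of this diff; it reuses the logging style of flushLoop):

// Hypothetical errcheck fix; the author instead plans to drop the error return.
if err := i.wal.Put(op.it); err != nil {
	level.Error(l).Log("msg", "failed to return pending item to wal", "err", err)
}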
}
}

func (i *Ingester) flushOp(l log.Logger, flushCtx *flushCtx) error {
func (i *Ingester) flushOp(l log.Logger, op *flushOp) error {
ctx, cancelFunc := context.WithCancel(context.Background())
defer cancelFunc()

b := backoff.New(ctx, i.cfg.FlushOpBackoff)
for b.Ongoing() {
err := i.flushSegment(ctx, flushCtx.segmentWriter)
err := i.flushSegment(ctx, op.it.Writer)
if err == nil {
break
}
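On the "TODO: Add some backoff or something?" comment above, a minimal sketch of the idea: delay the re-queue instead of retrying immediately. flushRetryDelay is a hypothetical value, not a config option in this PR:

// Sketch: re-queue a failed flush after a fixed delay rather than at once.
if err != nil {
	level.Error(l).Log("msg", "failed to flush", "size", n, "duration", d, "err", err)
	time.AfterFunc(flushRetryDelay, func() {
		i.flushQueues[j].Enqueue(op)
	})
}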
95 changes: 25 additions & 70 deletions pkg/ingester-rf1/ingester.go
@@ -174,22 +174,6 @@ type Interface interface {
PrepareShutdown(w http.ResponseWriter, r *http.Request)
}

type flushCtx struct {
lock *sync.RWMutex
flushDone chan struct{}
newCtxAvailable chan struct{}
segmentWriter *wal.SegmentWriter
creationTime time.Time
}

func (o *flushCtx) Key() string {
return fmt.Sprintf("%d", o.creationTime.UnixNano())
}

func (o *flushCtx) Priority() int64 {
return -o.creationTime.UnixNano()
}

// Ingester builds chunks for incoming log streams.
type Ingester struct {
services.Service
@@ -217,10 +201,11 @@ type Ingester struct {

// One queue per flush thread. Fingerprint is used to
// pick a queue.
numOps int64
flushQueues []*util.PriorityQueue
flushQueuesDone sync.WaitGroup

flushCtx *flushCtx
wal *wal.Manager

limiter *Limiter

@@ -268,7 +253,11 @@ func New(cfg Config, clientConfig client.Config,
targetSizeStats.Set(int64(cfg.TargetChunkSize))
metrics := newIngesterMetrics(registerer, metricsNamespace)

segmentWriter, err := wal.NewWalSegmentWriter()
walManager, err := wal.NewManager(wal.Config{
MaxAge: wal.DefaultMaxAge,
MaxSegments: wal.DefaultMaxSegments,
MaxSegmentSize: wal.DefaultMaxSegmentSize,
})
if err != nil {
return nil, err
}
@@ -291,12 +280,7 @@
writeLogManager: writefailures.NewManager(logger, registerer, writeFailuresCfg, configs, "ingester_rf1"),
customStreamsTracker: customStreamsTracker,
readRing: readRing,
flushCtx: &flushCtx{
lock: &sync.RWMutex{},
flushDone: make(chan struct{}),
newCtxAvailable: make(chan struct{}),
segmentWriter: segmentWriter,
},
wal: walManager,
}

// TODO: change flush on shutdown
@@ -477,7 +461,6 @@ func (i *Ingester) running(ctx context.Context) error {
func (i *Ingester) stopping(_ error) error {
i.stopIncomingRequests()
var errs util.MultiError
// errs.Add(i.wal.Stop())

//if i.flushOnShutdownSwitch.Get() {
// i.lifecycler.SetFlushOnShutdown(true)
@@ -567,30 +550,18 @@ func (i *Ingester) loop() {
}

func (i *Ingester) doFlushTick() {
i.flushCtx.lock.Lock()

// i.logger.Log("msg", "starting periodic flush")
// Stop new chunks being written while we swap destinations - we'll never unlock as this flushctx can no longer be used.
currentFlushCtx := i.flushCtx

// APIs become unblocked after resetting flushCtx
segmentWriter, err := wal.NewWalSegmentWriter()
if err != nil {
// TODO: handle this properly
panic(err)
}
i.flushCtx = &flushCtx{
lock: &sync.RWMutex{},
flushDone: make(chan struct{}),
newCtxAvailable: make(chan struct{}),
segmentWriter: segmentWriter,
}
close(currentFlushCtx.newCtxAvailable) // Broadcast to all waiters that they can now fetch a new flushCtx. Small chance of a race but if they re-fetch the old one, they'll just check again immediately.
// Flush the finished context in the background & then notify watching API requests
// TODO: use multiple flush queues if required
// Don't write empty segments if there is nothing to write.
if currentFlushCtx.segmentWriter.InputSize() > 0 {
i.flushQueues[0].Enqueue(currentFlushCtx)
for {
// Keep adding ops to the queue until there are no more.
it, _ := i.wal.NextPending()
Contributor: I think we could remove the err return from NextPending if we don't look at it?

Contributor Author: I think so too, I'll do it for both NextPending and Put in another PR. 👍

if it == nil {
break
}
i.numOps++
flushQueueIndex := i.numOps % int64(i.cfg.ConcurrentFlushes)
i.flushQueues[flushQueueIndex].Enqueue(&flushOp{
num: i.numOps,
it: it,
})
}
}
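Per the review thread above, once NextPending (and Put) lose their unused error returns in the planned follow-up PR, the drain loop could shrink to something like this sketch:

for {
	// Hypothetical future signature without the ignored error return.
	it := i.wal.NextPending()
	if it == nil {
		break
	}
	i.numOps++
	flushQueueIndex := i.numOps % int64(i.cfg.ConcurrentFlushes)
	i.flushQueues[flushQueueIndex].Enqueue(&flushOp{num: i.numOps, it: it})
}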

@@ -796,27 +767,11 @@ func (i *Ingester) Push(ctx context.Context, req *logproto.PushRequest) (*logpro
return &logproto.PushResponse{}, err
}

// Fetch a flush context and try to acquire the RLock
// The only time the Write Lock is held is when this context is no longer usable and a new one is being created.
// In this case, we need to re-read i.flushCtx in order to fetch the new one as soon as it's available.
// The newCtxAvailable chan is closed as soon as the new one is available to avoid a busy loop.
currentFlushCtx := i.flushCtx
for !currentFlushCtx.lock.TryRLock() {
select {
case <-currentFlushCtx.newCtxAvailable:
case <-ctx.Done():
return &logproto.PushResponse{}, ctx.Err()
}
currentFlushCtx = i.flushCtx
}
err = instance.Push(ctx, req, currentFlushCtx)
currentFlushCtx.lock.RUnlock()
select {
case <-ctx.Done():
return &logproto.PushResponse{}, ctx.Err()
case <-currentFlushCtx.flushDone:
return &logproto.PushResponse{}, err
if err = instance.Push(ctx, i.wal, req); err != nil {
return nil, err
}

return &logproto.PushResponse{}, nil
}

// GetStreamRates returns a response containing all streams and their current rate
@@ -851,7 +806,7 @@ func (i *Ingester) GetOrCreateInstance(instanceID string) (*instance, error) { /
inst, ok = i.instances[instanceID]
if !ok {
var err error
inst, err = newInstance(&i.cfg, i.periodicConfigs, instanceID, i.limiter, i.tenantConfigs, i.metrics, i.streamRateCalculator, i.writeLogManager, i.customStreamsTracker)
inst, err = newInstance(&i.cfg, i.periodicConfigs, instanceID, i.limiter, i.tenantConfigs, i.metrics, i.streamRateCalculator, i.writeLogManager, i.customStreamsTracker, i.logger)
if err != nil {
return nil, err
}
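New constructs the manager with the package defaults shown above. For completeness, a sketch of tuning the limits instead; the values are illustrative and the field types (a duration plus segment and byte counts) are assumptions, not confirmed by this diff:

// Hypothetical non-default WAL manager configuration.
walManager, err := wal.NewManager(wal.Config{
	MaxAge:         30 * time.Second, // assumed time.Duration
	MaxSegments:    10,               // assumed segment count
	MaxSegmentSize: 8 * 1024 * 1024,  // assumed bytes (8 MiB)
})
if err != nil {
	return nil, err
}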
32 changes: 25 additions & 7 deletions pkg/ingester-rf1/instance.go
@@ -7,6 +7,7 @@ import (
"net/http"
"sync"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/grafana/dskit/httpgrpc"
"github.com/prometheus/client_golang/prometheus"
@@ -23,6 +24,7 @@
"github.com/grafana/loki/v3/pkg/logql/syntax"
"github.com/grafana/loki/v3/pkg/runtime"
"github.com/grafana/loki/v3/pkg/storage/config"
"github.com/grafana/loki/v3/pkg/storage/wal"
"github.com/grafana/loki/v3/pkg/util/constants"
util_log "github.com/grafana/loki/v3/pkg/util/log"
"github.com/grafana/loki/v3/pkg/validation"
@@ -70,6 +72,7 @@ type instance struct {
// tailers map[uint32]*tailer
tailerMtx sync.RWMutex

logger log.Logger
limiter *Limiter
streamCountLimiter *streamCountLimiter
ownedStreamsSvc *ownedStreamService
@@ -87,10 +90,10 @@
customStreamsTracker push.UsageTracker
}

func (i *instance) Push(ctx context.Context, req *logproto.PushRequest, flushCtx *flushCtx) error {
func (i *instance) Push(ctx context.Context, w *wal.Manager, req *logproto.PushRequest) error {
rateLimitWholeStream := i.limiter.limits.ShardStreams(i.instanceID).Enabled

var appendErr error
results := make([]*wal.AppendResult, 0, len(req.Streams))
for _, reqStream := range req.Streams {
s, _, err := i.streams.LoadOrStoreNew(reqStream.Labels,
func() (*stream, error) {
@@ -102,13 +105,27 @@ func (i *instance) Push(ctx context.Context, req *logproto.PushRequest, flushCtx
},
)
if err != nil {
appendErr = err
continue
return err
}
_, res, err := s.Push(ctx, w, reqStream.Entries, rateLimitWholeStream, i.customStreamsTracker)
if err != nil {
return err
}
results = append(results, res)
}

_, appendErr = s.Push(ctx, reqStream.Entries, rateLimitWholeStream, i.customStreamsTracker, flushCtx)
for _, result := range results {
select {
case <-ctx.Done():
return ctx.Err()
case <-result.Done():
if err := result.Err(); err != nil {
return err
}
}
}
return appendErr

return nil
}

func newInstance(
@@ -121,8 +138,8 @@
streamRateCalculator *StreamRateCalculator,
writeFailures *writefailures.Manager,
customStreamsTracker push.UsageTracker,
logger log.Logger,
) (*instance, error) {
fmt.Println("new instance for", instanceID)
invertedIndex, err := index.NewMultiInvertedIndex(periodConfigs, uint32(cfg.IndexShards))
if err != nil {
return nil, err
@@ -141,6 +158,7 @@
streamsRemovedTotal: streamsRemovedTotal.WithLabelValues(instanceID),
//
//tailers: map[uint32]*tailer{},
logger: logger,
limiter: limiter,
streamCountLimiter: newStreamCountLimiter(instanceID, streams.Len, limiter, ownedStreamsSvc),
ownedStreamsSvc: ownedStreamsSvc,
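The new write path is an append-then-await pattern: Append returns a *wal.AppendResult right away, instance.Push collects one result per stream and then blocks on each result's Done() channel, which flushLoop completes via Result.SetDone(err) once the segment is flushed. A condensed sketch of the pattern, using only calls shown in this diff:

// Append entries, then wait for the owning segment to be flushed.
res, err := w.Append(wal.AppendRequest{
	TenantID:  s.tenant,
	Labels:    s.labels,
	LabelsStr: s.labels.String(),
	Entries:   entries,
})
if err != nil {
	return err
}
select {
case <-ctx.Done():
	return ctx.Err() // stop waiting if the push request is cancelled
case <-res.Done():
	return res.Err() // flush outcome, set by flushLoop via SetDone(err)
}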
29 changes: 21 additions & 8 deletions pkg/ingester-rf1/stream.go
@@ -17,6 +17,7 @@ import (
"github.com/grafana/loki/v3/pkg/distributor/writefailures"
"github.com/grafana/loki/v3/pkg/loghttp/push"
"github.com/grafana/loki/v3/pkg/logproto"
"github.com/grafana/loki/v3/pkg/storage/wal"
"github.com/grafana/loki/v3/pkg/util/flagext"
"github.com/grafana/loki/v3/pkg/validation"
)
@@ -130,21 +131,24 @@ func (s *stream) consumeChunk(_ context.Context, _ *logproto.Chunk) error {

func (s *stream) Push(
ctx context.Context,
wal *wal.Manager,
entries []logproto.Entry,
// Whether or not to ingest all at once. It is a per-tenant configuration.
rateLimitWholeStream bool,

usageTracker push.UsageTracker,
flushCtx *flushCtx,
) (int, error) {
) (int, *wal.AppendResult, error) {
toStore, invalid := s.validateEntries(ctx, entries, rateLimitWholeStream, usageTracker)
if rateLimitWholeStream && hasRateLimitErr(invalid) {
return 0, errorForFailedEntries(s, invalid, len(entries))
return 0, nil, errorForFailedEntries(s, invalid, len(entries))
}

bytesAdded := s.storeEntries(ctx, toStore, usageTracker, flushCtx)
bytesAdded, res, err := s.storeEntries(ctx, wal, toStore, usageTracker)
if err != nil {
return 0, nil, err
}

return bytesAdded, errorForFailedEntries(s, invalid, len(entries))
return bytesAdded, res, errorForFailedEntries(s, invalid, len(entries))
}

func errorForFailedEntries(s *stream, failedEntriesWithError []entryWithError, totalEntries int) error {
@@ -195,7 +199,7 @@ func hasRateLimitErr(errs []entryWithError) bool {
return ok
}

func (s *stream) storeEntries(ctx context.Context, entries []*logproto.Entry, usageTracker push.UsageTracker, flushCtx *flushCtx) int {
func (s *stream) storeEntries(ctx context.Context, w *wal.Manager, entries []*logproto.Entry, usageTracker push.UsageTracker) (int, *wal.AppendResult, error) {
if sp := opentracing.SpanFromContext(ctx); sp != nil {
sp.LogKV("event", "stream started to store entries", "labels", s.labelsString)
defer sp.LogKV("event", "stream finished to store entries")
@@ -213,9 +217,18 @@ func (s *stream) storeEntries(ctx context.Context, entries []*logproto.Entry, us

bytesAdded += len(entries[i].Line)
}
flushCtx.segmentWriter.Append(s.tenant, s.labels.String(), s.labels, entries)

res, err := w.Append(wal.AppendRequest{
Contributor: It might be more performant to use a pointer for Append here: unsafe.Sizeof says the AppendRequest is 80B, and since it's passed by value it gets copied into the function call. I'm not sure if this actually leads to any performance benefit in this case, but it might be worth a benchmark since it's called so often?

Contributor Author: A lot of the time &wal.AppendRequest{} will be turned into wal.AppendRequest{} at compile time, as it's faster to do this copy on the stack than to allocate it on the heap. It's easier to show with an isolated example.

Contributor Author: Benchmark for using the stack:

go test -gcflags=-N -bench=. bench_test.go
goos: darwin
goarch: arm64
BenchmarkDoRequest-8   	573065010	         2.098 ns/op
PASS
ok  	command-line-arguments	2.663s
type AppendRequest struct {
	TenantID  string
	Labels    map[string]string
	LabelsStr string
}

func doRequest(r AppendRequest) {
	_ = r
}

func BenchmarkDoRequest(t *testing.B) {
	labels := map[string]string{"foo": "bar"}
	labelsStr := "{foo=\"bar\"}"
	for i := 0; i < t.N; i++ {
		doRequest(AppendRequest{
			TenantID:  "1",
			Labels:    labels,
			LabelsStr: labelsStr,
		})
	}
}

Benchmark for using the heap:

go test -gcflags=-N -bench=. bench_test.go
goos: darwin
goarch: arm64
BenchmarkDoRequest-8   	346227085	         3.131 ns/op
PASS
ok  	command-line-arguments	1.654s
type AppendRequest struct {
	TenantID  string
	Labels    map[string]string
	LabelsStr string
}

func doRequest(r *AppendRequest) {
	_ = r
}

func BenchmarkDoRequest(t *testing.B) {
	labels := map[string]string{"foo": "bar"}
	labelsStr := "{foo=\"bar\"}"
	for i := 0; i < t.N; i++ {
		doRequest(&AppendRequest{
			TenantID:  "1",
			Labels:    labels,
			LabelsStr: labelsStr,
		})
	}
}

TenantID: s.tenant,
Labels: s.labels,
LabelsStr: s.labels.String(),
Entries: entries,
})
if err != nil {
return 0, nil, err
}
s.reportMetrics(ctx, outOfOrderSamples, outOfOrderBytes, 0, 0, usageTracker)
return bytesAdded
return bytesAdded, res, nil
}

func (s *stream) validateEntries(ctx context.Context, entries []logproto.Entry, rateLimitWholeStream bool, usageTracker push.UsageTracker) ([]*logproto.Entry, []entryWithError) {
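As a follow-up to the benchmarks in the review thread above, the compiler's escape analysis can be inspected directly with the -m diagnostics flag. A minimal standalone sketch (hypothetical file, mirroring the reviewer's example):

package main

// Build with: go build -gcflags=-m escape.go
// Diagnostics such as "&AppendRequest{...} does not escape" indicate the
// pointee stays on the stack, matching the author's claim above.

type AppendRequest struct {
	TenantID  string
	Labels    map[string]string
	LabelsStr string
}

//go:noinline
func doRequest(r *AppendRequest) { _ = r }

func main() {
	doRequest(&AppendRequest{TenantID: "1"})
}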