Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Introduce wal segment read path. #13695

Merged
merged 19 commits into from
Aug 2, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/ingester-rf1/flush.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ func (i *Ingester) flushSegment(ctx context.Context, j int, w *wal.SegmentWriter
}

id := ulid.MustNew(ulid.Timestamp(time.Now()), rand.Reader).String()
if err := i.store.PutObject(ctx, fmt.Sprintf("loki-v2/wal/anon/"+id), buf); err != nil {
if err := i.store.PutObject(ctx, fmt.Sprintf(wal.Dir+id), buf); err != nil {
i.metrics.flushFailuresTotal.Inc()
return fmt.Errorf("failed to put object: %w", err)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/ingester-rf1/metastore/metastore.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (
"github.com/grafana/loki/v3/pkg/ingester-rf1/metastore/raftleader"
)

const metastoreRaftLeaderHealthServiceName = "metastorepb.MetastoreService.RaftLeader"
const metastoreRaftLeaderHealthServiceName = "metastorepb.MetastoreService"
cyriltovena marked this conversation as resolved.
Show resolved Hide resolved

type Config struct {
DataDir string `yaml:"data_dir"`
Expand Down
228 changes: 228 additions & 0 deletions pkg/querier-rf1/wal/chunks.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,228 @@
package wal

import (
"bytes"
"context"
"sort"
"sync"

"github.com/grafana/loki/pkg/push"
"github.com/grafana/loki/v3/pkg/iter"
"github.com/grafana/loki/v3/pkg/logproto"
"github.com/grafana/loki/v3/pkg/logql/log"
"github.com/grafana/loki/v3/pkg/storage/wal"
"github.com/grafana/loki/v3/pkg/storage/wal/chunks"
"github.com/grafana/loki/v3/pkg/storage/wal/index"
"github.com/prometheus/prometheus/model/labels"
"golang.org/x/sync/errgroup"
)

const batchSize = 16

var _ iter.EntryIterator = (*lazyChunks)(nil)

type lazyChunk struct {
meta *chunks.Meta
labels labels.Labels
id string
}

func newLazyChunk(id string, lbs *labels.ScratchBuilder, meta *chunks.Meta) lazyChunk {
lbs.Sort()
return lazyChunk{
id: id,
meta: meta,
labels: lbs.Labels(),
}
}

type lazyChunks struct {
chunks []lazyChunk
direction logproto.Direction
pipeline log.Pipeline
minT, maxT int64
storage BlockStorage
ctx context.Context

current iter.EntryIterator
batch []lazyChunk
err error
}

// todo: Support SampleIterator.
func NewChunksEntryIterator(
ctx context.Context,
storage BlockStorage,
chunks []lazyChunk,
pipeline log.Pipeline,
direction logproto.Direction,
minT, maxT int64,
) *lazyChunks {

Check warning on line 60 in pkg/querier-rf1/wal/chunks.go

View workflow job for this annotation

GitHub Actions / check / golangciLint

unexported-return: exported func NewChunksEntryIterator returns unexported type *wal.lazyChunks, which can be annoying to use (revive)
// sort by time and then by labels following the direction.
sort.Slice(chunks, func(i, j int) bool {
if direction == logproto.FORWARD {
t1, t2 := chunks[i].meta.MinTime, chunks[j].meta.MinTime
if t1 != t2 {
return t1 < t2
}
return labels.Compare(chunks[i].labels, chunks[j].labels) < 0
}
t1, t2 := chunks[i].meta.MaxTime, chunks[j].meta.MaxTime
if t1 != t2 {
return t1 > t2
}
return labels.Compare(chunks[i].labels, chunks[j].labels) < 0
})
return &lazyChunks{
ctx: ctx,
chunks: chunks,
direction: direction,
pipeline: pipeline,
storage: storage,
batch: make([]lazyChunk, 0, batchSize),
minT: minT,
maxT: maxT,
}
}

// At implements iter.EntryIterator.
func (l *lazyChunks) At() push.Entry {
if l.current == nil {
return push.Entry{}
}
return l.current.At()
}

func (l *lazyChunks) Labels() string {
if l.current == nil {
return ""
}
return l.current.Labels()
}

func (l *lazyChunks) StreamHash() uint64 {
if l.current == nil {
return 0
}
return l.current.StreamHash()
}

// Close implements iter.EntryIterator.
func (l *lazyChunks) Close() error {
if l.current == nil {
return nil
}
return l.current.Close()
}

// Err implements iter.EntryIterator.
func (l *lazyChunks) Err() error {
if l.err != nil {
return l.err
}
if l.current == nil {
return l.current.Err()
}
return nil
}

// Next implements iter.EntryIterator.
func (l *lazyChunks) Next() bool {
if l.current != nil && l.current.Next() {
return true
}
if l.current != nil {
if err := l.current.Close(); err != nil {
l.err = err
return false
}
}
if len(l.chunks) == 0 {
return false
}
// take the next batch of chunks
if err := l.nextBatch(); err != nil {
l.err = err
return false
}
return l.current.Next()
}

func (l *lazyChunks) nextBatch() error {
l.batch = l.batch[:0]
for len(l.chunks) > 0 &&
(len(l.batch) < batchSize ||
isOverlapping(l.batch[len(l.batch)-1], l.chunks[0], l.direction)) {
l.batch = append(l.batch, l.chunks[0])
l.chunks = l.chunks[1:]
}
// todo: error if the batch is too big.
// todo: reuse previous sortIterator array
// todo: Try to use iter.NonOverlappingEntryIterator if possible which can reduce the amount of work.
var (
iters []iter.EntryIterator
mtx sync.Mutex
)
g, ctx := errgroup.WithContext(l.ctx)
g.SetLimit(64)
for _, c := range l.batch {
c := c
g.Go(func() error {
iter, err := fetchChunkEntries(ctx, c, l.minT, l.maxT, l.direction, l.pipeline, l.storage)
if err != nil {
return err
}
mtx.Lock()
iters = append(iters, iter)
mtx.Unlock()
return nil
})
}
if err := g.Wait(); err != nil {
return err
}
l.current = iter.NewSortEntryIterator(iters, l.direction)
return nil
}

func fetchChunkEntries(
ctx context.Context,
c lazyChunk,
from, through int64,
direction logproto.Direction,
pipeline log.Pipeline,
storage BlockStorage,
) (iter.EntryIterator, error) {
offset, size := c.meta.Ref.Unpack()
reader, err := storage.GetRangeObject(ctx, wal.Dir+c.id, int64(offset), int64(size))
if err != nil {
return nil, err
}
defer reader.Close()
// todo: use a pool
buf := bytes.NewBuffer(make([]byte, 0, size))
_, err = buf.ReadFrom(reader)
if err != nil {
return nil, err
}
// create logql pipeline and remove tenantID
// todo: we might want to create a single pipeline for all chunks from the same series.
streamPipeline := pipeline.ForStream(
labels.NewBuilder(c.labels).Del(index.TenantLabel).Labels(),
)
it, err := chunks.NewEntryIterator(buf.Bytes(), streamPipeline, direction, from, through)
if err != nil {
return nil, err
}
return iter.EntryIteratorWithClose(it, func() error {
// todo: return buffer to pool.
return nil
}), nil
}

func isOverlapping(first, second lazyChunk, direction logproto.Direction) bool {
if direction == logproto.BACKWARD {
return first.meta.MinTime <= second.meta.MaxTime
}
return first.meta.MaxTime < second.meta.MinTime
}
149 changes: 149 additions & 0 deletions pkg/querier-rf1/wal/querier.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
package wal

import (
"bytes"
"context"
"io"
"sync"

"github.com/prometheus/prometheus/model/labels"
"golang.org/x/sync/errgroup"
grpc "google.golang.org/grpc"

"github.com/grafana/dskit/tenant"
"github.com/grafana/loki/v3/pkg/ingester-rf1/metastore/metastorepb"
"github.com/grafana/loki/v3/pkg/iter"
"github.com/grafana/loki/v3/pkg/logql"
"github.com/grafana/loki/v3/pkg/storage/wal"
"github.com/grafana/loki/v3/pkg/storage/wal/chunks"
"github.com/grafana/loki/v3/pkg/storage/wal/index"
)

var _ logql.Querier = (*Querier)(nil)

type BlockStorage interface {
GetRangeObject(ctx context.Context, objectKey string, off, length int64) (io.ReadCloser, error)
}

type Metastore interface {
ListBlocksForQuery(ctx context.Context, in *metastorepb.ListBlocksForQueryRequest, opts ...grpc.CallOption) (*metastorepb.ListBlocksForQueryResponse, error)
}

type Querier struct {
blockStorage BlockStorage
metaStore Metastore
}

func New(
metaStore Metastore,
blockStorage BlockStorage,
) (*Querier, error) {
return &Querier{
blockStorage: blockStorage,
metaStore: metaStore,
}, nil
}

func (q *Querier) SelectLogs(ctx context.Context, req logql.SelectLogParams) (iter.EntryIterator, error) {
// todo request validation and delete markers.
tenantID, err := tenant.TenantID(ctx)
if err != nil {
return nil, err
}
expr, err := req.LogSelector()
if err != nil {
return nil, err
}
matchers := expr.Matchers()
// todo: not sure if Pipeline is thread safe
pipeline, err := expr.Pipeline()
if err != nil {
return nil, err
}
// todo support sharding
var (
lazyChunks []lazyChunk
mtx sync.Mutex
)

err = q.forSeries(ctx, &metastorepb.ListBlocksForQueryRequest{
TenantId: tenantID,
StartTime: req.Start.UnixNano(),
EndTime: req.End.UnixNano(),
}, func(id string, lbs *labels.ScratchBuilder, chk *chunks.Meta) error {
mtx.Lock()
lazyChunks = append(lazyChunks, newLazyChunk(id, lbs, chk))
mtx.Unlock()
return nil
}, matchers...)

return NewChunksEntryIterator(ctx,
q.blockStorage,
lazyChunks,
pipeline,
req.Direction,
req.Start.UnixNano(),
req.End.UnixNano()), err
}

func (q *Querier) SelectSamples(context.Context, logql.SelectSampleParams) (iter.SampleIterator, error) {
// todo: implement
return nil, nil
}

func (q *Querier) forSeries(ctx context.Context, req *metastorepb.ListBlocksForQueryRequest, fn func(string, *labels.ScratchBuilder, *chunks.Meta) error, ms ...*labels.Matcher) error {
return q.forIndices(ctx, req, func(ir *index.Reader, id string) error {
bufLbls := labels.ScratchBuilder{}
chunks := make([]chunks.Meta, 0, 1)
p, err := ir.PostingsForMatchers(ctx, req.TenantId, ms...)
if err != nil {
return err
}
for p.Next() {
err := ir.Series(p.At(), &bufLbls, &chunks)
if err != nil {
return err
}
if err := fn(id, &bufLbls, &chunks[0]); err != nil {
return err
}
}
return p.Err()
})
}

func (q *Querier) forIndices(ctx context.Context, req *metastorepb.ListBlocksForQueryRequest, fn func(ir *index.Reader, id string) error) error {
resp, err := q.metaStore.ListBlocksForQuery(ctx, req)
if err != nil {
return err
}
metas := resp.Blocks
if len(metas) == 0 {
return nil
}
g, ctx := errgroup.WithContext(ctx)
g.SetLimit(32)
for _, meta := range metas {

meta := meta
g.Go(func() error {
reader, err := q.blockStorage.GetRangeObject(ctx, wal.Dir+meta.Id, meta.IndexRef.Offset, meta.IndexRef.Length)
if err != nil {
return err
}
defer reader.Close()
// todo: use a buffer pool
buf := bytes.NewBuffer(make([]byte, 0, meta.IndexRef.Length))
_, err = buf.ReadFrom(reader)
if err != nil {
return err
}
index, err := index.NewReader(index.RealByteSlice(buf.Bytes()))
if err != nil {
return err
}
return fn(index, meta.Id)
})
}
return g.Wait()
}
Loading
Loading