diff --git a/pkg/ingester-rf1/flush.go b/pkg/ingester-rf1/flush.go index 55601337d350f..2d194a12f5574 100644 --- a/pkg/ingester-rf1/flush.go +++ b/pkg/ingester-rf1/flush.go @@ -114,7 +114,7 @@ func (i *Ingester) flushSegment(ctx context.Context, j int, w *wal.SegmentWriter wal.ReportSegmentStats(stats, i.metrics.segmentMetrics) id := ulid.MustNew(ulid.Timestamp(time.Now()), rand.Reader).String() - if err := i.store.PutObject(ctx, fmt.Sprintf("loki-v2/wal/anon/"+id), buf); err != nil { + if err := i.store.PutObject(ctx, fmt.Sprintf(wal.Dir+id), buf); err != nil { i.metrics.flushFailuresTotal.Inc() return fmt.Errorf("failed to put object: %w", err) } diff --git a/pkg/loki/modules.go b/pkg/loki/modules.go index 1ede7ee806b79..d823f5cedb5cd 100644 --- a/pkg/loki/modules.go +++ b/pkg/loki/modules.go @@ -52,6 +52,7 @@ import ( metastoreclient "github.com/grafana/loki/v3/pkg/ingester-rf1/metastore/client" "github.com/grafana/loki/v3/pkg/ingester-rf1/metastore/health" "github.com/grafana/loki/v3/pkg/ingester-rf1/metastore/metastorepb" + "github.com/grafana/loki/v3/pkg/ingester-rf1/objstore" "github.com/grafana/loki/v3/pkg/logproto" "github.com/grafana/loki/v3/pkg/logql" "github.com/grafana/loki/v3/pkg/logqlmodel/stats" @@ -415,7 +416,11 @@ func (t *Loki) initQuerier() (services.Service, error) { if t.Cfg.QuerierRF1.Enabled { logger.Log("Using RF-1 querier implementation") - t.Querier, err = querierrf1.New(t.Cfg.QuerierRF1, t.Store, t.Overrides, deleteStore, logger) + store, err := objstore.New(t.Cfg.SchemaConfig.Configs, t.Cfg.StorageConfig, t.ClientMetrics) + if err != nil { + return nil, err + } + t.Querier, err = querierrf1.New(t.Cfg.QuerierRF1, t.Store, t.Overrides, deleteStore, t.MetastoreClient, store, logger) if err != nil { return nil, err } @@ -1818,7 +1823,7 @@ func (t *Loki) initMetastore() (services.Service, error) { return nil, nil } if t.Cfg.isTarget(All) { - t.Cfg.MetastoreClient.MetastoreAddress = fmt.Sprintf("localhost:%s", t.Cfg.Server.GRPCListenAddress) + t.Cfg.MetastoreClient.MetastoreAddress = fmt.Sprintf("localhost:%d", t.Cfg.Server.GRPCListenPort) } m, err := metastore.New(t.Cfg.Metastore, log.With(util_log.Logger, "component", "metastore"), prometheus.DefaultRegisterer, t.health) if err != nil { diff --git a/pkg/querier-rf1/querier.go b/pkg/querier-rf1/querier.go index 9504fe23482ab..c4a9dd76ba5f6 100644 --- a/pkg/querier-rf1/querier.go +++ b/pkg/querier-rf1/querier.go @@ -34,6 +34,7 @@ import ( "github.com/grafana/loki/v3/pkg/logql/syntax" "github.com/grafana/loki/v3/pkg/logqlmodel" "github.com/grafana/loki/v3/pkg/querier" + "github.com/grafana/loki/v3/pkg/querier-rf1/wal" querier_limits "github.com/grafana/loki/v3/pkg/querier/limits" "github.com/grafana/loki/v3/pkg/querier/plan" "github.com/grafana/loki/v3/pkg/storage" @@ -97,6 +98,7 @@ type Rf1Querier struct { deleteGetter deleteGetter logger log.Logger patternQuerier PatterQuerier + walQuerier logql.Querier } type deleteGetter interface { @@ -104,12 +106,17 @@ type deleteGetter interface { } // New makes a new Querier for RF1 work. -func New(cfg Config, store Store, limits Limits, d deleteGetter, logger log.Logger) (*Rf1Querier, error) { +func New(cfg Config, store Store, limits Limits, d deleteGetter, metastore wal.Metastore, b wal.BlockStorage, logger log.Logger) (*Rf1Querier, error) { + querier, err := wal.New(metastore, b) + if err != nil { + return nil, err + } return &Rf1Querier{ cfg: cfg, store: store, limits: limits, deleteGetter: d, + walQuerier: querier, logger: logger, }, nil } @@ -134,7 +141,7 @@ func (q *Rf1Querier) SelectLogs(ctx context.Context, params logql.SelectLogParam "msg", "querying rf1 store", "params", params) } - storeIter, err := q.store.SelectLogs(ctx, params) + storeIter, err := q.walQuerier.SelectLogs(ctx, params) if err != nil { return nil, err } @@ -164,7 +171,7 @@ func (q *Rf1Querier) SelectSamples(ctx context.Context, params logql.SelectSampl "msg", "querying rf1 store for samples", "params", params) } - storeIter, err := q.store.SelectSamples(ctx, params) + storeIter, err := q.walQuerier.SelectSamples(ctx, params) if err != nil { return nil, err } diff --git a/pkg/querier-rf1/wal/chunks.go b/pkg/querier-rf1/wal/chunks.go new file mode 100644 index 0000000000000..76070006aff40 --- /dev/null +++ b/pkg/querier-rf1/wal/chunks.go @@ -0,0 +1,323 @@ +package wal + +import ( + "context" + "fmt" + "sort" + + "github.com/prometheus/prometheus/model/labels" + "golang.org/x/sync/errgroup" + + "github.com/grafana/loki/v3/pkg/iter" + "github.com/grafana/loki/v3/pkg/logproto" + "github.com/grafana/loki/v3/pkg/logql/log" + "github.com/grafana/loki/v3/pkg/storage/wal" + "github.com/grafana/loki/v3/pkg/storage/wal/chunks" + "github.com/grafana/loki/v3/pkg/storage/wal/index" + + "github.com/grafana/loki/pkg/push" +) + +const defaultBatchSize = 16 + +type ChunkData struct { + meta *chunks.Meta + labels labels.Labels + id string +} + +func newChunkData(id string, lbs *labels.ScratchBuilder, meta *chunks.Meta) ChunkData { + lbs.Sort() + newLbs := lbs.Labels() + j := 0 + for _, l := range newLbs { + if l.Name != index.TenantLabel { + newLbs[j] = l + j++ + } + } + newLbs = newLbs[:j] + return ChunkData{ + id: id, + meta: &chunks.Meta{ // incoming Meta is from a shared buffer, so create a new one + Ref: meta.Ref, + MinTime: meta.MinTime, + MaxTime: meta.MaxTime, + }, + labels: newLbs, + } +} + +// ChunksEntryIterator iterates over log entries +type ChunksEntryIterator[T iter.EntryIterator] struct { + baseChunksIterator[T] +} + +// ChunksSampleIterator iterates over metric samples +type ChunksSampleIterator[T iter.SampleIterator] struct { + baseChunksIterator[T] +} + +func NewChunksEntryIterator( + ctx context.Context, + storage BlockStorage, + chunks []ChunkData, + pipeline log.Pipeline, + direction logproto.Direction, + minT, maxT int64, +) *ChunksEntryIterator[iter.EntryIterator] { + sortChunks(chunks, direction) + return &ChunksEntryIterator[iter.EntryIterator]{ + baseChunksIterator: baseChunksIterator[iter.EntryIterator]{ + ctx: ctx, + chunks: chunks, + direction: direction, + storage: storage, + bachSize: defaultBatchSize, + batch: make([]ChunkData, 0, defaultBatchSize), + minT: minT, + maxT: maxT, + + iteratorFactory: func(chunks []ChunkData) (iter.EntryIterator, error) { + return createNextEntryIterator(ctx, chunks, direction, pipeline, storage, minT, maxT) + }, + isNil: func(it iter.EntryIterator) bool { return it == nil }, + }, + } +} + +func NewChunksSampleIterator( + ctx context.Context, + storage BlockStorage, + chunks []ChunkData, + extractor log.SampleExtractor, + minT, maxT int64, +) *ChunksSampleIterator[iter.SampleIterator] { + sortChunks(chunks, logproto.FORWARD) + return &ChunksSampleIterator[iter.SampleIterator]{ + baseChunksIterator: baseChunksIterator[iter.SampleIterator]{ + ctx: ctx, + chunks: chunks, + direction: logproto.FORWARD, + storage: storage, + bachSize: defaultBatchSize, + batch: make([]ChunkData, 0, defaultBatchSize), + minT: minT, + maxT: maxT, + + iteratorFactory: func(chunks []ChunkData) (iter.SampleIterator, error) { + return createNextSampleIterator(ctx, chunks, extractor, storage, minT, maxT) + }, + isNil: func(it iter.SampleIterator) bool { return it == nil }, + }, + } +} + +func sortChunks(chunks []ChunkData, direction logproto.Direction) { + sort.Slice(chunks, func(i, j int) bool { + if direction == logproto.FORWARD { + t1, t2 := chunks[i].meta.MinTime, chunks[j].meta.MinTime + if t1 != t2 { + return t1 < t2 + } + return labels.Compare(chunks[i].labels, chunks[j].labels) < 0 + } + t1, t2 := chunks[i].meta.MaxTime, chunks[j].meta.MaxTime + if t1 != t2 { + return t1 > t2 + } + return labels.Compare(chunks[i].labels, chunks[j].labels) < 0 + }) +} + +// baseChunksIterator contains common fields and methods for both entry and sample iterators +type baseChunksIterator[T interface { + Next() bool + Close() error + Err() error + StreamHash() uint64 + Labels() string +}] struct { + chunks []ChunkData + direction logproto.Direction + minT, maxT int64 + storage BlockStorage + ctx context.Context + iteratorFactory func([]ChunkData) (T, error) + isNil func(T) bool + + bachSize int + batch []ChunkData + current T + err error +} + +func (b *baseChunksIterator[T]) nextBatch() error { + b.batch = b.batch[:0] + for len(b.chunks) > 0 && + (len(b.batch) < b.bachSize || + isOverlapping(b.batch[len(b.batch)-1], b.chunks[0], b.direction)) { + b.batch = append(b.batch, b.chunks[0]) + b.chunks = b.chunks[1:] + } + // todo: error if the batch is too big. + return nil +} + +// todo: better chunk batch iterator +func (b *baseChunksIterator[T]) Next() bool { + for b.isNil(b.current) || !b.current.Next() { + if !b.isNil(b.current) { + if err := b.current.Close(); err != nil { + b.err = err + return false + } + } + if len(b.chunks) == 0 { + return false + } + if err := b.nextBatch(); err != nil { + b.err = err + return false + } + var err error + b.current, err = b.iteratorFactory(b.batch) + if err != nil { + b.err = err + return false + } + } + return true +} + +func createNextEntryIterator( + ctx context.Context, + batch []ChunkData, + direction logproto.Direction, + pipeline log.Pipeline, + storage BlockStorage, + minT, maxT int64, +) (iter.EntryIterator, error) { + iterators := make([]iter.EntryIterator, 0, len(batch)) + + data, err := downloadChunks(ctx, storage, batch) + if err != nil { + return nil, err + } + + for i, chunk := range batch { + streamPipeline := pipeline.ForStream(chunk.labels) + chunkIterator, err := chunks.NewEntryIterator(data[i], streamPipeline, direction, minT, maxT) + if err != nil { + return nil, fmt.Errorf("error creating entry iterator: %w", err) + } + iterators = append(iterators, chunkIterator) + } + + // todo: Use NonOverlapping iterator when possible. This will reduce the amount of entries processed during iteration. + return iter.NewSortEntryIterator(iterators, direction), nil +} + +func createNextSampleIterator( + ctx context.Context, + batch []ChunkData, + pipeline log.SampleExtractor, + storage BlockStorage, + minT, maxT int64, +) (iter.SampleIterator, error) { + iterators := make([]iter.SampleIterator, 0, len(batch)) + + data, err := downloadChunks(ctx, storage, batch) + if err != nil { + return nil, err + } + + for i, chunk := range batch { + streamPipeline := pipeline.ForStream(chunk.labels) + chunkIterator, err := chunks.NewSampleIterator(data[i], streamPipeline, minT, maxT) + if err != nil { + return nil, fmt.Errorf("error creating sample iterator: %w", err) + } + iterators = append(iterators, chunkIterator) + } + + return iter.NewSortSampleIterator(iterators), nil +} + +func (b *baseChunksIterator[T]) Close() error { + if !b.isNil(b.current) { + return b.current.Close() + } + return nil +} + +func (b *baseChunksIterator[T]) Err() error { + if b.err != nil { + return b.err + } + if !b.isNil(b.current) { + return b.current.Err() + } + return nil +} + +func (b *baseChunksIterator[T]) Labels() string { + return b.current.Labels() +} + +func (b *baseChunksIterator[T]) StreamHash() uint64 { + return b.current.StreamHash() +} + +func (c *ChunksEntryIterator[T]) At() push.Entry { return c.current.At() } +func (c *ChunksSampleIterator[T]) At() logproto.Sample { return c.current.At() } + +func isOverlapping(first, second ChunkData, direction logproto.Direction) bool { + if direction == logproto.BACKWARD { + return first.meta.MinTime <= second.meta.MaxTime + } + return first.meta.MaxTime >= second.meta.MinTime +} + +func downloadChunks(ctx context.Context, storage BlockStorage, chks []ChunkData) ([][]byte, error) { + data := make([][]byte, len(chks)) + g, ctx := errgroup.WithContext(ctx) + g.SetLimit(64) + for i, chunk := range chks { + chunk := chunk + i := i + g.Go(func() error { + chunkData, err := readChunkData(ctx, storage, chunk) + if err != nil { + return fmt.Errorf("error reading chunk data: %w", err) + } + data[i] = chunkData + return nil + }) + } + + if err := g.Wait(); err != nil { + return nil, err + } + return data, nil +} + +func readChunkData(ctx context.Context, storage BlockStorage, chunk ChunkData) ([]byte, error) { + offset, size := chunk.meta.Ref.Unpack() + // todo: We should be able to avoid many IOPS to object storage + // if chunks are next to each other and we should be able to pack range request + // together. + reader, err := storage.GetObjectRange(ctx, wal.Dir+chunk.id, int64(offset), int64(size)) + if err != nil { + return nil, err + } + defer reader.Close() + + data := make([]byte, size) + _, err = reader.Read(data) + if err != nil { + return nil, err + } + + return data, nil +} diff --git a/pkg/querier-rf1/wal/chunks_test.go b/pkg/querier-rf1/wal/chunks_test.go new file mode 100644 index 0000000000000..0d0192c04b17a --- /dev/null +++ b/pkg/querier-rf1/wal/chunks_test.go @@ -0,0 +1,516 @@ +package wal + +import ( + "bytes" + "context" + "fmt" + "io" + "testing" + "time" + + "github.com/prometheus/prometheus/model/labels" + "github.com/stretchr/testify/require" + + "github.com/grafana/loki/v3/pkg/iter" + "github.com/grafana/loki/v3/pkg/logproto" + "github.com/grafana/loki/v3/pkg/logql/syntax" + "github.com/grafana/loki/v3/pkg/storage/wal" + walchunks "github.com/grafana/loki/v3/pkg/storage/wal/chunks" +) + +type mockBlockStorage struct { + data map[string][]byte +} + +func (m *mockBlockStorage) GetObjectRange(_ context.Context, objectKey string, off, length int64) (io.ReadCloser, error) { + data := m.data[objectKey] + return io.NopCloser(bytes.NewReader(data[off : off+length])), nil +} + +func TestChunksEntryIterator(t *testing.T) { + ctx := context.Background() + storage := &mockBlockStorage{data: make(map[string][]byte)} + + // Generate test data with multiple batches + chunkData := generateTestChunkData(5 * defaultBatchSize) + chks := writeChunksToStorage(t, storage, chunkData) + + tests := []struct { + name string + direction logproto.Direction + start time.Time + end time.Time + expected []logproto.Entry + }{ + { + name: "forward direction, all entries", + direction: logproto.FORWARD, + start: time.Unix(0, 0), + end: time.Unix(int64(5*defaultBatchSize+1), 0), + expected: flattenEntries(chunkData), + }, + { + name: "backward direction, all entries", + direction: logproto.BACKWARD, + start: time.Unix(0, 0), + end: time.Unix(int64(5*defaultBatchSize+1), 0), + expected: reverseEntries(flattenEntries(chunkData)), + }, + { + name: "forward direction, partial range", + direction: logproto.FORWARD, + start: time.Unix(int64(defaultBatchSize), 0), + end: time.Unix(int64(3*defaultBatchSize), 0), + expected: selectEntries(flattenEntries(chunkData), defaultBatchSize, 3*defaultBatchSize), + }, + { + name: "backward direction, partial range", + direction: logproto.BACKWARD, + start: time.Unix(int64(defaultBatchSize), 0), + end: time.Unix(int64(3*defaultBatchSize), 0), + expected: reverseEntries(selectEntries(flattenEntries(chunkData), defaultBatchSize, 3*defaultBatchSize)), + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + expr, err := syntax.ParseLogSelector(`{app=~".+"}`, false) + require.NoError(t, err) + + pipeline, err := expr.Pipeline() + require.NoError(t, err) + + iterator := NewChunksEntryIterator(ctx, storage, chks, pipeline, tt.direction, tt.start.UnixNano(), tt.end.UnixNano()) + + result := iterateEntries(iterator) + require.NoError(t, iterator.Close()) + require.NoError(t, iterator.Err()) + + assertEqualEntries(t, tt.expected, result) + }) + } +} + +func TestChunksSampleIterator(t *testing.T) { + ctx := context.Background() + storage := &mockBlockStorage{data: make(map[string][]byte)} + + // Generate test data with multiple batches + chunkData := generateTestChunkData(5 * defaultBatchSize) + chks := writeChunksToStorage(t, storage, chunkData) + + tests := []struct { + name string + start time.Time + end time.Time + expected []logproto.Sample + }{ + { + name: "all samples", + start: time.Unix(0, 0), + end: time.Unix(int64(5*defaultBatchSize+1), 0), + expected: entriesToSamples(flattenEntries(chunkData)), + }, + { + name: "partial range", + start: time.Unix(int64(defaultBatchSize), 0), + end: time.Unix(int64(3*defaultBatchSize), 0), + expected: entriesToSamples(selectEntries(flattenEntries(chunkData), defaultBatchSize, 3*defaultBatchSize)), + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + expr, err := syntax.ParseSampleExpr(`count_over_time({app=~".+"} [1m])`) + require.NoError(t, err) + + extractor, err := expr.Extractor() + require.NoError(t, err) + iterator := NewChunksSampleIterator(ctx, storage, chks, extractor, tt.start.UnixNano(), tt.end.UnixNano()) + + result := iterateSamples(iterator) + require.NoError(t, iterator.Close()) + require.NoError(t, iterator.Err()) + + assertEqualSamples(t, tt.expected, result) + }) + } +} + +func TestSortChunks(t *testing.T) { + chks := []ChunkData{ + { + meta: &walchunks.Meta{MinTime: 2, MaxTime: 4}, + labels: labels.FromStrings("app", "test1"), + }, + { + meta: &walchunks.Meta{MinTime: 1, MaxTime: 3}, + labels: labels.FromStrings("app", "test2"), + }, + { + meta: &walchunks.Meta{MinTime: 1, MaxTime: 3}, + labels: labels.FromStrings("app", "test1"), + }, + } + + t.Run("forward direction", func(t *testing.T) { + sortChunks(chks, logproto.FORWARD) + require.Equal(t, int64(1), chks[0].meta.MinTime) + require.Equal(t, "test1", chks[0].labels.Get("app")) + require.Equal(t, int64(1), chks[1].meta.MinTime) + require.Equal(t, "test2", chks[1].labels.Get("app")) + require.Equal(t, int64(2), chks[2].meta.MinTime) + }) + + t.Run("backward direction", func(t *testing.T) { + sortChunks(chks, logproto.BACKWARD) + require.Equal(t, int64(4), chks[0].meta.MaxTime) + require.Equal(t, "test1", chks[0].labels.Get("app")) + require.Equal(t, int64(3), chks[1].meta.MaxTime) + require.Equal(t, "test1", chks[1].labels.Get("app")) + require.Equal(t, int64(3), chks[2].meta.MaxTime) + require.Equal(t, "test2", chks[2].labels.Get("app")) + }) +} + +func TestIsOverlapping(t *testing.T) { + tests := []struct { + name string + first ChunkData + second ChunkData + direction logproto.Direction + expected bool + }{ + { + name: "overlapping forward", + first: ChunkData{meta: &walchunks.Meta{MinTime: 1, MaxTime: 3}}, + second: ChunkData{meta: &walchunks.Meta{MinTime: 2, MaxTime: 4}}, + direction: logproto.FORWARD, + expected: true, + }, + { + name: "non-overlapping forward", + first: ChunkData{meta: &walchunks.Meta{MinTime: 1, MaxTime: 2}}, + second: ChunkData{meta: &walchunks.Meta{MinTime: 3, MaxTime: 4}}, + direction: logproto.FORWARD, + expected: false, + }, + { + name: "overlapping backward", + first: ChunkData{meta: &walchunks.Meta{MinTime: 2, MaxTime: 4}}, + second: ChunkData{meta: &walchunks.Meta{MinTime: 1, MaxTime: 3}}, + direction: logproto.BACKWARD, + expected: true, + }, + { + name: "non-overlapping backward", + first: ChunkData{meta: &walchunks.Meta{MinTime: 3, MaxTime: 4}}, + second: ChunkData{meta: &walchunks.Meta{MinTime: 1, MaxTime: 2}}, + direction: logproto.BACKWARD, + expected: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result := isOverlapping(tt.first, tt.second, tt.direction) + require.Equal(t, tt.expected, result) + }) + } +} + +func TestBaseChunkIterator(t *testing.T) { + ctx := context.Background() + + testCases := []struct { + name string + chunks []ChunkData + direction logproto.Direction + expected [][]ChunkData + }{ + { + name: "Forward, non-overlapping", + chunks: []ChunkData{ + newTestChunkData("1", 100, 200), + newTestChunkData("2", 300, 400), + newTestChunkData("3", 500, 600), + newTestChunkData("4", 700, 800), + }, + direction: logproto.FORWARD, + expected: [][]ChunkData{ + {newTestChunkData("1", 100, 200), newTestChunkData("2", 300, 400)}, + {newTestChunkData("3", 500, 600), newTestChunkData("4", 700, 800)}, + }, + }, + { + name: "Backward, non-overlapping", + chunks: []ChunkData{ + newTestChunkData("4", 700, 800), + newTestChunkData("3", 500, 600), + newTestChunkData("2", 300, 400), + newTestChunkData("1", 100, 200), + }, + direction: logproto.BACKWARD, + expected: [][]ChunkData{ + {newTestChunkData("4", 700, 800), newTestChunkData("3", 500, 600)}, + {newTestChunkData("2", 300, 400), newTestChunkData("1", 100, 200)}, + }, + }, + { + name: "Forward, overlapping", + chunks: []ChunkData{ + newTestChunkData("1", 100, 300), + newTestChunkData("2", 200, 400), + newTestChunkData("3", 350, 550), + newTestChunkData("4", 600, 800), + }, + direction: logproto.FORWARD, + expected: [][]ChunkData{ + {newTestChunkData("1", 100, 300), newTestChunkData("2", 200, 400), newTestChunkData("3", 350, 550)}, + {newTestChunkData("4", 600, 800)}, + }, + }, + { + name: "Backward, overlapping", + chunks: []ChunkData{ + newTestChunkData("4", 600, 800), + newTestChunkData("3", 350, 550), + newTestChunkData("2", 200, 400), + newTestChunkData("1", 100, 300), + newTestChunkData("0", 10, 20), + }, + direction: logproto.BACKWARD, + expected: [][]ChunkData{ + {newTestChunkData("4", 600, 800), newTestChunkData("3", 350, 550), newTestChunkData("2", 200, 400), newTestChunkData("1", 100, 300)}, + {newTestChunkData("0", 10, 20)}, + }, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + iter := &testBaseChunkIterator{ + baseChunksIterator: baseChunksIterator[*testIterator]{ + ctx: ctx, + chunks: tc.chunks, + direction: tc.direction, + bachSize: 2, + batch: make([]ChunkData, 0, 2), + iteratorFactory: func(chunks []ChunkData) (*testIterator, error) { + return &testIterator{chunks: chunks}, nil + }, + isNil: func(it *testIterator) bool { return it == nil }, + }, + } + var batches [][]ChunkData + for len(iter.chunks) > 0 { + err := iter.nextBatch() + require.NoError(t, err) + + batch := make([]ChunkData, len(iter.batch)) + copy(batch, iter.batch) + batches = append(batches, batch) + } + + require.Equal(t, tc.expected, batches) + }) + } +} + +// Helper functions and types + +type testBaseChunkIterator struct { + baseChunksIterator[*testIterator] +} + +type testIterator struct { + chunks []ChunkData + index int +} + +func (t *testIterator) Next() bool { + t.index++ + return t.index < len(t.chunks) +} + +func (t *testIterator) Close() error { return nil } +func (t *testIterator) Err() error { return nil } +func (t *testIterator) StreamHash() uint64 { return 0 } +func (t *testIterator) Labels() string { return "" } +func (t *testIterator) At() logproto.Entry { return logproto.Entry{} } + +func newTestChunkData(id string, minTime, maxTime int64) ChunkData { + return ChunkData{ + id: id, + meta: &walchunks.Meta{ + MinTime: minTime, + MaxTime: maxTime, + }, + labels: labels.Labels{}, + } +} + +func createChunk(minTime, maxTime int64, labelName, labelValue string) ChunkData { + return ChunkData{ + meta: &walchunks.Meta{ + MinTime: minTime, + MaxTime: maxTime, + }, + labels: labels.FromStrings(labelName, labelValue), + } +} + +func assertEqualChunks(t *testing.T, expected, actual ChunkData) { + require.Equal(t, expected.meta.MinTime, actual.meta.MinTime, "MinTime mismatch") + require.Equal(t, expected.meta.MaxTime, actual.meta.MaxTime, "MaxTime mismatch") + require.Equal(t, expected.labels, actual.labels, "Labels mismatch") +} + +func generateTestChunkData(totalEntries int) []struct { + labels labels.Labels + entries []*logproto.Entry +} { + var chunkData []struct { + labels labels.Labels + entries []*logproto.Entry + } + + entriesPerChunk := defaultBatchSize * 2 // Each chunk will contain 2 batches worth of entries + numChunks := (totalEntries + entriesPerChunk - 1) / entriesPerChunk + + for i := 0; i < numChunks; i++ { + startIndex := i * entriesPerChunk + endIndex := (i + 1) * entriesPerChunk + if endIndex > totalEntries { + endIndex = totalEntries + } + + chunkData = append(chunkData, struct { + labels labels.Labels + entries []*logproto.Entry + }{ + labels: labels.FromStrings("app", fmt.Sprintf("test%d", i)), + entries: generateEntries(startIndex, endIndex-1), + }) + } + + return chunkData +} + +func writeChunksToStorage(t *testing.T, storage *mockBlockStorage, chunkData []struct { + labels labels.Labels + entries []*logproto.Entry +}, +) []ChunkData { + chks := make([]ChunkData, 0, len(chunkData)) + for i, cd := range chunkData { + var buf bytes.Buffer + chunkID := fmt.Sprintf("chunk%d", i) + _, err := walchunks.WriteChunk(&buf, cd.entries, walchunks.EncodingSnappy) + require.NoError(t, err) + + storage.data[wal.Dir+chunkID] = buf.Bytes() + chks = append(chks, newChunkData(chunkID, labelsToScratchBuilder(cd.labels), &walchunks.Meta{ + Ref: walchunks.NewChunkRef(0, uint64(buf.Len())), + MinTime: cd.entries[0].Timestamp.UnixNano(), + MaxTime: cd.entries[len(cd.entries)-1].Timestamp.UnixNano(), + })) + } + return chks +} + +func generateEntries(start, end int) []*logproto.Entry { + var entries []*logproto.Entry + for i := start; i <= end; i++ { + entries = append(entries, &logproto.Entry{ + Timestamp: time.Unix(int64(i), 0), + Line: fmt.Sprintf("line%d", i), + }) + } + return entries +} + +func flattenEntries(chunkData []struct { + labels labels.Labels + entries []*logproto.Entry +}, +) []logproto.Entry { + var result []logproto.Entry + for _, cd := range chunkData { + for _, e := range cd.entries { + result = append(result, logproto.Entry{Timestamp: e.Timestamp, Line: e.Line}) + } + } + return result +} + +func reverseEntries(entries []logproto.Entry) []logproto.Entry { + for i := 0; i < len(entries)/2; i++ { + j := len(entries) - 1 - i + entries[i], entries[j] = entries[j], entries[i] + } + return entries +} + +func selectEntries(entries []logproto.Entry, start, end int) []logproto.Entry { + var result []logproto.Entry + for _, e := range entries { + if e.Timestamp.Unix() >= int64(start) && e.Timestamp.Unix() < int64(end) { + result = append(result, e) + } + } + return result +} + +func entriesToSamples(entries []logproto.Entry) []logproto.Sample { + var samples []logproto.Sample + for _, e := range entries { + samples = append(samples, logproto.Sample{ + Timestamp: e.Timestamp.UnixNano(), + Value: float64(1), // Use timestamp as value for simplicity + }) + } + return samples +} + +func iterateEntries(iterator *ChunksEntryIterator[iter.EntryIterator]) []logproto.Entry { + var result []logproto.Entry + for iterator.Next() { + entry := iterator.At() + result = append(result, logproto.Entry{Timestamp: entry.Timestamp, Line: entry.Line}) + } + return result +} + +func iterateSamples(iterator *ChunksSampleIterator[iter.SampleIterator]) []logproto.Sample { + var result []logproto.Sample + for iterator.Next() { + result = append(result, iterator.At()) + } + return result +} + +func assertEqualEntries(t *testing.T, expected, actual []logproto.Entry) { + require.Equal(t, len(expected), len(actual), "Number of entries mismatch") + for i := range expected { + require.Equal(t, expected[i].Timestamp, actual[i].Timestamp, "Timestamp mismatch at index %d", i) + require.Equal(t, expected[i].Line, actual[i].Line, "Line mismatch at index %d", i) + } +} + +func assertEqualSamples(t *testing.T, expected, actual []logproto.Sample) { + require.Equal(t, len(expected), len(actual), "Number of samples mismatch") + for i := range expected { + require.Equal(t, expected[i].Timestamp, actual[i].Timestamp, "Timestamp mismatch at index %d", i) + require.Equal(t, expected[i].Value, actual[i].Value, "Value mismatch at index %d", i) + } +} + +func labelsToScratchBuilder(lbs labels.Labels) *labels.ScratchBuilder { + sb := labels.NewScratchBuilder(len(lbs)) + sb.Reset() + for i := 0; i < len(lbs); i++ { + sb.Add(lbs[i].Name, lbs[i].Value) + } + return &sb +} diff --git a/pkg/querier-rf1/wal/querier.go b/pkg/querier-rf1/wal/querier.go new file mode 100644 index 0000000000000..0fb2cc23dc525 --- /dev/null +++ b/pkg/querier-rf1/wal/querier.go @@ -0,0 +1,203 @@ +package wal + +import ( + "bytes" + "context" + "io" + "sync" + + "github.com/opentracing/opentracing-go" + "github.com/prometheus/prometheus/model/labels" + "golang.org/x/sync/errgroup" + "google.golang.org/grpc" + + "github.com/grafana/dskit/tenant" + + "github.com/grafana/loki/v3/pkg/ingester-rf1/metastore/metastorepb" + "github.com/grafana/loki/v3/pkg/iter" + "github.com/grafana/loki/v3/pkg/logql" + "github.com/grafana/loki/v3/pkg/storage/wal" + "github.com/grafana/loki/v3/pkg/storage/wal/chunks" + "github.com/grafana/loki/v3/pkg/storage/wal/index" +) + +var _ logql.Querier = (*Querier)(nil) + +type BlockStorage interface { + GetObjectRange(ctx context.Context, objectKey string, off, length int64) (io.ReadCloser, error) +} + +type Metastore interface { + ListBlocksForQuery(ctx context.Context, in *metastorepb.ListBlocksForQueryRequest, opts ...grpc.CallOption) (*metastorepb.ListBlocksForQueryResponse, error) +} + +type Querier struct { + blockStorage BlockStorage + metaStore Metastore +} + +func New( + metaStore Metastore, + blockStorage BlockStorage, +) (*Querier, error) { + return &Querier{ + blockStorage: blockStorage, + metaStore: metaStore, + }, nil +} + +func (q *Querier) SelectLogs(ctx context.Context, req logql.SelectLogParams) (iter.EntryIterator, error) { + // todo request validation and delete markers. + tenantID, err := tenant.TenantID(ctx) + if err != nil { + return nil, err + } + expr, err := req.LogSelector() + if err != nil { + return nil, err + } + matchers := expr.Matchers() + // todo: not sure if Pipeline is thread safe + pipeline, err := expr.Pipeline() + if err != nil { + return nil, err + } + + chks, err := q.matchingChunks(ctx, tenantID, req.Start.UnixNano(), req.End.UnixNano(), matchers...) + if err != nil { + return nil, err + } + + return NewChunksEntryIterator(ctx, + q.blockStorage, + chks, + pipeline, + req.Direction, + req.Start.UnixNano(), + req.End.UnixNano()), nil +} + +func (q *Querier) SelectSamples(ctx context.Context, req logql.SelectSampleParams) (iter.SampleIterator, error) { + // todo request validation and delete markers. + tenantID, err := tenant.TenantID(ctx) + if err != nil { + return nil, err + } + expr, err := req.Expr() + if err != nil { + return nil, err + } + selector, err := expr.Selector() + if err != nil { + return nil, err + } + matchers := selector.Matchers() + // todo: not sure if Extractor is thread safe + + extractor, err := expr.Extractor() + if err != nil { + return nil, err + } + + chks, err := q.matchingChunks(ctx, tenantID, req.Start.UnixNano(), req.End.UnixNano(), matchers...) + if err != nil { + return nil, err + } + + return NewChunksSampleIterator(ctx, + q.blockStorage, + chks, + extractor, + req.Start.UnixNano(), + req.End.UnixNano()), nil +} + +func (q *Querier) matchingChunks(ctx context.Context, tenantID string, from, through int64, matchers ...*labels.Matcher) ([]ChunkData, error) { + sp, ctx := opentracing.StartSpanFromContext(ctx, "matchingChunks") + defer sp.Finish() + // todo support sharding + var ( + lazyChunks []ChunkData + mtx sync.Mutex + ) + + err := q.forSeries(ctx, &metastorepb.ListBlocksForQueryRequest{ + TenantId: tenantID, + StartTime: from, + EndTime: through, + }, func(id string, lbs *labels.ScratchBuilder, chk *chunks.Meta) error { + mtx.Lock() + lazyChunks = append(lazyChunks, newChunkData(id, lbs, chk)) + mtx.Unlock() + return nil + }, matchers...) + if err != nil { + return nil, err + } + if sp != nil { + sp.LogKV("matchedChunks", len(lazyChunks)) + } + return lazyChunks, nil +} + +func (q *Querier) forSeries(ctx context.Context, req *metastorepb.ListBlocksForQueryRequest, fn func(string, *labels.ScratchBuilder, *chunks.Meta) error, matchers ...*labels.Matcher) error { + // copy matchers to avoid modifying the original slice. + ms := make([]*labels.Matcher, 0, len(matchers)+1) + ms = append(ms, matchers...) + ms = append(ms, labels.MustNewMatcher(labels.MatchEqual, index.TenantLabel, req.TenantId)) + + return q.forIndices(ctx, req, func(ir *index.Reader, id string) error { + bufLbls := labels.ScratchBuilder{} + chunks := make([]chunks.Meta, 0, 1) + p, err := ir.PostingsForMatchers(ctx, ms...) + if err != nil { + return err + } + for p.Next() { + err := ir.Series(p.At(), &bufLbls, &chunks) + if err != nil { + return err + } + if err := fn(id, &bufLbls, &chunks[0]); err != nil { + return err + } + } + return p.Err() + }) +} + +func (q *Querier) forIndices(ctx context.Context, req *metastorepb.ListBlocksForQueryRequest, fn func(ir *index.Reader, id string) error) error { + resp, err := q.metaStore.ListBlocksForQuery(ctx, req) + if err != nil { + return err + } + metas := resp.Blocks + if len(metas) == 0 { + return nil + } + g, ctx := errgroup.WithContext(ctx) + g.SetLimit(32) + for _, meta := range metas { + + meta := meta + g.Go(func() error { + reader, err := q.blockStorage.GetObjectRange(ctx, wal.Dir+meta.Id, meta.IndexRef.Offset, meta.IndexRef.Length) + if err != nil { + return err + } + defer reader.Close() + // todo: use a buffer pool + buf := bytes.NewBuffer(make([]byte, 0, meta.IndexRef.Length)) + _, err = buf.ReadFrom(reader) + if err != nil { + return err + } + index, err := index.NewReader(index.RealByteSlice(buf.Bytes())) + if err != nil { + return err + } + return fn(index, meta.Id) + }) + } + return g.Wait() +} diff --git a/pkg/querier-rf1/wal/querier_test.go b/pkg/querier-rf1/wal/querier_test.go new file mode 100644 index 0000000000000..5d446b8515902 --- /dev/null +++ b/pkg/querier-rf1/wal/querier_test.go @@ -0,0 +1,697 @@ +package wal + +import ( + "bytes" + "context" + "fmt" + "io" + "sort" + "testing" + "time" + + "github.com/prometheus/prometheus/model/labels" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "google.golang.org/grpc" + + "github.com/grafana/dskit/user" + + "github.com/grafana/loki/v3/pkg/ingester-rf1/metastore/metastorepb" + "github.com/grafana/loki/v3/pkg/iter" + "github.com/grafana/loki/v3/pkg/logproto" + "github.com/grafana/loki/v3/pkg/logql" + "github.com/grafana/loki/v3/pkg/logql/syntax" + "github.com/grafana/loki/v3/pkg/querier/plan" + "github.com/grafana/loki/v3/pkg/storage/wal" + "github.com/grafana/loki/v3/pkg/storage/wal/chunks" +) + +// MockStorage is a simple in-memory storage for testing +type MockStorage struct { + data map[string][]byte +} + +func NewMockStorage() *MockStorage { + return &MockStorage{data: make(map[string][]byte)} +} + +func (m *MockStorage) GetObjectRange(_ context.Context, objectKey string, off, length int64) (io.ReadCloser, error) { + data, ok := m.data[objectKey] + if !ok { + return nil, fmt.Errorf("object not found: %s", objectKey) + } + return io.NopCloser(bytes.NewReader(data[off : off+length])), nil +} + +func (m *MockStorage) PutObject(objectKey string, data []byte) { + m.data[objectKey] = data +} + +// MockMetastore is a simple in-memory metastore for testing +type MockMetastore struct { + blocks map[string][]*metastorepb.BlockMeta +} + +func NewMockMetastore() *MockMetastore { + return &MockMetastore{blocks: make(map[string][]*metastorepb.BlockMeta)} +} + +func (m *MockMetastore) ListBlocksForQuery(_ context.Context, req *metastorepb.ListBlocksForQueryRequest, _ ...grpc.CallOption) (*metastorepb.ListBlocksForQueryResponse, error) { + blocks := m.blocks[req.TenantId] + var result []*metastorepb.BlockMeta + for _, block := range blocks { + if block.MinTime <= req.EndTime && block.MaxTime >= req.StartTime { + result = append(result, block) + } + } + return &metastorepb.ListBlocksForQueryResponse{Blocks: result}, nil +} + +func (m *MockMetastore) AddBlock(tenantID string, block *metastorepb.BlockMeta) { + m.blocks[tenantID] = append(m.blocks[tenantID], block) +} + +func TestQuerier_SelectLogs(t *testing.T) { + storage := NewMockStorage() + metastore := NewMockMetastore() + + querier, err := New(metastore, storage) + require.NoError(t, err) + + tenantID := "test-tenant" + ctx := user.InjectOrgID(context.Background(), tenantID) + + // Create expanded test data + testData := []struct { + labels labels.Labels + entries []*logproto.Entry + }{ + { + labels: labels.FromStrings("app", "test1", "env", "prod"), + entries: generateEntries(1000, 1050), + }, + { + labels: labels.FromStrings("app", "test2", "env", "staging"), + entries: generateEntries(1025, 1075), + }, + { + labels: labels.FromStrings("app", "test3", "env", "dev", "version", "v1"), + entries: generateEntries(1050, 1100), + }, + { + labels: labels.FromStrings("app", "test4", "env", "prod", "version", "v2"), + entries: generateEntries(1075, 1125), + }, + } + + // Setup test data + setupTestData(t, storage, metastore, tenantID, testData) + + // Test cases + testCases := []struct { + name string + query string + expectedCount int + expectedFirst logproto.Entry + expectedLast logproto.Entry + }{ + { + name: "Query all logs", + query: `{app=~"test.*"}`, + expectedCount: 204, + expectedFirst: logproto.Entry{ + Timestamp: time.Unix(1000, 0), + Line: "line1000", + Parsed: []logproto.LabelAdapter{ + {Name: "app", Value: "test1"}, + {Name: "env", Value: "prod"}, + }, + }, + expectedLast: logproto.Entry{ + Timestamp: time.Unix(1125, 0), + Line: "line1125", + Parsed: []logproto.LabelAdapter{ + {Name: "app", Value: "test4"}, + {Name: "env", Value: "prod"}, + {Name: "version", Value: "v2"}, + }, + }, + }, + { + name: "Query specific app", + query: `{app="test1"}`, + expectedCount: 51, + expectedFirst: logproto.Entry{ + Timestamp: time.Unix(1000, 0), + Line: "line1000", + Parsed: []logproto.LabelAdapter{ + {Name: "app", Value: "test1"}, + {Name: "env", Value: "prod"}, + }, + }, + expectedLast: logproto.Entry{ + Timestamp: time.Unix(1050, 0), + Line: "line1050", + Parsed: []logproto.LabelAdapter{ + {Name: "app", Value: "test1"}, + {Name: "env", Value: "prod"}, + }, + }, + }, + { + name: "Query with multiple label equality", + query: `{app="test4", env="prod"}`, + expectedCount: 51, + expectedFirst: logproto.Entry{ + Timestamp: time.Unix(1075, 0), + Line: "line1075", + Parsed: []logproto.LabelAdapter{ + {Name: "app", Value: "test4"}, + {Name: "env", Value: "prod"}, + {Name: "version", Value: "v2"}, + }, + }, + expectedLast: logproto.Entry{ + Timestamp: time.Unix(1125, 0), + Line: "line1125", + Parsed: []logproto.LabelAdapter{ + {Name: "app", Value: "test4"}, + {Name: "env", Value: "prod"}, + {Name: "version", Value: "v2"}, + }, + }, + }, + { + name: "Query with negative regex", + query: `{app=~"test.*", env!~"stag.*|dev"}`, + expectedCount: 102, + expectedFirst: logproto.Entry{ + Timestamp: time.Unix(1000, 0), + Line: "line1000", + Parsed: []logproto.LabelAdapter{ + {Name: "app", Value: "test1"}, + {Name: "env", Value: "prod"}, + }, + }, + expectedLast: logproto.Entry{ + Timestamp: time.Unix(1125, 0), + Line: "line1125", + Parsed: []logproto.LabelAdapter{ + {Name: "app", Value: "test4"}, + {Name: "env", Value: "prod"}, + {Name: "version", Value: "v2"}, + }, + }, + }, + { + name: "Query with label presence", + query: `{app=~"test.*", version=""}`, + expectedCount: 102, + expectedFirst: logproto.Entry{ + Timestamp: time.Unix(1000, 0), + Line: "line1000", + Parsed: []logproto.LabelAdapter{ + {Name: "app", Value: "test1"}, + {Name: "env", Value: "prod"}, + }, + }, + expectedLast: logproto.Entry{ + Timestamp: time.Unix(1075, 0), + Line: "line1075", + Parsed: []logproto.LabelAdapter{ + {Name: "app", Value: "test2"}, + {Name: "env", Value: "staging"}, + }, + }, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + expr, err := syntax.ParseExpr(tc.query) + require.NoError(t, err) + + req := logql.SelectLogParams{ + QueryRequest: &logproto.QueryRequest{ + Selector: tc.query, + Start: time.Unix(1000, 0), + End: time.Unix(1126, 0), + Limit: 10000, + Direction: logproto.FORWARD, + Plan: &plan.QueryPlan{ + AST: expr, + }, + }, + } + + iter, err := querier.SelectLogs(ctx, req) + require.NoError(t, err) + + results := collectPushEntries(t, iter) + + assert.Len(t, results, tc.expectedCount, "Unexpected number of log entries") + if len(results) > 0 { + assert.Equal(t, tc.expectedFirst, results[0], "First log entry mismatch") + assert.Equal(t, tc.expectedLast, results[len(results)-1], "Last log entry mismatch") + } + }) + } +} + +// SampleWithLabels is a new struct to hold both the sample and its labels +type SampleWithLabels struct { + Sample logproto.Sample + Labels labels.Labels +} + +func TestQuerier_SelectSamples(t *testing.T) { + storage := NewMockStorage() + metastore := NewMockMetastore() + + querier, err := New(metastore, storage) + require.NoError(t, err) + + tenantID := "test-tenant" + ctx := user.InjectOrgID(context.Background(), tenantID) + + // Create test data + testData := []struct { + labels labels.Labels + samples []logproto.Sample + }{ + { + labels: labels.FromStrings("app", "test1", "env", "prod"), + samples: generateSamples(1000, 1050, 1), + }, + { + labels: labels.FromStrings("app", "test2", "env", "staging"), + samples: generateSamples(1025, 1075, 2), + }, + { + labels: labels.FromStrings("app", "test3", "env", "dev", "version", "v1"), + samples: generateSamples(1050, 1100, 3), + }, + { + labels: labels.FromStrings("app", "test4", "env", "prod", "version", "v2"), + samples: generateSamples(1075, 1125, 4), + }, + } + + // Setup test data + setupTestSampleData(t, storage, metastore, tenantID, testData) + + // Test cases + testCases := []struct { + name string + query string + expectedCount int + expectedFirst SampleWithLabels + expectedLast SampleWithLabels + }{ + { + name: "Query all samples", + query: `sum_over_time({app=~"test.*"} | label_format v="{{__line__}}" | unwrap v[1s])`, + expectedCount: 204, + expectedFirst: SampleWithLabels{ + Sample: logproto.Sample{ + Timestamp: time.Unix(1000, 0).UnixNano(), + Value: 1, + }, + Labels: labels.FromStrings("app", "test1", "env", "prod"), + }, + expectedLast: SampleWithLabels{ + Sample: logproto.Sample{ + Timestamp: time.Unix(1125, 0).UnixNano(), + Value: 4, + }, + Labels: labels.FromStrings("app", "test4", "env", "prod", "version", "v2"), + }, + }, + { + name: "Query specific app", + query: `sum_over_time({app="test1"}| label_format v="{{__line__}}" | unwrap v[1s])`, + expectedCount: 51, + expectedFirst: SampleWithLabels{ + Sample: logproto.Sample{ + Timestamp: time.Unix(1000, 0).UnixNano(), + Value: 1, + }, + Labels: labels.FromStrings("app", "test1", "env", "prod"), + }, + expectedLast: SampleWithLabels{ + Sample: logproto.Sample{ + Timestamp: time.Unix(1050, 0).UnixNano(), + Value: 1, + }, + Labels: labels.FromStrings("app", "test1", "env", "prod"), + }, + }, + { + name: "Query with multiple label equality", + query: `sum_over_time({app="test4", env="prod"}| label_format v="{{__line__}}" | unwrap v[1s])`, + expectedCount: 51, + expectedFirst: SampleWithLabels{ + Sample: logproto.Sample{ + Timestamp: time.Unix(1075, 0).UnixNano(), + Value: 4, + }, + Labels: labels.FromStrings("app", "test4", "env", "prod", "version", "v2"), + }, + expectedLast: SampleWithLabels{ + Sample: logproto.Sample{ + Timestamp: time.Unix(1125, 0).UnixNano(), + Value: 4, + }, + Labels: labels.FromStrings("app", "test4", "env", "prod", "version", "v2"), + }, + }, + { + name: "Query with negative regex", + query: `sum_over_time({app=~"test.*", env!~"stag.*|dev"}| label_format v="{{__line__}}" | unwrap v[1s])`, + expectedCount: 102, + expectedFirst: SampleWithLabels{ + Sample: logproto.Sample{ + Timestamp: time.Unix(1000, 0).UnixNano(), + Value: 1, + }, + Labels: labels.FromStrings("app", "test1", "env", "prod"), + }, + expectedLast: SampleWithLabels{ + Sample: logproto.Sample{ + Timestamp: time.Unix(1125, 0).UnixNano(), + Value: 4, + }, + Labels: labels.FromStrings("app", "test4", "env", "prod", "version", "v2"), + }, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + expr, err := syntax.ParseExpr(tc.query) + require.NoError(t, err) + + req := logql.SelectSampleParams{ + SampleQueryRequest: &logproto.SampleQueryRequest{ + Selector: tc.query, + Start: time.Unix(1000, 0), + End: time.Unix(1126, 0), + Plan: &plan.QueryPlan{ + AST: expr, + }, + }, + } + + iter, err := querier.SelectSamples(ctx, req) + require.NoError(t, err) + + results := collectSamplesWithLabels(t, iter) + + assert.Len(t, results, tc.expectedCount, "Unexpected number of samples") + if len(results) > 0 { + assert.Equal(t, tc.expectedFirst.Sample, results[0].Sample, "First sample mismatch") + assert.Equal(t, tc.expectedFirst.Labels, results[0].Labels, "First sample labels mismatch") + assert.Equal(t, tc.expectedLast.Sample, results[len(results)-1].Sample, "Last sample mismatch") + assert.Equal(t, tc.expectedLast.Labels, results[len(results)-1].Labels, "Last sample labels mismatch") + } + }) + } +} + +func TestQuerier_matchingChunks(t *testing.T) { + storage := NewMockStorage() + metastore := NewMockMetastore() + + querier, err := New(metastore, storage) + require.NoError(t, err) + + tenantID := "test-tenant" + ctx := user.InjectOrgID(context.Background(), tenantID) + + // Create test data + testData := []struct { + labels labels.Labels + entries []*logproto.Entry + }{ + { + labels: labels.FromStrings("app", "app1", "env", "prod"), + entries: generateEntries(1000, 1050), + }, + { + labels: labels.FromStrings("app", "app2", "env", "staging"), + entries: generateEntries(1025, 1075), + }, + { + labels: labels.FromStrings("app", "app3", "env", "dev"), + entries: generateEntries(1050, 1100), + }, + } + + // Setup test data + setupTestData(t, storage, metastore, tenantID, testData) + + // Test cases + testCases := []struct { + name string + matchers []*labels.Matcher + start int64 + end int64 + expectedChunks []ChunkData + }{ + { + name: "Equality matcher", + matchers: []*labels.Matcher{ + labels.MustNewMatcher(labels.MatchEqual, "app", "app1"), + }, + start: time.Unix(1000, 0).UnixNano(), + end: time.Unix(1100, 0).UnixNano(), + expectedChunks: []ChunkData{ + { + labels: labels.FromStrings("app", "app1", "env", "prod"), + meta: &chunks.Meta{MinTime: time.Unix(1000, 0).UnixNano(), MaxTime: time.Unix(1050, 0).UnixNano()}, + }, + }, + }, + { + name: "Negative matcher", + matchers: []*labels.Matcher{ + labels.MustNewMatcher(labels.MatchNotEqual, "app", "app1"), + }, + start: time.Unix(1000, 0).UnixNano(), + end: time.Unix(1100, 0).UnixNano(), + expectedChunks: []ChunkData{ + { + labels: labels.FromStrings("app", "app2", "env", "staging"), + meta: &chunks.Meta{MinTime: time.Unix(1025, 0).UnixNano(), MaxTime: time.Unix(1075, 0).UnixNano()}, + }, + { + labels: labels.FromStrings("app", "app3", "env", "dev"), + meta: &chunks.Meta{MinTime: time.Unix(1050, 0).UnixNano(), MaxTime: time.Unix(1100, 0).UnixNano()}, + }, + }, + }, + { + name: "Regex matcher", + matchers: []*labels.Matcher{ + labels.MustNewMatcher(labels.MatchRegexp, "app", "app[12]"), + }, + start: time.Unix(1000, 0).UnixNano(), + end: time.Unix(1100, 0).UnixNano(), + expectedChunks: []ChunkData{ + { + labels: labels.FromStrings("app", "app1", "env", "prod"), + meta: &chunks.Meta{MinTime: time.Unix(1000, 0).UnixNano(), MaxTime: time.Unix(1050, 0).UnixNano()}, + }, + { + labels: labels.FromStrings("app", "app2", "env", "staging"), + meta: &chunks.Meta{MinTime: time.Unix(1025, 0).UnixNano(), MaxTime: time.Unix(1075, 0).UnixNano()}, + }, + }, + }, + { + name: "Not regex matcher", + matchers: []*labels.Matcher{ + labels.MustNewMatcher(labels.MatchNotRegexp, "app", "app[12]"), + }, + start: time.Unix(1000, 0).UnixNano(), + end: time.Unix(1100, 0).UnixNano(), + expectedChunks: []ChunkData{ + { + labels: labels.FromStrings("app", "app3", "env", "dev"), + meta: &chunks.Meta{MinTime: time.Unix(1050, 0).UnixNano(), MaxTime: time.Unix(1100, 0).UnixNano()}, + }, + }, + }, + { + name: "Multiple matchers", + matchers: []*labels.Matcher{ + labels.MustNewMatcher(labels.MatchRegexp, "app", "app.*"), + labels.MustNewMatcher(labels.MatchNotEqual, "env", "prod"), + }, + start: time.Unix(1000, 0).UnixNano(), + end: time.Unix(1100, 0).UnixNano(), + expectedChunks: []ChunkData{ + { + labels: labels.FromStrings("app", "app2", "env", "staging"), + meta: &chunks.Meta{MinTime: time.Unix(1025, 0).UnixNano(), MaxTime: time.Unix(1075, 0).UnixNano()}, + }, + { + labels: labels.FromStrings("app", "app3", "env", "dev"), + meta: &chunks.Meta{MinTime: time.Unix(1050, 0).UnixNano(), MaxTime: time.Unix(1100, 0).UnixNano()}, + }, + }, + }, + { + name: "Time range filter", + matchers: []*labels.Matcher{ + labels.MustNewMatcher(labels.MatchRegexp, "app", "app.*"), + }, + start: time.Unix(1080, 0).UnixNano(), + end: time.Unix(1100, 0).UnixNano(), + expectedChunks: []ChunkData{ + { + labels: labels.FromStrings("app", "app3", "env", "dev"), + meta: &chunks.Meta{MinTime: time.Unix(1050, 0).UnixNano(), MaxTime: time.Unix(1100, 0).UnixNano()}, + }, + }, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + chunks, err := querier.matchingChunks(ctx, tenantID, tc.start, tc.end, tc.matchers...) + require.NoError(t, err) + + sort.Slice(tc.expectedChunks, func(i, j int) bool { + return tc.expectedChunks[i].labels.String() < tc.expectedChunks[j].labels.String() + }) + sort.Slice(chunks, func(i, j int) bool { + return chunks[i].labels.String() < chunks[j].labels.String() + }) + assert.Equal(t, len(tc.expectedChunks), len(chunks), "Unexpected number of matching chunks") + + // Verify that all returned chunks match the expected chunks + for i, expectedChunk := range tc.expectedChunks { + if i < len(chunks) { + assert.Equal(t, expectedChunk.labels, chunks[i].labels, "Labels mismatch for chunk %d", i) + assert.Equal(t, expectedChunk.meta.MinTime, chunks[i].meta.MinTime, "MinTime mismatch for chunk %d", i) + assert.Equal(t, expectedChunk.meta.MaxTime, chunks[i].meta.MaxTime, "MaxTime mismatch for chunk %d", i) + } + } + + // Additional checks for time range and matchers + for _, chunk := range chunks { + for _, matcher := range tc.matchers { + assert.True(t, matcher.Matches(chunk.labels.Get(matcher.Name)), + "Chunk labels %v do not match criteria %v", chunk.labels, matcher) + } + } + }) + } +} + +func setupTestData(t *testing.T, storage *MockStorage, metastore *MockMetastore, tenantID string, testData []struct { + labels labels.Labels + entries []*logproto.Entry +}, +) { + total := 0 + for i, data := range testData { + segmentID := fmt.Sprintf("segment%d", i) + writer, err := wal.NewWalSegmentWriter() + require.NoError(t, err) + total += len(data.entries) + writer.Append(tenantID, data.labels.String(), data.labels, data.entries, time.Now()) + + var buf bytes.Buffer + _, err = writer.WriteTo(&buf) + require.NoError(t, err) + + segmentData := buf.Bytes() + storage.PutObject(wal.Dir+segmentID, segmentData) + + blockMeta := writer.Meta(segmentID) + metastore.AddBlock(tenantID, blockMeta) + } + t.Log("Total entries in storage:", total) +} + +func collectPushEntries(t *testing.T, iter iter.EntryIterator) []logproto.Entry { + var results []logproto.Entry + for iter.Next() { + entry := iter.At() + lbs := iter.Labels() + parsed, err := syntax.ParseLabels(lbs) + require.NoError(t, err) + results = append(results, logproto.Entry{ + Timestamp: entry.Timestamp, + Line: entry.Line, + Parsed: logproto.FromLabelsToLabelAdapters(parsed), + }) + } + require.NoError(t, iter.Close()) + return results +} + +func collectSamplesWithLabels(t *testing.T, iter iter.SampleIterator) []SampleWithLabels { + var results []SampleWithLabels + for iter.Next() { + sample := iter.At() + labelString := iter.Labels() + parsedLabels, err := syntax.ParseLabels(labelString) + require.NoError(t, err) + results = append(results, SampleWithLabels{ + Sample: sample, + Labels: parsedLabels, + }) + } + require.NoError(t, iter.Close()) + return results +} + +func generateSamples(start, end int64, value float64) []logproto.Sample { + var samples []logproto.Sample + for i := start; i <= end; i++ { + samples = append(samples, logproto.Sample{ + Timestamp: time.Unix(i, 0).UnixNano(), + Value: value, + }) + } + return samples +} + +func setupTestSampleData(t *testing.T, storage *MockStorage, metastore *MockMetastore, tenantID string, testData []struct { + labels labels.Labels + samples []logproto.Sample +}, +) { + total := 0 + for i, data := range testData { + segmentID := fmt.Sprintf("segment%d", i) + writer, err := wal.NewWalSegmentWriter() + require.NoError(t, err) + total += len(data.samples) + + // Convert samples to entries for the WAL writer + entries := make([]*logproto.Entry, len(data.samples)) + for i, sample := range data.samples { + entries[i] = &logproto.Entry{ + Timestamp: time.Unix(0, sample.Timestamp), + Line: fmt.Sprintf("%f", sample.Value), + } + } + + writer.Append(tenantID, data.labels.String(), data.labels, entries, time.Now()) + + var buf bytes.Buffer + _, err = writer.WriteTo(&buf) + require.NoError(t, err) + + segmentData := buf.Bytes() + storage.PutObject(wal.Dir+segmentID, segmentData) + + blockMeta := writer.Meta(segmentID) + metastore.AddBlock(tenantID, blockMeta) + } + t.Log("Total samples in storage:", total) +} diff --git a/pkg/storage/stores/shipper/indexshipper/tsdb/index/postings.go b/pkg/storage/stores/shipper/indexshipper/tsdb/index/postings.go index 248cd523dab59..7c2dd99023b7d 100644 --- a/pkg/storage/stores/shipper/indexshipper/tsdb/index/postings.go +++ b/pkg/storage/stores/shipper/indexshipper/tsdb/index/postings.go @@ -424,6 +424,13 @@ func EmptyPostings() Postings { return emptyPostings } +// IsEmptyPostingsType returns true if the postings are an empty postings list. +// When this function returns false, it doesn't mean that the postings isn't empty +// (it could be an empty intersection of two non-empty postings, for example). +func IsEmptyPostingsType(p Postings) bool { + return p == emptyPostings +} + // ErrPostings returns new postings that immediately error. func ErrPostings(err error) Postings { return errPostings{err} diff --git a/pkg/storage/wal/chunks/chunks.go b/pkg/storage/wal/chunks/chunks.go index f0f2625596f5d..60b7b6612446b 100644 --- a/pkg/storage/wal/chunks/chunks.go +++ b/pkg/storage/wal/chunks/chunks.go @@ -15,6 +15,7 @@ import ( "github.com/klauspost/compress/s2" "github.com/grafana/loki/v3/pkg/chunkenc" + "github.com/grafana/loki/v3/pkg/iter" "github.com/grafana/loki/v3/pkg/logproto" ) @@ -27,7 +28,10 @@ const ( ) // Initialize the CRC32 table -var castagnoliTable *crc32.Table +var ( + castagnoliTable *crc32.Table + _ iter.EntryIterator = (*entryBufferedIterator)(nil) +) func init() { castagnoliTable = crc32.MakeTable(crc32.Castagnoli) diff --git a/pkg/storage/wal/chunks/doc.go b/pkg/storage/wal/chunks/doc.go new file mode 100644 index 0000000000000..b3b50ed818c68 --- /dev/null +++ b/pkg/storage/wal/chunks/doc.go @@ -0,0 +1,37 @@ +// Package chunks provides functionality for efficient storage and retrieval of log data and metrics. +// +// The chunks package implements a compact and performant way to store and access +// log entries and metric samples. It uses various compression and encoding techniques to minimize +// storage requirements while maintaining fast access times. +// +// Key features: +// - Efficient chunk writing with multiple encoding options +// - Fast chunk reading with iterators for forward and backward traversal +// - Support for time-based filtering of log entries and metric samples +// - Integration with Loki's log query language (LogQL) for advanced filtering and processing +// - Separate iterators for log entries and metric samples +// +// Main types and functions: +// - WriteChunk: Writes log entries to a compressed chunk format +// - NewChunkReader: Creates a reader for parsing and accessing chunk data +// - NewEntryIterator: Provides an iterator for efficient traversal of log entries in a chunk +// - NewSampleIterator: Provides an iterator for efficient traversal of metric samples in a chunk +// +// Entry Iterator: +// The EntryIterator allows efficient traversal of log entries within a chunk. It supports +// both forward and backward iteration, time-based filtering, and integration with LogQL pipelines +// for advanced log processing. +// +// Sample Iterator: +// The SampleIterator enables efficient traversal of metric samples within a chunk. It supports +// time-based filtering and integration with LogQL extractors for advanced metric processing. +// This iterator is particularly useful for handling numeric data extracted from logs or +// pre-aggregated metrics. +// +// Both iterators implement methods for accessing the current entry or sample, checking for errors, +// and retrieving associated labels and stream hashes. +// +// This package is designed to work seamlessly with other components of the Loki +// log aggregation system, providing a crucial layer for data storage and retrieval of +// both logs and metrics. +package chunks diff --git a/pkg/storage/wal/chunks/entry_iterator.go b/pkg/storage/wal/chunks/entry_iterator.go new file mode 100644 index 0000000000000..9a127266b07a8 --- /dev/null +++ b/pkg/storage/wal/chunks/entry_iterator.go @@ -0,0 +1,115 @@ +package chunks + +import ( + "time" + + "github.com/grafana/loki/v3/pkg/iter" + "github.com/grafana/loki/v3/pkg/logproto" + "github.com/grafana/loki/v3/pkg/logql/log" + + "github.com/grafana/loki/pkg/push" +) + +type entryBufferedIterator struct { + reader *ChunkReader + pipeline log.StreamPipeline + from, through int64 + + cur logproto.Entry + currLabels log.LabelsResult +} + +// NewEntryIterator creates an iterator for efficiently traversing log entries in a chunk. +// It takes compressed chunk data, a processing pipeline, iteration direction, and a time range. +// The returned iterator filters entries based on the time range and applies the given pipeline. +// It handles both forward and backward iteration. +// +// Parameters: +// - chunkData: Compressed chunk data containing log entries +// - pipeline: StreamPipeline for processing and filtering entries +// - direction: Direction of iteration (FORWARD or BACKWARD) +// - from: Start timestamp (inclusive) for filtering entries +// - through: End timestamp (exclusive) for filtering entries +// +// Returns an EntryIterator and an error if creation fails. +func NewEntryIterator( + chunkData []byte, + pipeline log.StreamPipeline, + direction logproto.Direction, + from, through int64, +) (iter.EntryIterator, error) { + chkReader, err := NewChunkReader(chunkData) + if err != nil { + return nil, err + } + it := &entryBufferedIterator{ + reader: chkReader, + pipeline: pipeline, + from: from, + through: through, + } + if direction == logproto.FORWARD { + return it, nil + } + return iter.NewEntryReversedIter(it) +} + +// At implements iter.EntryIterator. +func (e *entryBufferedIterator) At() push.Entry { + return e.cur +} + +// Close implements iter.EntryIterator. +func (e *entryBufferedIterator) Close() error { + return e.reader.Close() +} + +// Err implements iter.EntryIterator. +func (e *entryBufferedIterator) Err() error { + return e.reader.Err() +} + +// Labels implements iter.EntryIterator. +func (e *entryBufferedIterator) Labels() string { + return e.currLabels.String() +} + +// Next implements iter.EntryIterator. +func (e *entryBufferedIterator) Next() bool { + for e.reader.Next() { + ts, line := e.reader.At() + // check if the timestamp is within the range before applying the pipeline. + if ts < e.from { + continue + } + if ts >= e.through { + return false + } + // todo: structured metadata. + newLine, lbs, matches := e.pipeline.Process(ts, line) + if !matches { + continue + } + e.currLabels = lbs + e.cur.Timestamp = time.Unix(0, ts) + e.cur.Line = string(newLine) + e.cur.StructuredMetadata = logproto.FromLabelsToLabelAdapters(lbs.StructuredMetadata()) + e.cur.Parsed = logproto.FromLabelsToLabelAdapters(lbs.Parsed()) + return true + } + return false +} + +// StreamHash implements iter.EntryIterator. +func (e *entryBufferedIterator) StreamHash() uint64 { + return e.pipeline.BaseLabels().Hash() +} + +type sampleBufferedIterator struct { + reader *ChunkReader + pipeline log.StreamSampleExtractor + from, through int64 + + cur logproto.Sample + currLabels log.LabelsResult +} diff --git a/pkg/storage/wal/chunks/entry_iterator_test.go b/pkg/storage/wal/chunks/entry_iterator_test.go new file mode 100644 index 0000000000000..a098161134a55 --- /dev/null +++ b/pkg/storage/wal/chunks/entry_iterator_test.go @@ -0,0 +1,143 @@ +package chunks + +import ( + "bytes" + "testing" + "time" + + "github.com/prometheus/prometheus/model/labels" + "github.com/stretchr/testify/require" + + "github.com/grafana/loki/v3/pkg/logproto" + "github.com/grafana/loki/v3/pkg/logql/log" + "github.com/grafana/loki/v3/pkg/logql/syntax" +) + +func TestNewEntryIterator(t *testing.T) { + tests := []struct { + name string + entries []*logproto.Entry + direction logproto.Direction + from int64 + through int64 + pipeline log.StreamPipeline + expected []*logproto.Entry + }{ + { + name: "Forward direction, all entries within range", + entries: []*logproto.Entry{ + {Timestamp: time.Unix(0, 1), Line: "line 1"}, + {Timestamp: time.Unix(0, 2), Line: "line 2"}, + {Timestamp: time.Unix(0, 3), Line: "line 3"}, + }, + direction: logproto.FORWARD, + from: 0, + through: 4, + pipeline: noopStreamPipeline(), + expected: []*logproto.Entry{ + {Timestamp: time.Unix(0, 1), Line: "line 1"}, + {Timestamp: time.Unix(0, 2), Line: "line 2"}, + {Timestamp: time.Unix(0, 3), Line: "line 3"}, + }, + }, + { + name: "Backward direction, all entries within range", + entries: []*logproto.Entry{ + {Timestamp: time.Unix(0, 1), Line: "line 1"}, + {Timestamp: time.Unix(0, 2), Line: "line 2"}, + {Timestamp: time.Unix(0, 3), Line: "line 3"}, + }, + direction: logproto.BACKWARD, + from: 0, + through: 4, + pipeline: noopStreamPipeline(), + expected: []*logproto.Entry{ + {Timestamp: time.Unix(0, 3), Line: "line 3"}, + {Timestamp: time.Unix(0, 2), Line: "line 2"}, + {Timestamp: time.Unix(0, 1), Line: "line 1"}, + }, + }, + { + name: "Forward direction, partial range", + entries: []*logproto.Entry{ + {Timestamp: time.Unix(0, 1), Line: "line 1"}, + {Timestamp: time.Unix(0, 2), Line: "line 2"}, + {Timestamp: time.Unix(0, 3), Line: "line 3"}, + {Timestamp: time.Unix(0, 4), Line: "line 4"}, + }, + direction: logproto.FORWARD, + from: 2, + through: 4, + pipeline: noopStreamPipeline(), + expected: []*logproto.Entry{ + {Timestamp: time.Unix(0, 2), Line: "line 2"}, + {Timestamp: time.Unix(0, 3), Line: "line 3"}, + }, + }, + { + name: "Forward direction with logql pipeline filter", + entries: []*logproto.Entry{ + {Timestamp: time.Unix(0, 1).UTC(), Line: "error: something went wrong"}, + {Timestamp: time.Unix(0, 2).UTC(), Line: "info: operation successful"}, + {Timestamp: time.Unix(0, 3).UTC(), Line: "error: another error occurred"}, + {Timestamp: time.Unix(0, 4).UTC(), Line: "debug: checking status"}, + }, + direction: logproto.FORWARD, + from: 1, + through: 5, + pipeline: mustNewPipeline(t, `{foo="bar"} | line_format "foo {{ __line__ }}" |= "error"`), + expected: []*logproto.Entry{ + {Timestamp: time.Unix(0, 1), Line: "foo error: something went wrong"}, + {Timestamp: time.Unix(0, 3), Line: "foo error: another error occurred"}, + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + var buf bytes.Buffer + + // Write the chunk + _, err := WriteChunk(&buf, tt.entries, EncodingSnappy) + require.NoError(t, err, "WriteChunk failed") + + // Create the iterator + iter, err := NewEntryIterator(buf.Bytes(), tt.pipeline, tt.direction, tt.from, tt.through) + require.NoError(t, err, "NewEntryIterator failed") + defer iter.Close() + + // Read entries using the iterator + var actualEntries []*logproto.Entry + for iter.Next() { + entry := iter.At() + actualEntries = append(actualEntries, &logproto.Entry{ + Timestamp: entry.Timestamp, + Line: entry.Line, + }) + } + require.NoError(t, iter.Err(), "Iterator encountered an error") + + // Compare actual entries with expected entries + require.Equal(t, tt.expected, actualEntries, "Entries do not match expected values") + }) + } +} + +// mustNewPipeline creates a new pipeline or fails the test +func mustNewPipeline(t *testing.T, query string) log.StreamPipeline { + t.Helper() + if query == "" { + return log.NewNoopPipeline().ForStream(labels.Labels{}) + } + expr, err := syntax.ParseLogSelector(query, true) + require.NoError(t, err) + + pipeline, err := expr.Pipeline() + require.NoError(t, err) + + return pipeline.ForStream(labels.Labels{}) +} + +func noopStreamPipeline() log.StreamPipeline { + return log.NewNoopPipeline().ForStream(labels.Labels{}) +} diff --git a/pkg/storage/wal/chunks/sample_iterator.go b/pkg/storage/wal/chunks/sample_iterator.go new file mode 100644 index 0000000000000..4d4b397b1dd58 --- /dev/null +++ b/pkg/storage/wal/chunks/sample_iterator.go @@ -0,0 +1,87 @@ +package chunks + +import ( + "github.com/grafana/loki/v3/pkg/iter" + "github.com/grafana/loki/v3/pkg/logproto" + "github.com/grafana/loki/v3/pkg/logql/log" +) + +// NewSampleIterator creates an iterator for efficiently traversing samples in a chunk. +// It takes compressed chunk data, a processing pipeline, iteration direction, and a time range. +// The returned iterator filters samples based on the time range and applies the given pipeline. +// It handles both forward and backward iteration. +// +// Parameters: +// - chunkData: Compressed chunk data containing samples +// - pipeline: StreamSampleExtractor for processing and filtering samples +// - from: Start timestamp (inclusive) for filtering samples +// - through: End timestamp (exclusive) for filtering samples +// +// Returns a SampleIterator and an error if creation fails. +func NewSampleIterator( + chunkData []byte, + pipeline log.StreamSampleExtractor, + from, through int64, +) (iter.SampleIterator, error) { + chkReader, err := NewChunkReader(chunkData) + if err != nil { + return nil, err + } + it := &sampleBufferedIterator{ + reader: chkReader, + pipeline: pipeline, + from: from, + through: through, + } + return it, nil +} + +// At implements iter.SampleIterator. +func (s *sampleBufferedIterator) At() logproto.Sample { + return s.cur +} + +// Close implements iter.SampleIterator. +func (s *sampleBufferedIterator) Close() error { + return s.reader.Close() +} + +// Err implements iter.SampleIterator. +func (s *sampleBufferedIterator) Err() error { + return s.reader.Err() +} + +// Labels implements iter.SampleIterator. +func (s *sampleBufferedIterator) Labels() string { + return s.currLabels.String() +} + +// Next implements iter.SampleIterator. +func (s *sampleBufferedIterator) Next() bool { + for s.reader.Next() { + // todo: Only use length columns for bytes_over_time without filter. + ts, line := s.reader.At() + // check if the timestamp is within the range before applying the pipeline. + if ts < s.from { + continue + } + if ts >= s.through { + return false + } + // todo: structured metadata. + val, lbs, matches := s.pipeline.Process(ts, line) + if !matches { + continue + } + s.currLabels = lbs + s.cur.Value = val + s.cur.Timestamp = ts + return true + } + return false +} + +// StreamHash implements iter.SampleIterator. +func (s *sampleBufferedIterator) StreamHash() uint64 { + return s.pipeline.BaseLabels().Hash() +} diff --git a/pkg/storage/wal/chunks/sample_iterator_test.go b/pkg/storage/wal/chunks/sample_iterator_test.go new file mode 100644 index 0000000000000..4e208301ab9d6 --- /dev/null +++ b/pkg/storage/wal/chunks/sample_iterator_test.go @@ -0,0 +1,202 @@ +package chunks + +import ( + "bytes" + "testing" + "time" + + "github.com/prometheus/prometheus/model/labels" + "github.com/stretchr/testify/require" + + "github.com/grafana/loki/v3/pkg/logproto" + "github.com/grafana/loki/v3/pkg/logql/log" + "github.com/grafana/loki/v3/pkg/logql/syntax" +) + +func TestNewSampleIterator(t *testing.T) { + tests := []struct { + name string + entries []*logproto.Entry + from int64 + through int64 + extractor log.StreamSampleExtractor + expected []logproto.Sample + expectErr bool + }{ + { + name: "All samples within range", + entries: []*logproto.Entry{ + {Timestamp: time.Unix(0, 1), Line: "1.0"}, + {Timestamp: time.Unix(0, 2), Line: "2.0"}, + {Timestamp: time.Unix(0, 3), Line: "3.0"}, + }, + from: 0, + through: 4, + extractor: mustNewExtractor(t, ""), + expected: []logproto.Sample{ + {Timestamp: 1, Value: 1.0, Hash: 0}, + {Timestamp: 2, Value: 1.0, Hash: 0}, + {Timestamp: 3, Value: 1.0, Hash: 0}, + }, + }, + { + name: "Partial range", + entries: []*logproto.Entry{ + {Timestamp: time.Unix(0, 1), Line: "1.0"}, + {Timestamp: time.Unix(0, 2), Line: "2.0"}, + {Timestamp: time.Unix(0, 3), Line: "3.0"}, + {Timestamp: time.Unix(0, 4), Line: "4.0"}, + }, + from: 2, + through: 4, + extractor: mustNewExtractor(t, ""), + expected: []logproto.Sample{ + {Timestamp: 2, Value: 1.0, Hash: 0}, + {Timestamp: 3, Value: 1.0, Hash: 0}, + }, + }, + { + name: "Pipeline filter", + entries: []*logproto.Entry{ + {Timestamp: time.Unix(0, 1), Line: "error: 1.0"}, + {Timestamp: time.Unix(0, 2), Line: "info: 2.0"}, + {Timestamp: time.Unix(0, 3), Line: "error: 3.0"}, + {Timestamp: time.Unix(0, 4), Line: "debug: 4.0"}, + }, + from: 1, + through: 5, + extractor: mustNewExtractor(t, `count_over_time({foo="bar"} |= "error"[1m])`), + expected: []logproto.Sample{ + {Timestamp: 1, Value: 1.0, Hash: 0}, + {Timestamp: 3, Value: 1.0, Hash: 0}, + }, + }, + { + name: "Pipeline filter with bytes_over_time", + entries: []*logproto.Entry{ + {Timestamp: time.Unix(0, 1), Line: "error: 1.0"}, + {Timestamp: time.Unix(0, 2), Line: "info: 2.0"}, + {Timestamp: time.Unix(0, 3), Line: "error: 3.0"}, + {Timestamp: time.Unix(0, 4), Line: "debug: 4.0"}, + }, + from: 1, + through: 5, + extractor: mustNewExtractor(t, `bytes_over_time({foo="bar"} |= "error"[1m])`), + expected: []logproto.Sample{ + {Timestamp: 1, Value: 10, Hash: 0}, + {Timestamp: 3, Value: 10, Hash: 0}, + }, + }, + { + name: "No samples within range", + entries: []*logproto.Entry{ + {Timestamp: time.Unix(0, 1), Line: "1.0"}, + {Timestamp: time.Unix(0, 2), Line: "2.0"}, + }, + from: 3, + through: 5, + extractor: mustNewExtractor(t, ""), + expected: nil, + }, + { + name: "Empty chunk", + entries: []*logproto.Entry{}, + from: 0, + through: 5, + extractor: mustNewExtractor(t, ""), + expected: nil, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + var buf bytes.Buffer + + // Write the chunk + _, err := WriteChunk(&buf, tt.entries, EncodingSnappy) + require.NoError(t, err, "WriteChunk failed") + + // Create the iterator + iter, err := NewSampleIterator(buf.Bytes(), tt.extractor, tt.from, tt.through) + if tt.expectErr { + require.Error(t, err, "Expected an error but got none") + return + } + require.NoError(t, err, "NewSampleIterator failed") + defer iter.Close() + + // Read samples using the iterator + var actualSamples []logproto.Sample + for iter.Next() { + actualSamples = append(actualSamples, iter.At()) + } + require.NoError(t, iter.Err(), "Iterator encountered an error") + + // Compare actual samples with expected samples + require.Equal(t, tt.expected, actualSamples, "Samples do not match expected values") + + // Check labels + if len(actualSamples) > 0 { + require.Equal(t, tt.extractor.BaseLabels().String(), iter.Labels(), "Unexpected labels") + } + + // Check StreamHash + if len(actualSamples) > 0 { + require.Equal(t, tt.extractor.BaseLabels().Hash(), iter.StreamHash(), "Unexpected StreamHash") + } + }) + } +} + +func TestNewSampleIteratorErrors(t *testing.T) { + tests := []struct { + name string + chunkData []byte + extractor log.StreamSampleExtractor + from int64 + through int64 + }{ + { + name: "Invalid chunk data", + chunkData: []byte("invalid chunk data"), + extractor: mustNewExtractor(t, ""), + from: 0, + through: 10, + }, + { + name: "Nil extractor", + chunkData: []byte{}, // valid empty chunk + extractor: nil, + from: 0, + through: 10, + }, + { + name: "Invalid time range", + chunkData: []byte{}, // valid empty chunk + extractor: mustNewExtractor(t, ""), + from: 10, + through: 0, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + _, err := NewSampleIterator(tt.chunkData, tt.extractor, tt.from, tt.through) + require.Error(t, err, "Expected an error but got none") + }) + } +} + +func mustNewExtractor(t *testing.T, query string) log.StreamSampleExtractor { + t.Helper() + if query == `` { + query = `count_over_time({foo="bar"}[1m])` + } + expr, err := syntax.ParseSampleExpr(query) + require.NoError(t, err) + + extractor, err := expr.Extractor() + require.NoError(t, err) + + return extractor.ForStream(labels.Labels{}) +} diff --git a/pkg/storage/wal/index/index.go b/pkg/storage/wal/index/index.go index 29436bd2044b8..8959824c92780 100644 --- a/pkg/storage/wal/index/index.go +++ b/pkg/storage/wal/index/index.go @@ -24,6 +24,8 @@ import ( "math" "slices" "sort" + "strings" + "unicode/utf8" "unsafe" "github.com/prometheus/prometheus/model/labels" @@ -51,8 +53,24 @@ const ( // checkContextEveryNIterations is used in some tight loops to check if the context is done. checkContextEveryNIterations = 128 + + TenantLabel = "__loki_tenant__" ) +// Bitmap used by func isRegexMetaCharacter to check whether a character needs to be escaped. +var regexMetaCharacterBytes [16]byte + +// isRegexMetaCharacter reports whether byte b needs to be escaped. +func isRegexMetaCharacter(b byte) bool { + return b < utf8.RuneSelf && regexMetaCharacterBytes[b%16]&(1<<(b/16)) != 0 +} + +func init() { + for _, b := range []byte(`.+*?()|[]{}^$`) { + regexMetaCharacterBytes[b%16] |= 1 << (b / 16) + } +} + var AllPostingsKey = labels.Label{} type indexWriterSeries struct { @@ -1908,3 +1926,256 @@ func (dec *Decoder) Series(b []byte, builder *labels.ScratchBuilder, chks *[]chu func yoloString(b []byte) string { return *((*string)(unsafe.Pointer(&b))) } + +// PostingsForMatchers assembles a single postings iterator against the index reader +// based on the given matchers. The resulting postings are not ordered by series. +func (r *Reader) PostingsForMatchers(ctx context.Context, ms ...*labels.Matcher) (index.Postings, error) { + var its, notIts []index.Postings + // See which label must be non-empty. + // Optimization for case like {l=~".", l!="1"}. + labelMustBeSet := make(map[string]bool, len(ms)) + for _, m := range ms { + if !m.Matches("") { + labelMustBeSet[m.Name] = true + } + } + isSubtractingMatcher := func(m *labels.Matcher) bool { + if !labelMustBeSet[m.Name] { + return true + } + return (m.Type == labels.MatchNotEqual || m.Type == labels.MatchNotRegexp) && m.Matches("") + } + hasSubtractingMatchers, hasIntersectingMatchers := false, false + for _, m := range ms { + if isSubtractingMatcher(m) { + hasSubtractingMatchers = true + } else { + hasIntersectingMatchers = true + } + } + + if hasSubtractingMatchers && !hasIntersectingMatchers { + // If there's nothing to subtract from, add in everything and remove the notIts later. + // We prefer to get AllPostings so that the base of subtraction (i.e. allPostings) + // doesn't include series that may be added to the index reader during this function call. + k, v := index.AllPostingsKey() + allPostings, err := r.Postings(ctx, k, v) + if err != nil { + return nil, err + } + its = append(its, allPostings) + } + + // Sort matchers to have the intersecting matchers first. + // This way the base for subtraction is smaller and + // there is no chance that the set we subtract from + // contains postings of series that didn't exist when + // we constructed the set we subtract by. + slices.SortStableFunc(ms, func(i, j *labels.Matcher) int { + if !isSubtractingMatcher(i) && isSubtractingMatcher(j) { + return -1 + } + + return +1 + }) + + for _, m := range ms { + if ctx.Err() != nil { + return nil, ctx.Err() + } + switch { + case m.Name == "" && m.Value == "": // Special-case for AllPostings, used in tests at least. + k, v := index.AllPostingsKey() + allPostings, err := r.Postings(ctx, k, v) + if err != nil { + return nil, err + } + its = append(its, allPostings) + case labelMustBeSet[m.Name]: + // If this matcher must be non-empty, we can be smarter. + matchesEmpty := m.Matches("") + isNot := m.Type == labels.MatchNotEqual || m.Type == labels.MatchNotRegexp + switch { + case isNot && matchesEmpty: // l!="foo" + // If the label can't be empty and is a Not and the inner matcher + // doesn't match empty, then subtract it out at the end. + inverse, err := m.Inverse() + if err != nil { + return nil, err + } + + it, err := postingsForMatcher(ctx, r, inverse) + if err != nil { + return nil, err + } + notIts = append(notIts, it) + case isNot && !matchesEmpty: // l!="" + // If the label can't be empty and is a Not, but the inner matcher can + // be empty we need to use inversePostingsForMatcher. + inverse, err := m.Inverse() + if err != nil { + return nil, err + } + + it, err := inversePostingsForMatcher(ctx, r, inverse) + if err != nil { + return nil, err + } + if index.IsEmptyPostingsType(it) { + return index.EmptyPostings(), nil + } + its = append(its, it) + default: // l="a" + // Non-Not matcher, use normal postingsForMatcher. + it, err := postingsForMatcher(ctx, r, m) + if err != nil { + return nil, err + } + if index.IsEmptyPostingsType(it) { + return index.EmptyPostings(), nil + } + its = append(its, it) + } + default: // l="" + // If the matchers for a labelname selects an empty value, it selects all + // the series which don't have the label name set too. See: + // https://github.com/prometheus/prometheus/issues/3575 and + // https://github.com/prometheus/prometheus/pull/3578#issuecomment-351653555 + it, err := inversePostingsForMatcher(ctx, r, m) + if err != nil { + return nil, err + } + notIts = append(notIts, it) + } + } + + it := index.Intersect(its...) + + for _, n := range notIts { + it = index.Without(it, n) + } + + return it, nil +} + +// inversePostingsForMatcher returns the postings for the series with the label name set but not matching the matcher. +func inversePostingsForMatcher(ctx context.Context, ix *Reader, m *labels.Matcher) (index.Postings, error) { + // Fast-path for MatchNotRegexp matching. + // Inverse of a MatchNotRegexp is MatchRegexp (double negation). + // Fast-path for set matching. + if m.Type == labels.MatchNotRegexp { + setMatches := findSetMatches(m.GetRegexString()) + if len(setMatches) > 0 { + return ix.Postings(ctx, m.Name, setMatches...) + } + } + + // Fast-path for MatchNotEqual matching. + // Inverse of a MatchNotEqual is MatchEqual (double negation). + if m.Type == labels.MatchNotEqual { + return ix.Postings(ctx, m.Name, m.Value) + } + + vals, err := ix.LabelValues(ctx, m.Name) + if err != nil { + return nil, err + } + + var res []string + // If the inverse match is ="", we just want all the values. + if m.Type == labels.MatchEqual && m.Value == "" { + res = vals + } else { + for _, val := range vals { + if !m.Matches(val) { + res = append(res, val) + } + } + } + + return ix.Postings(ctx, m.Name, res...) +} + +func postingsForMatcher(ctx context.Context, ix *Reader, m *labels.Matcher) (index.Postings, error) { + // This method will not return postings for missing labels. + + // Fast-path for equal matching. + if m.Type == labels.MatchEqual { + return ix.Postings(ctx, m.Name, m.Value) + } + + // Fast-path for set matching. + if m.Type == labels.MatchRegexp { + setMatches := findSetMatches(m.GetRegexString()) + if len(setMatches) > 0 { + return ix.Postings(ctx, m.Name, setMatches...) + } + } + + vals, err := ix.LabelValues(ctx, m.Name) + if err != nil { + return nil, err + } + + var res []string + for _, val := range vals { + if m.Matches(val) { + res = append(res, val) + } + } + + if len(res) == 0 { + return index.EmptyPostings(), nil + } + + return ix.Postings(ctx, m.Name, res...) +} + +func findSetMatches(pattern string) []string { + // Return empty matches if the wrapper from Prometheus is missing. + if len(pattern) < 6 || pattern[:4] != "^(?:" || pattern[len(pattern)-2:] != ")$" { + return nil + } + escaped := false + sets := []*strings.Builder{{}} + init := 4 + end := len(pattern) - 2 + // If the regex is wrapped in a group we can remove the first and last parentheses + if pattern[init] == '(' && pattern[end-1] == ')' { + init++ + end-- + } + for i := init; i < end; i++ { + if escaped { + switch { + case isRegexMetaCharacter(pattern[i]): + sets[len(sets)-1].WriteByte(pattern[i]) + case pattern[i] == '\\': + sets[len(sets)-1].WriteByte('\\') + default: + return nil + } + escaped = false + } else { + switch { + case isRegexMetaCharacter(pattern[i]): + if pattern[i] == '|' { + sets = append(sets, &strings.Builder{}) + } else { + return nil + } + case pattern[i] == '\\': + escaped = true + default: + sets[len(sets)-1].WriteByte(pattern[i]) + } + } + } + matches := make([]string, 0, len(sets)) + for _, s := range sets { + if s.Len() > 0 { + matches = append(matches, s.String()) + } + } + return matches +} diff --git a/pkg/storage/wal/segment.go b/pkg/storage/wal/segment.go index 65ee7c093d2ff..93b824bbcb70e 100644 --- a/pkg/storage/wal/segment.go +++ b/pkg/storage/wal/segment.go @@ -35,7 +35,7 @@ var ( } }, } - tenantLabel = "__loki_tenant__" + Dir = "loki-v2/wal/anon/" ) func init() { @@ -156,8 +156,8 @@ func (b *SegmentWriter) getOrCreateStream(id streamID, lbls labels.Labels) *stre if ok { return s } - if lbls.Get(tenantLabel) == "" { - lbls = labels.NewBuilder(lbls).Set(tenantLabel, id.tenant).Labels() + if lbls.Get(index.TenantLabel) == "" { + lbls = labels.NewBuilder(lbls).Set(index.TenantLabel, id.tenant).Labels() } s = streamSegmentPool.Get().(*streamSegment) s.lbls = lbls diff --git a/pkg/storage/wal/segment_test.go b/pkg/storage/wal/segment_test.go index 90755adcfcc3f..cbe42587bf7a2 100644 --- a/pkg/storage/wal/segment_test.go +++ b/pkg/storage/wal/segment_test.go @@ -122,7 +122,7 @@ func TestWalSegmentWriter_Append(t *testing.T) { require.True(t, ok) lbs, err := syntax.ParseLabels(expected.labels) require.NoError(t, err) - lbs = append(lbs, labels.Label{Name: tenantLabel, Value: expected.tenant}) + lbs = append(lbs, labels.Label{Name: index.TenantLabel, Value: expected.tenant}) sort.Sort(lbs) require.Equal(t, lbs, stream.lbls) require.Equal(t, expected.entries, stream.entries) @@ -168,7 +168,7 @@ func TestMultiTenantWrite(t *testing.T) { for _, tenant := range tenants { for _, lbl := range lbls { - expectedSeries = append(expectedSeries, labels.NewBuilder(lbl).Set(tenantLabel, tenant).Labels().String()) + expectedSeries = append(expectedSeries, labels.NewBuilder(lbl).Set(index.TenantLabel, tenant).Labels().String()) } }