Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf: Avoid looking up unnecessary TSDB symbols during Volume API #13960

Merged
merged 6 commits into from
Aug 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions pkg/ingester/instance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -658,6 +658,10 @@ func (t *testFilter) ShouldFilter(lbs labels.Labels) bool {
return lbs.Get("log_stream") == "dispatcher"
}

func (t *testFilter) RequiredLabelNames() []string {
return []string{"log_stream"}
}

func Test_ChunkFilter(t *testing.T) {
instance := defaultInstance(t)
instance.chunkFilter = &testFilter{}
Expand Down
1 change: 1 addition & 0 deletions pkg/storage/chunk/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,4 +67,5 @@ type RequestChunkFilterer interface {
// Filterer filters chunks based on the metric.
type Filterer interface {
ShouldFilter(metric labels.Labels) bool
RequiredLabelNames() []string
}
4 changes: 4 additions & 0 deletions pkg/storage/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -866,6 +866,10 @@ func (f fakeChunkFilterer) ShouldFilter(metric labels.Labels) bool {
return metric.Get("foo") == "bazz"
}

func (f fakeChunkFilterer) RequiredLabelNames() []string {
return []string{"foo"}
}

func Test_ChunkFilterer(t *testing.T) {
s := &LokiStore{
Store: storeFixture,
Expand Down
13 changes: 11 additions & 2 deletions pkg/storage/stores/shipper/indexshipper/tsdb/head_read.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,14 +146,23 @@ func (h *headIndexReader) Series(ref storage.SeriesRef, from int64, through int6
return s.fp, nil
}

func (h *headIndexReader) ChunkStats(ref storage.SeriesRef, from, through int64, lbls *labels.Labels) (uint64, index.ChunkStats, error) {
func (h *headIndexReader) ChunkStats(ref storage.SeriesRef, from, through int64, lbls *labels.Labels, by map[string]struct{}) (uint64, index.ChunkStats, error) {
s := h.head.series.getByID(uint64(ref))

if s == nil {
h.head.metrics.seriesNotFound.Inc()
return 0, index.ChunkStats{}, storage.ErrNotFound
}
*lbls = append((*lbls)[:0], s.ls...)
if len(by) == 0 {
*lbls = append((*lbls)[:0], s.ls...)
} else {
*lbls = (*lbls)[:0]
for _, l := range s.ls {
if _, ok := by[l.Name]; ok {
*lbls = append(*lbls, l)
}
}
}

queryBounds := newBounds(model.Time(from), model.Time(through))

Expand Down
50 changes: 46 additions & 4 deletions pkg/storage/stores/shipper/indexshipper/tsdb/index/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -1832,7 +1832,7 @@ func (r *Reader) Series(id storage.SeriesRef, from int64, through int64, lbls *l
return fprint, nil
}

func (r *Reader) ChunkStats(id storage.SeriesRef, from, through int64, lbls *labels.Labels) (uint64, ChunkStats, error) {
func (r *Reader) ChunkStats(id storage.SeriesRef, from, through int64, lbls *labels.Labels, by map[string]struct{}) (uint64, ChunkStats, error) {
offset := id
// In version 2+ series IDs are no longer exact references but series are 16-byte padded
// and the ID is the multiple of 16 of the actual position.
Expand All @@ -1844,7 +1844,7 @@ func (r *Reader) ChunkStats(id storage.SeriesRef, from, through int64, lbls *lab
return 0, ChunkStats{}, d.Err()
}

return r.dec.ChunkStats(r.version, d.Get(), id, from, through, lbls)
return r.dec.ChunkStats(r.version, d.Get(), id, from, through, lbls, by)
}

func (r *Reader) Postings(name string, fpFilter FingerprintFilter, values ...string) (Postings, error) {
Expand Down Expand Up @@ -2216,11 +2216,53 @@ func (dec *Decoder) prepSeries(b []byte, lbls *labels.Labels, chks *[]ChunkMeta)
if d.Err() != nil {
return nil, 0, errors.Wrap(d.Err(), "read series label offsets")
}
// todo(cyriltovena): we could cache this by user requests spanning multiple prepSeries calls.
ln, err := dec.LookupSymbol(lno)
if err != nil {
return nil, 0, errors.Wrap(err, "lookup label name")
}
lv, err := dec.LookupSymbol(lvo)
if err != nil {
return nil, 0, errors.Wrap(err, "lookup label value")
}

*lbls = append(*lbls, labels.Label{Name: ln, Value: lv})
}
return &d, fprint, nil
}

// prepSeriesBy returns series labels and chunks for a series and only returning selected `by` label names.
// If `by` is empty, it returns all labels for the series.
func (dec *Decoder) prepSeriesBy(b []byte, lbls *labels.Labels, chks *[]ChunkMeta, by map[string]struct{}) (*encoding.Decbuf, uint64, error) {
if by == nil {
return dec.prepSeries(b, lbls, chks)
}
*lbls = (*lbls)[:0]
if chks != nil {
*chks = (*chks)[:0]
}

d := encoding.DecWrap(tsdb_enc.Decbuf{B: b})

fprint := d.Be64()
k := d.Uvarint()

for i := 0; i < k; i++ {
lno := uint32(d.Uvarint())
lvo := uint32(d.Uvarint())

if d.Err() != nil {
return nil, 0, errors.Wrap(d.Err(), "read series label offsets")
}
// todo(cyriltovena): we could cache this by user requests spanning multiple prepSeries calls.
ln, err := dec.LookupSymbol(lno)
if err != nil {
return nil, 0, errors.Wrap(err, "lookup label name")
}
if _, ok := by[ln]; !ok {
continue
}

lv, err := dec.LookupSymbol(lvo)
if err != nil {
return nil, 0, errors.Wrap(err, "lookup label value")
Expand All @@ -2231,8 +2273,8 @@ func (dec *Decoder) prepSeries(b []byte, lbls *labels.Labels, chks *[]ChunkMeta)
return &d, fprint, nil
}

func (dec *Decoder) ChunkStats(version int, b []byte, seriesRef storage.SeriesRef, from, through int64, lbls *labels.Labels) (uint64, ChunkStats, error) {
d, fp, err := dec.prepSeries(b, lbls, nil)
func (dec *Decoder) ChunkStats(version int, b []byte, seriesRef storage.SeriesRef, from, through int64, lbls *labels.Labels, by map[string]struct{}) (uint64, ChunkStats, error) {
d, fp, err := dec.prepSeriesBy(b, lbls, nil, by)
if err != nil {
return 0, ChunkStats{}, err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/stores/shipper/indexshipper/tsdb/querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ type IndexReader interface {
Series(ref storage.SeriesRef, from int64, through int64, lset *labels.Labels, chks *[]index.ChunkMeta) (uint64, error)

// ChunkStats returns the stats for the chunks in the given series.
ChunkStats(ref storage.SeriesRef, from, through int64, lset *labels.Labels) (uint64, index.ChunkStats, error)
ChunkStats(ref storage.SeriesRef, from, through int64, lset *labels.Labels, by map[string]struct{}) (uint64, index.ChunkStats, error)

// LabelNames returns all the unique label names present in the index in sorted order.
LabelNames(matchers ...*labels.Matcher) ([]string, error)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -293,12 +293,18 @@ func (i *TSDBIndex) Stats(ctx context.Context, _ string, from, through model.Tim
// TODO(owen-d): use pool
var ls labels.Labels
var filterer chunk.Filterer
by := make(map[string]struct{})
if i.chunkFilter != nil {
filterer = i.chunkFilter.ForRequest(ctx)
if filterer != nil {
for _, k := range filterer.RequiredLabelNames() {
by[k] = struct{}{}
}
}
}

for p.Next() {
fp, stats, err := i.reader.ChunkStats(p.At(), int64(from), int64(through), &ls)
fp, stats, err := i.reader.ChunkStats(p.At(), int64(from), int64(through), &ls, by)
if err != nil {
return err
}
Expand Down Expand Up @@ -362,16 +368,29 @@ func (i *TSDBIndex) Volume(
seriesLabels := labels.Labels(make([]labels.Label, 0, len(labelsToMatch)))

aggregateBySeries := seriesvolume.AggregateBySeries(aggregateBy) || aggregateBy == ""
var by map[string]struct{}
var filterer chunk.Filterer
if i.chunkFilter != nil {
filterer = i.chunkFilter.ForRequest(ctx)
}
if !includeAll && (aggregateBySeries || len(targetLabels) > 0) {
by = make(map[string]struct{}, len(labelsToMatch))
for k := range labelsToMatch {
by[k] = struct{}{}
}

return i.forPostings(ctx, fpFilter, from, through, matchers, func(p index.Postings) error {
var ls labels.Labels
var filterer chunk.Filterer
if i.chunkFilter != nil {
filterer = i.chunkFilter.ForRequest(ctx)
// If we are aggregating by series, we need to include all labels in the series required for filtering chunks.
if filterer != nil {
for _, k := range filterer.RequiredLabelNames() {
by[k] = struct{}{}
}
}
}

return i.forPostings(ctx, fpFilter, from, through, matchers, func(p index.Postings) error {
var ls labels.Labels
for p.Next() {
fp, stats, err := i.reader.ChunkStats(p.At(), int64(from), int64(through), &ls)
fp, stats, err := i.reader.ChunkStats(p.At(), int64(from), int64(through), &ls, by)
if err != nil {
return fmt.Errorf("series volume: %w", err)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package tsdb

import (
"context"
"fmt"
"math/rand"
"sort"
"testing"
Expand Down Expand Up @@ -140,7 +141,6 @@ func TestSingleIdx(t *testing.T) {
End: 10,
Checksum: 3,
}}, shardedRefs)

})

t.Run("Series", func(t *testing.T) {
Expand Down Expand Up @@ -202,10 +202,8 @@ func TestSingleIdx(t *testing.T) {
require.Nil(t, err)
require.Equal(t, []string{"bar"}, vs)
})

})
}

}

func BenchmarkTSDBIndex_GetChunkRefs(b *testing.B) {
Expand Down Expand Up @@ -743,10 +741,50 @@ func TestTSDBIndex_Volume(t *testing.T) {
Limit: 10,
}, acc.Volumes())
})
// todo(cyriltovena): tests with chunk filterer
})
})
}

func BenchmarkTSDBIndex_Volume(b *testing.B) {
var series []LoadableSeries
for i := 0; i < 1000; i++ {
series = append(series, LoadableSeries{
Labels: mustParseLabels(fmt.Sprintf(`{foo="bar", fizz="fizz%d", buzz="buzz%d",bar="bar%d", bozz="bozz%d"}`, i, i, i, i)),
Chunks: []index.ChunkMeta{
{
MinTime: 0,
MaxTime: 10,
Checksum: uint32(i),
KB: 10,
Entries: 10,
},
{
MinTime: 10,
MaxTime: 20,
Checksum: uint32(i),
KB: 10,
Entries: 10,
},
},
})
}
ctx := context.Background()
from := model.Earliest
through := model.Latest
// Create the TSDB index
tempDir := b.TempDir()
tsdbIndex := BuildIndex(b, tempDir, series)

b.ResetTimer()
b.ReportAllocs()
for i := 0; i < b.N; i++ {
acc := seriesvolume.NewAccumulator(10, 10)
err := tsdbIndex.Volume(ctx, "fake", from, through, acc, nil, nil, nil, seriesvolume.Series, labels.MustNewMatcher(labels.MatchRegexp, "foo", ".+"))
require.NoError(b, err)
}
}

type filterAll struct{}

func (f *filterAll) ForRequest(_ context.Context) chunk.Filterer {
Expand All @@ -758,3 +796,7 @@ type filterAllFilterer struct{}
func (f *filterAllFilterer) ShouldFilter(_ labels.Labels) bool {
return true
}

func (f *filterAllFilterer) RequiredLabelNames() []string {
return nil
}
Loading