From e9da0c2d30ffaa348160845909ff5b6bfa7fe6e1 Mon Sep 17 00:00:00 2001 From: Cyril Tovena Date: Mon, 26 Aug 2024 17:11:33 +0200 Subject: [PATCH 1/5] chore: Avoid looking up tsdb symbols during Volume API --- pkg/ingester/instance_test.go | 28 ++++++----- pkg/storage/chunk/interface.go | 1 + pkg/storage/store_test.go | 34 ++++++++----- .../shipper/indexshipper/tsdb/head_read.go | 13 ++++- .../shipper/indexshipper/tsdb/index/index.go | 50 +++++++++++++++++-- .../shipper/indexshipper/tsdb/querier.go | 2 +- .../indexshipper/tsdb/single_file_index.go | 25 ++++++++-- .../tsdb/single_file_index_test.go | 48 ++++++++++++++++-- 8 files changed, 162 insertions(+), 39 deletions(-) diff --git a/pkg/ingester/instance_test.go b/pkg/ingester/instance_test.go index 6867b0e1519c2..3e2a822d3eace 100644 --- a/pkg/ingester/instance_test.go +++ b/pkg/ingester/instance_test.go @@ -658,6 +658,10 @@ func (t *testFilter) ShouldFilter(lbs labels.Labels) bool { return lbs.Get("log_stream") == "dispatcher" } +func (t *testFilter) RequiredLabelNames() []string { + return []string{"log_stream"} +} + func Test_ChunkFilter(t *testing.T) { instance := defaultInstance(t) instance.chunkFilter = &testFilter{} @@ -1173,7 +1177,7 @@ func TestInstance_Volume(t *testing.T) { volumes, err := instance.GetVolume(context.Background(), &logproto.VolumeRequest{ From: 0, - Through: 1.1 * 1e3, //milliseconds + Through: 1.1 * 1e3, // milliseconds Matchers: "{}", Limit: 5, AggregateBy: seriesvolume.Series, @@ -1192,7 +1196,7 @@ func TestInstance_Volume(t *testing.T) { instance := prepareInstance(t) volumes, err := instance.GetVolume(context.Background(), &logproto.VolumeRequest{ From: 0, - Through: 1.1 * 1e3, //milliseconds + Through: 1.1 * 1e3, // milliseconds Matchers: `{log_stream="dispatcher"}`, Limit: 5, AggregateBy: seriesvolume.Series, @@ -1208,7 +1212,7 @@ func TestInstance_Volume(t *testing.T) { instance := prepareInstance(t) volumes, err := instance.GetVolume(context.Background(), &logproto.VolumeRequest{ From: 5, - Through: 1.1 * 1e3, //milliseconds + Through: 1.1 * 1e3, // milliseconds Matchers: "{}", Limit: 5, AggregateBy: seriesvolume.Series, @@ -1242,7 +1246,7 @@ func TestInstance_Volume(t *testing.T) { instance := prepareInstance(t) volumes, err := instance.GetVolume(context.Background(), &logproto.VolumeRequest{ From: 0, - Through: 1.1 * 1e3, //milliseconds + Through: 1.1 * 1e3, // milliseconds Matchers: `{}`, Limit: 5, TargetLabels: []string{"log_stream"}, @@ -1260,7 +1264,7 @@ func TestInstance_Volume(t *testing.T) { instance := prepareInstance(t) volumes, err := instance.GetVolume(context.Background(), &logproto.VolumeRequest{ From: 0, - Through: 1.1 * 1e3, //milliseconds + Through: 1.1 * 1e3, // milliseconds Matchers: `{log_stream="dispatcher"}`, Limit: 5, TargetLabels: []string{"host"}, @@ -1277,7 +1281,7 @@ func TestInstance_Volume(t *testing.T) { instance := prepareInstance(t) volumes, err := instance.GetVolume(context.Background(), &logproto.VolumeRequest{ From: 0, - Through: 1.1 * 1e3, //milliseconds + Through: 1.1 * 1e3, // milliseconds Matchers: `{log_stream=~".+"}`, Limit: 5, TargetLabels: []string{"host", "job"}, @@ -1297,7 +1301,7 @@ func TestInstance_Volume(t *testing.T) { instance := prepareInstance(t) volumes, err := instance.GetVolume(context.Background(), &logproto.VolumeRequest{ From: 0, - Through: 1.1 * 1e3, //milliseconds + Through: 1.1 * 1e3, // milliseconds Matchers: "{}", Limit: 5, AggregateBy: seriesvolume.Labels, @@ -1317,7 +1321,7 @@ func TestInstance_Volume(t *testing.T) { instance := 
prepareInstance(t) volumes, err := instance.GetVolume(context.Background(), &logproto.VolumeRequest{ From: 0, - Through: 1.1 * 1e3, //milliseconds + Through: 1.1 * 1e3, // milliseconds Matchers: `{log_stream="worker"}`, Limit: 5, AggregateBy: seriesvolume.Labels, @@ -1338,7 +1342,7 @@ func TestInstance_Volume(t *testing.T) { instance := prepareInstance(t) volumes, err := instance.GetVolume(context.Background(), &logproto.VolumeRequest{ From: 5, - Through: 1.1 * 1e3, //milliseconds + Through: 1.1 * 1e3, // milliseconds Matchers: "{}", Limit: 5, AggregateBy: seriesvolume.Labels, @@ -1373,7 +1377,7 @@ func TestInstance_Volume(t *testing.T) { instance := prepareInstance(t) volumes, err := instance.GetVolume(context.Background(), &logproto.VolumeRequest{ From: 0, - Through: 1.1 * 1e3, //milliseconds + Through: 1.1 * 1e3, // milliseconds Matchers: `{}`, Limit: 5, TargetLabels: []string{"host"}, @@ -1390,7 +1394,7 @@ func TestInstance_Volume(t *testing.T) { instance := prepareInstance(t) volumes, err := instance.GetVolume(context.Background(), &logproto.VolumeRequest{ From: 0, - Through: 1.1 * 1e3, //milliseconds + Through: 1.1 * 1e3, // milliseconds Matchers: `{log_stream="dispatcher"}`, Limit: 5, TargetLabels: []string{"host"}, @@ -1407,7 +1411,7 @@ func TestInstance_Volume(t *testing.T) { instance := prepareInstance(t) volumes, err := instance.GetVolume(context.Background(), &logproto.VolumeRequest{ From: 0, - Through: 1.1 * 1e3, //milliseconds + Through: 1.1 * 1e3, // milliseconds Matchers: `{log_stream=~".+"}`, Limit: 5, TargetLabels: []string{"host", "job"}, diff --git a/pkg/storage/chunk/interface.go b/pkg/storage/chunk/interface.go index 8da4312c60398..1fbb2beb52073 100644 --- a/pkg/storage/chunk/interface.go +++ b/pkg/storage/chunk/interface.go @@ -67,4 +67,5 @@ type RequestChunkFilterer interface { // Filterer filters chunks based on the metric. 
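// With this change, implementations also report the label names their
// ShouldFilter decision depends on, so index readers can materialize only
// those labels and skip symbol lookups for the rest. A minimal sketch of
// an implementation, mirroring the testFilter above (illustrative only,
// not part of the patch):
//
//	type streamFilter struct{}
//
//	func (streamFilter) ShouldFilter(metric labels.Labels) bool {
//		return metric.Get("log_stream") == "dispatcher"
//	}
//
//	func (streamFilter) RequiredLabelNames() []string {
//		return []string{"log_stream"}
//	}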
type Filterer interface { ShouldFilter(metric labels.Labels) bool + RequiredLabelNames() []string } diff --git a/pkg/storage/store_test.go b/pkg/storage/store_test.go index 3c9acdfa5a638..d31501893dc45 100644 --- a/pkg/storage/store_test.go +++ b/pkg/storage/store_test.go @@ -219,7 +219,8 @@ func getLocalStore(path string, cm ClientMetrics) Store { PeriodicTableConfig: config.PeriodicTableConfig{ Prefix: "index_", Period: time.Hour * 168, - }}, + }, + }, RowShards: 16, }, }, @@ -866,6 +867,10 @@ func (f fakeChunkFilterer) ShouldFilter(metric labels.Labels) bool { return metric.Get("foo") == "bazz" } +func (f fakeChunkFilterer) RequiredLabelNames() []string { + return []string{"foo"} +} + func Test_ChunkFilterer(t *testing.T) { s := &LokiStore{ Store: storeFixture, @@ -921,7 +926,6 @@ func Test_PipelineWrapper(t *testing.T) { s.SetPipelineWrapper(wrapper) ctx = user.InjectOrgID(context.Background(), "test-user") logit, err := s.SelectLogs(ctx, logql.SelectLogParams{QueryRequest: newQuery("{foo=~\"ba.*\"}", from, from.Add(1*time.Hour), []astmapper.ShardAnnotation{{Shard: 1, Of: 5}}, nil)}) - if err != nil { t.Errorf("store.SelectLogs() error = %v", err) return @@ -952,7 +956,6 @@ func Test_PipelineWrapper_disabled(t *testing.T) { ctx = user.InjectOrgID(context.Background(), "test-user") ctx = httpreq.InjectHeader(ctx, httpreq.LokiDisablePipelineWrappersHeader, "true") logit, err := s.SelectLogs(ctx, logql.SelectLogParams{QueryRequest: newQuery("{foo=~\"ba.*\"}", from, from.Add(1*time.Hour), []astmapper.ShardAnnotation{{Shard: 1, Of: 5}}, nil)}) - if err != nil { t.Errorf("store.SelectLogs() error = %v", err) return @@ -1292,7 +1295,8 @@ func TestStore_indexPrefixChange(t *testing.T) { PeriodicTableConfig: config.PeriodicTableConfig{ Prefix: "index_", Period: time.Hour * 24, - }}, + }, + }, } schemaConfig := config.SchemaConfig{ @@ -1366,7 +1370,8 @@ func TestStore_indexPrefixChange(t *testing.T) { PeriodicTableConfig: config.PeriodicTableConfig{ Prefix: "index_tsdb_", Period: time.Hour * 24, - }}, + }, + }, RowShards: 2, } schemaConfig.Configs = append(schemaConfig.Configs, periodConfig2) @@ -1471,7 +1476,8 @@ func TestStore_MultiPeriod(t *testing.T) { PeriodicTableConfig: config.PeriodicTableConfig{ Prefix: "index_", Period: time.Hour * 24, - }}, + }, + }, } periodConfigV11 := config.PeriodConfig{ @@ -1483,7 +1489,8 @@ func TestStore_MultiPeriod(t *testing.T) { PeriodicTableConfig: config.PeriodicTableConfig{ Prefix: "index_", Period: time.Hour * 24, - }}, + }, + }, RowShards: 2, } @@ -1562,7 +1569,6 @@ func TestStore_MultiPeriod(t *testing.T) { } }) } - } func mustParseLabels(s string) []logproto.SeriesIdentifier_LabelsEntry { @@ -1829,7 +1835,8 @@ func TestStore_BoltdbTsdbSameIndexPrefix(t *testing.T) { PeriodicTableConfig: config.PeriodicTableConfig{ Prefix: "index_", Period: time.Hour * 24, - }}, + }, + }, RowShards: 2, }, { @@ -1842,7 +1849,8 @@ func TestStore_BoltdbTsdbSameIndexPrefix(t *testing.T) { PeriodicTableConfig: config.PeriodicTableConfig{ Prefix: "index_", Period: time.Hour * 24, - }}, + }, + }, }, }, } @@ -1980,7 +1988,8 @@ func TestStore_SyncStopInteraction(t *testing.T) { PeriodicTableConfig: config.PeriodicTableConfig{ Prefix: "index_", Period: time.Hour * 24, - }}, + }, + }, RowShards: 2, }, { @@ -1993,7 +2002,8 @@ func TestStore_SyncStopInteraction(t *testing.T) { PeriodicTableConfig: config.PeriodicTableConfig{ Prefix: "index_", Period: time.Hour * 24, - }}, + }, + }, }, }, } diff --git a/pkg/storage/stores/shipper/indexshipper/tsdb/head_read.go 
b/pkg/storage/stores/shipper/indexshipper/tsdb/head_read.go index 3a0cf3cdbfc7d..cf709e7bd97c0 100644 --- a/pkg/storage/stores/shipper/indexshipper/tsdb/head_read.go +++ b/pkg/storage/stores/shipper/indexshipper/tsdb/head_read.go @@ -146,14 +146,23 @@ func (h *headIndexReader) Series(ref storage.SeriesRef, from int64, through int6 return s.fp, nil } -func (h *headIndexReader) ChunkStats(ref storage.SeriesRef, from, through int64, lbls *labels.Labels) (uint64, index.ChunkStats, error) { +func (h *headIndexReader) ChunkStats(ref storage.SeriesRef, from, through int64, lbls *labels.Labels, by map[string]struct{}) (uint64, index.ChunkStats, error) { s := h.head.series.getByID(uint64(ref)) if s == nil { h.head.metrics.seriesNotFound.Inc() return 0, index.ChunkStats{}, storage.ErrNotFound } - *lbls = append((*lbls)[:0], s.ls...) + if len(by) == 0 { + *lbls = append((*lbls)[:0], s.ls...) + } else { + *lbls = (*lbls)[:0] + for _, l := range s.ls { + if _, ok := by[l.Name]; ok { + *lbls = append(*lbls, l) + } + } + } queryBounds := newBounds(model.Time(from), model.Time(through)) diff --git a/pkg/storage/stores/shipper/indexshipper/tsdb/index/index.go b/pkg/storage/stores/shipper/indexshipper/tsdb/index/index.go index 123750aea3de4..60a8d5f1d62cf 100644 --- a/pkg/storage/stores/shipper/indexshipper/tsdb/index/index.go +++ b/pkg/storage/stores/shipper/indexshipper/tsdb/index/index.go @@ -1832,7 +1832,7 @@ func (r *Reader) Series(id storage.SeriesRef, from int64, through int64, lbls *l return fprint, nil } -func (r *Reader) ChunkStats(id storage.SeriesRef, from, through int64, lbls *labels.Labels) (uint64, ChunkStats, error) { +func (r *Reader) ChunkStats(id storage.SeriesRef, from, through int64, lbls *labels.Labels, by map[string]struct{}) (uint64, ChunkStats, error) { offset := id // In version 2+ series IDs are no longer exact references but series are 16-byte padded // and the ID is the multiple of 16 of the actual position. @@ -1844,7 +1844,7 @@ func (r *Reader) ChunkStats(id storage.SeriesRef, from, through int64, lbls *lab return 0, ChunkStats{}, d.Err() } - return r.dec.ChunkStats(r.version, d.Get(), id, from, through, lbls) + return r.dec.ChunkStats(r.version, d.Get(), id, from, through, lbls, by) } func (r *Reader) Postings(name string, fpFilter FingerprintFilter, values ...string) (Postings, error) { @@ -2216,11 +2216,53 @@ func (dec *Decoder) prepSeries(b []byte, lbls *labels.Labels, chks *[]ChunkMeta) if d.Err() != nil { return nil, 0, errors.Wrap(d.Err(), "read series label offsets") } + // todo(cyriltovena): we could cache this by user requests spanning multiple prepSeries calls. + ln, err := dec.LookupSymbol(lno) + if err != nil { + return nil, 0, errors.Wrap(err, "lookup label name") + } + lv, err := dec.LookupSymbol(lvo) + if err != nil { + return nil, 0, errors.Wrap(err, "lookup label value") + } + + *lbls = append(*lbls, labels.Label{Name: ln, Value: lv}) + } + return &d, fprint, nil +} + +// prepSeriesBy returns series labels and chunks for a series and only returning selected `by` label names. +// If `by` is empty, it returns all labels for the series. 
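// The projection exists to avoid symbol-table lookups: label names and
// values are stored in the index as symbol references, and resolving a
// value through LookupSymbol is the cost this series avoids for names
// outside `by`. The name symbol still has to be resolved to decide whether
// a label is selected, so only the value lookup is skipped.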
+func (dec *Decoder) prepSeriesBy(b []byte, lbls *labels.Labels, chks *[]ChunkMeta, by map[string]struct{}) (*encoding.Decbuf, uint64, error) { + if len(by) == 0 { + return dec.prepSeries(b, lbls, chks) + } + *lbls = (*lbls)[:0] + if chks != nil { + *chks = (*chks)[:0] + } + d := encoding.DecWrap(tsdb_enc.Decbuf{B: b}) + + fprint := d.Be64() + k := d.Uvarint() + + for i := 0; i < k; i++ { + lno := uint32(d.Uvarint()) + lvo := uint32(d.Uvarint()) + + if d.Err() != nil { + return nil, 0, errors.Wrap(d.Err(), "read series label offsets") + } + // todo(cyriltovena): we could cache this by user requests spanning multiple prepSeries calls. ln, err := dec.LookupSymbol(lno) if err != nil { return nil, 0, errors.Wrap(err, "lookup label name") } + if _, ok := by[ln]; !ok { + continue + } + lv, err := dec.LookupSymbol(lvo) if err != nil { return nil, 0, errors.Wrap(err, "lookup label value") @@ -2231,8 +2273,8 @@ func (dec *Decoder) prepSeries(b []byte, lbls *labels.Labels, chks *[]ChunkMeta) return &d, fprint, nil } -func (dec *Decoder) ChunkStats(version int, b []byte, seriesRef storage.SeriesRef, from, through int64, lbls *labels.Labels) (uint64, ChunkStats, error) { - d, fp, err := dec.prepSeries(b, lbls, nil) +func (dec *Decoder) ChunkStats(version int, b []byte, seriesRef storage.SeriesRef, from, through int64, lbls *labels.Labels, by map[string]struct{}) (uint64, ChunkStats, error) { + d, fp, err := dec.prepSeriesBy(b, lbls, nil, by) if err != nil { return 0, ChunkStats{}, err } diff --git a/pkg/storage/stores/shipper/indexshipper/tsdb/querier.go b/pkg/storage/stores/shipper/indexshipper/tsdb/querier.go index b29556c348cf6..60ec32ee954b0 100644 --- a/pkg/storage/stores/shipper/indexshipper/tsdb/querier.go +++ b/pkg/storage/stores/shipper/indexshipper/tsdb/querier.go @@ -69,7 +69,7 @@ type IndexReader interface { Series(ref storage.SeriesRef, from int64, through int64, lset *labels.Labels, chks *[]index.ChunkMeta) (uint64, error) // ChunkStats returns the stats for the chunks in the given series. - ChunkStats(ref storage.SeriesRef, from, through int64, lset *labels.Labels) (uint64, index.ChunkStats, error) + ChunkStats(ref storage.SeriesRef, from, through int64, lset *labels.Labels, by map[string]struct{}) (uint64, index.ChunkStats, error) // LabelNames returns all the unique label names present in the index in sorted order. LabelNames(matchers ...*labels.Matcher) ([]string, error) diff --git a/pkg/storage/stores/shipper/indexshipper/tsdb/single_file_index.go b/pkg/storage/stores/shipper/indexshipper/tsdb/single_file_index.go index 7934b952ba88f..c523381534fb2 100644 --- a/pkg/storage/stores/shipper/indexshipper/tsdb/single_file_index.go +++ b/pkg/storage/stores/shipper/indexshipper/tsdb/single_file_index.go @@ -74,7 +74,6 @@ func RebuildWithVersion(ctx context.Context, path string, desiredVer int) (shipp } return NewPrefixedIdentifier(id, parentDir, "") }) - if err != nil { return nil, err } @@ -195,7 +194,6 @@ func (i *TSDBIndex) ForSeries(ctx context.Context, _ string, fpFilter index.Fing } return p.Err() }) - } func (i *TSDBIndex) forPostings( @@ -220,7 +218,6 @@ func (i *TSDBIndex) GetChunkRefs(ctx context.Context, userID string, from, throu if err := i.ForSeries(ctx, "", fpFilter, from, through, func(ls labels.Labels, fp model.Fingerprint, chks []index.ChunkMeta) (stop bool) { for _, chk := range chks { - res = append(res, ChunkRef{ User: userID, // assumed to be the same, will be enforced by caller. 
Fingerprint: fp, @@ -298,7 +295,7 @@ func (i *TSDBIndex) Stats(ctx context.Context, _ string, from, through model.Tim } for p.Next() { - fp, stats, err := i.reader.ChunkStats(p.At(), int64(from), int64(through), &ls) + fp, stats, err := i.reader.ChunkStats(p.At(), int64(from), int64(through), &ls, nil) if err != nil { return err } @@ -362,6 +359,24 @@ func (i *TSDBIndex) Volume( seriesLabels := labels.Labels(make([]labels.Label, 0, len(labelsToMatch))) aggregateBySeries := seriesvolume.AggregateBySeries(aggregateBy) || aggregateBy == "" + var by map[string]struct{} + if !includeAll && (aggregateBySeries || len(targetLabels) > 0) { + by = make(map[string]struct{}, len(labelsToMatch)) + for k := range labelsToMatch { + by[k] = struct{}{} + } + if len(labelsToMatch) > 0 { + for k := range labelsToMatch { + by[k] = struct{}{} + } + } + // If we are aggregating by series, we need to include all labels in the series required for filtering chunks. + if i.chunkFilter != nil { + for _, k := range i.chunkFilter.ForRequest(ctx).RequiredLabelNames() { + by[k] = struct{}{} + } + } + } return i.forPostings(ctx, fpFilter, from, through, matchers, func(p index.Postings) error { var ls labels.Labels @@ -371,7 +386,7 @@ func (i *TSDBIndex) Volume( } for p.Next() { - fp, stats, err := i.reader.ChunkStats(p.At(), int64(from), int64(through), &ls) + fp, stats, err := i.reader.ChunkStats(p.At(), int64(from), int64(through), &ls, by) if err != nil { return fmt.Errorf("series volume: %w", err) } diff --git a/pkg/storage/stores/shipper/indexshipper/tsdb/single_file_index_test.go b/pkg/storage/stores/shipper/indexshipper/tsdb/single_file_index_test.go index 068630c553a04..9784475091bf8 100644 --- a/pkg/storage/stores/shipper/indexshipper/tsdb/single_file_index_test.go +++ b/pkg/storage/stores/shipper/indexshipper/tsdb/single_file_index_test.go @@ -2,6 +2,7 @@ package tsdb import ( "context" + "fmt" "math/rand" "sort" "testing" @@ -140,7 +141,6 @@ func TestSingleIdx(t *testing.T) { End: 10, Checksum: 3, }}, shardedRefs) - }) t.Run("Series", func(t *testing.T) { @@ -202,10 +202,8 @@ func TestSingleIdx(t *testing.T) { require.Nil(t, err) require.Equal(t, []string{"bar"}, vs) }) - }) } - } func BenchmarkTSDBIndex_GetChunkRefs(b *testing.B) { @@ -743,10 +741,50 @@ func TestTSDBIndex_Volume(t *testing.T) { Limit: 10, }, acc.Volumes()) }) + // todo(cyriltovena): tests with chunk filterer }) }) } +func BenchmarkTSDBIndex_Volume(b *testing.B) { + var series []LoadableSeries + for i := 0; i < 1000; i++ { + series = append(series, LoadableSeries{ + Labels: mustParseLabels(fmt.Sprintf(`{foo="bar", fizz="fizz%d", buzz="buzz%d",bar="bar%d", bozz="bozz%d"}`, i, i, i, i)), + Chunks: []index.ChunkMeta{ + { + MinTime: 0, + MaxTime: 10, + Checksum: uint32(i), + KB: 10, + Entries: 10, + }, + { + MinTime: 10, + MaxTime: 20, + Checksum: uint32(i), + KB: 10, + Entries: 10, + }, + }, + }) + } + ctx := context.Background() + from := model.Earliest + through := model.Latest + // Create the TSDB index + tempDir := b.TempDir() + tsdbIndex := BuildIndex(b, tempDir, series) + + b.ResetTimer() + b.ReportAllocs() + for i := 0; i < b.N; i++ { + acc := seriesvolume.NewAccumulator(10, 10) + err := tsdbIndex.Volume(ctx, "fake", from, through, acc, nil, nil, nil, seriesvolume.Series, labels.MustNewMatcher(labels.MatchRegexp, "foo", ".+")) + require.NoError(b, err) + } +} + type filterAll struct{} func (f *filterAll) ForRequest(_ context.Context) chunk.Filterer { @@ -758,3 +796,7 @@ type filterAllFilterer struct{} func (f *filterAllFilterer) 
ShouldFilter(_ labels.Labels) bool { return true } + +func (f *filterAllFilterer) RequiredLabelNames() []string { + return nil +} From b2787650fbcbb4277f785922c7a1401287fddddf Mon Sep 17 00:00:00 2001 From: Cyril Tovena Date: Mon, 26 Aug 2024 23:10:03 +0200 Subject: [PATCH 2/5] Also improve IndexStats API and fix wrong for loop --- .../shipper/indexshipper/tsdb/index/index.go | 2 +- .../shipper/indexshipper/tsdb/single_file_index.go | 14 ++++++++------ 2 files changed, 9 insertions(+), 7 deletions(-) diff --git a/pkg/storage/stores/shipper/indexshipper/tsdb/index/index.go b/pkg/storage/stores/shipper/indexshipper/tsdb/index/index.go index 60a8d5f1d62cf..f3cb7653cbe9f 100644 --- a/pkg/storage/stores/shipper/indexshipper/tsdb/index/index.go +++ b/pkg/storage/stores/shipper/indexshipper/tsdb/index/index.go @@ -2234,7 +2234,7 @@ func (dec *Decoder) prepSeries(b []byte, lbls *labels.Labels, chks *[]ChunkMeta) // prepSeriesBy returns series labels and chunks for a series and only returning selected `by` label names. // If `by` is empty, it returns all labels for the series. func (dec *Decoder) prepSeriesBy(b []byte, lbls *labels.Labels, chks *[]ChunkMeta, by map[string]struct{}) (*encoding.Decbuf, uint64, error) { - if len(by) == 0 { + if by == nil { return dec.prepSeries(b, lbls, chks) } *lbls = (*lbls)[:0] diff --git a/pkg/storage/stores/shipper/indexshipper/tsdb/single_file_index.go b/pkg/storage/stores/shipper/indexshipper/tsdb/single_file_index.go index c523381534fb2..5385d418ea689 100644 --- a/pkg/storage/stores/shipper/indexshipper/tsdb/single_file_index.go +++ b/pkg/storage/stores/shipper/indexshipper/tsdb/single_file_index.go @@ -290,12 +290,15 @@ func (i *TSDBIndex) Stats(ctx context.Context, _ string, from, through model.Tim // TODO(owen-d): use pool var ls labels.Labels var filterer chunk.Filterer + by := make(map[string]struct{}) if i.chunkFilter != nil { filterer = i.chunkFilter.ForRequest(ctx) + for _, k := range i.chunkFilter.ForRequest(ctx).RequiredLabelNames() { + by[k] = struct{}{} + } } - for p.Next() { - fp, stats, err := i.reader.ChunkStats(p.At(), int64(from), int64(through), &ls, nil) + fp, stats, err := i.reader.ChunkStats(p.At(), int64(from), int64(through), &ls, by) if err != nil { return err } @@ -365,11 +368,10 @@ func (i *TSDBIndex) Volume( for k := range labelsToMatch { by[k] = struct{}{} } - if len(labelsToMatch) > 0 { - for k := range labelsToMatch { - by[k] = struct{}{} - } + for _, k := range targetLabels { + by[k] = struct{}{} } + // If we are aggregating by series, we need to include all labels in the series required for filtering chunks. 
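// prepSeriesBy (above) now distinguishes a nil `by` map, meaning
// "materialize every label", from an empty non-nil map, meaning
// "materialize none". Stats relies on that distinction: it always
// allocates the map, so when no chunk filter is installed it skips every
// label value lookup, and the empty label set is simply never read.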
if i.chunkFilter != nil { for _, k := range i.chunkFilter.ForRequest(ctx).RequiredLabelNames() { From a0a429d70320231e5d085c68911ba4a5d584c198 Mon Sep 17 00:00:00 2001 From: Trevor Whitney Date: Mon, 26 Aug 2024 16:58:38 -0600 Subject: [PATCH 3/5] fix: redundant loop and nil check filterer --- .../shipper/indexshipper/tsdb/single_file_index.go | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/pkg/storage/stores/shipper/indexshipper/tsdb/single_file_index.go b/pkg/storage/stores/shipper/indexshipper/tsdb/single_file_index.go index 5385d418ea689..3eb2cbd5cb05b 100644 --- a/pkg/storage/stores/shipper/indexshipper/tsdb/single_file_index.go +++ b/pkg/storage/stores/shipper/indexshipper/tsdb/single_file_index.go @@ -293,8 +293,10 @@ func (i *TSDBIndex) Stats(ctx context.Context, _ string, from, through model.Tim by := make(map[string]struct{}) if i.chunkFilter != nil { filterer = i.chunkFilter.ForRequest(ctx) - for _, k := range i.chunkFilter.ForRequest(ctx).RequiredLabelNames() { - by[k] = struct{}{} + if filterer != nil { + for _, k := range filterer.RequiredLabelNames() { + by[k] = struct{}{} + } } } for p.Next() { @@ -368,9 +370,6 @@ func (i *TSDBIndex) Volume( for k := range labelsToMatch { by[k] = struct{}{} } - for _, k := range targetLabels { - by[k] = struct{}{} - } // If we are aggregating by series, we need to include all labels in the series required for filtering chunks. if i.chunkFilter != nil { From b476d0545416c52f6717755bdc629126b143596d Mon Sep 17 00:00:00 2001 From: Trevor Whitney Date: Tue, 27 Aug 2024 12:09:40 -0600 Subject: [PATCH 4/5] fix: more nil checks --- .../shipper/indexshipper/tsdb/single_file_index.go | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/pkg/storage/stores/shipper/indexshipper/tsdb/single_file_index.go b/pkg/storage/stores/shipper/indexshipper/tsdb/single_file_index.go index 3eb2cbd5cb05b..a27f12382cc81 100644 --- a/pkg/storage/stores/shipper/indexshipper/tsdb/single_file_index.go +++ b/pkg/storage/stores/shipper/indexshipper/tsdb/single_file_index.go @@ -292,8 +292,7 @@ func (i *TSDBIndex) Stats(ctx context.Context, _ string, from, through model.Tim var filterer chunk.Filterer by := make(map[string]struct{}) if i.chunkFilter != nil { - filterer = i.chunkFilter.ForRequest(ctx) - if filterer != nil { + if filterer = i.chunkFilter.ForRequest(ctx); filterer != nil { for _, k := range filterer.RequiredLabelNames() { by[k] = struct{}{} } @@ -373,8 +372,10 @@ func (i *TSDBIndex) Volume( // If we are aggregating by series, we need to include all labels in the series required for filtering chunks. 
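// ForRequest can return a nil Filterer, so the result must be checked
// before calling RequiredLabelNames on it, as the hunk below does;
// invoking the method through a nil interface value would panic at
// query time.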
if i.chunkFilter != nil { - for _, k := range i.chunkFilter.ForRequest(ctx).RequiredLabelNames() { - by[k] = struct{}{} + if filterer := i.chunkFilter.ForRequest(ctx); filterer != nil { + for _, k := range filterer.RequiredLabelNames() { + by[k] = struct{}{} + } } } } From 6ab0a22b37154760cb70b42e84b696fd85ec6126 Mon Sep 17 00:00:00 2001 From: Trevor Whitney Date: Tue, 27 Aug 2024 15:49:43 -0600 Subject: [PATCH 5/5] chore: refactor --- pkg/ingester/instance_test.go | 24 +++++++-------- pkg/storage/store_test.go | 30 ++++++++----------- .../indexshipper/tsdb/single_file_index.go | 24 ++++++++------- 3 files changed, 37 insertions(+), 41 deletions(-) diff --git a/pkg/ingester/instance_test.go b/pkg/ingester/instance_test.go index 3e2a822d3eace..4086831bb33fb 100644 --- a/pkg/ingester/instance_test.go +++ b/pkg/ingester/instance_test.go @@ -1177,7 +1177,7 @@ func TestInstance_Volume(t *testing.T) { volumes, err := instance.GetVolume(context.Background(), &logproto.VolumeRequest{ From: 0, - Through: 1.1 * 1e3, // milliseconds + Through: 1.1 * 1e3, //milliseconds Matchers: "{}", Limit: 5, AggregateBy: seriesvolume.Series, @@ -1196,7 +1196,7 @@ func TestInstance_Volume(t *testing.T) { instance := prepareInstance(t) volumes, err := instance.GetVolume(context.Background(), &logproto.VolumeRequest{ From: 0, - Through: 1.1 * 1e3, // milliseconds + Through: 1.1 * 1e3, //milliseconds Matchers: `{log_stream="dispatcher"}`, Limit: 5, AggregateBy: seriesvolume.Series, @@ -1212,7 +1212,7 @@ func TestInstance_Volume(t *testing.T) { instance := prepareInstance(t) volumes, err := instance.GetVolume(context.Background(), &logproto.VolumeRequest{ From: 5, - Through: 1.1 * 1e3, // milliseconds + Through: 1.1 * 1e3, //milliseconds Matchers: "{}", Limit: 5, AggregateBy: seriesvolume.Series, @@ -1246,7 +1246,7 @@ func TestInstance_Volume(t *testing.T) { instance := prepareInstance(t) volumes, err := instance.GetVolume(context.Background(), &logproto.VolumeRequest{ From: 0, - Through: 1.1 * 1e3, // milliseconds + Through: 1.1 * 1e3, //milliseconds Matchers: `{}`, Limit: 5, TargetLabels: []string{"log_stream"}, @@ -1264,7 +1264,7 @@ func TestInstance_Volume(t *testing.T) { instance := prepareInstance(t) volumes, err := instance.GetVolume(context.Background(), &logproto.VolumeRequest{ From: 0, - Through: 1.1 * 1e3, // milliseconds + Through: 1.1 * 1e3, //milliseconds Matchers: `{log_stream="dispatcher"}`, Limit: 5, TargetLabels: []string{"host"}, @@ -1281,7 +1281,7 @@ func TestInstance_Volume(t *testing.T) { instance := prepareInstance(t) volumes, err := instance.GetVolume(context.Background(), &logproto.VolumeRequest{ From: 0, - Through: 1.1 * 1e3, // milliseconds + Through: 1.1 * 1e3, //milliseconds Matchers: `{log_stream=~".+"}`, Limit: 5, TargetLabels: []string{"host", "job"}, @@ -1301,7 +1301,7 @@ func TestInstance_Volume(t *testing.T) { instance := prepareInstance(t) volumes, err := instance.GetVolume(context.Background(), &logproto.VolumeRequest{ From: 0, - Through: 1.1 * 1e3, // milliseconds + Through: 1.1 * 1e3, //milliseconds Matchers: "{}", Limit: 5, AggregateBy: seriesvolume.Labels, @@ -1321,7 +1321,7 @@ func TestInstance_Volume(t *testing.T) { instance := prepareInstance(t) volumes, err := instance.GetVolume(context.Background(), &logproto.VolumeRequest{ From: 0, - Through: 1.1 * 1e3, // milliseconds + Through: 1.1 * 1e3, //milliseconds Matchers: `{log_stream="worker"}`, Limit: 5, AggregateBy: seriesvolume.Labels, @@ -1342,7 +1342,7 @@ func TestInstance_Volume(t *testing.T) { instance := 
prepareInstance(t) volumes, err := instance.GetVolume(context.Background(), &logproto.VolumeRequest{ From: 5, - Through: 1.1 * 1e3, // milliseconds + Through: 1.1 * 1e3, //milliseconds Matchers: "{}", Limit: 5, AggregateBy: seriesvolume.Labels, @@ -1377,7 +1377,7 @@ func TestInstance_Volume(t *testing.T) { instance := prepareInstance(t) volumes, err := instance.GetVolume(context.Background(), &logproto.VolumeRequest{ From: 0, - Through: 1.1 * 1e3, // milliseconds + Through: 1.1 * 1e3, //milliseconds Matchers: `{}`, Limit: 5, TargetLabels: []string{"host"}, @@ -1394,7 +1394,7 @@ func TestInstance_Volume(t *testing.T) { instance := prepareInstance(t) volumes, err := instance.GetVolume(context.Background(), &logproto.VolumeRequest{ From: 0, - Through: 1.1 * 1e3, // milliseconds + Through: 1.1 * 1e3, //milliseconds Matchers: `{log_stream="dispatcher"}`, Limit: 5, TargetLabels: []string{"host"}, @@ -1411,7 +1411,7 @@ func TestInstance_Volume(t *testing.T) { instance := prepareInstance(t) volumes, err := instance.GetVolume(context.Background(), &logproto.VolumeRequest{ From: 0, - Through: 1.1 * 1e3, // milliseconds + Through: 1.1 * 1e3, //milliseconds Matchers: `{log_stream=~".+"}`, Limit: 5, TargetLabels: []string{"host", "job"}, diff --git a/pkg/storage/store_test.go b/pkg/storage/store_test.go index d31501893dc45..101c906b8b4fe 100644 --- a/pkg/storage/store_test.go +++ b/pkg/storage/store_test.go @@ -219,8 +219,7 @@ func getLocalStore(path string, cm ClientMetrics) Store { PeriodicTableConfig: config.PeriodicTableConfig{ Prefix: "index_", Period: time.Hour * 168, - }, - }, + }}, RowShards: 16, }, }, @@ -926,6 +925,7 @@ func Test_PipelineWrapper(t *testing.T) { s.SetPipelineWrapper(wrapper) ctx = user.InjectOrgID(context.Background(), "test-user") logit, err := s.SelectLogs(ctx, logql.SelectLogParams{QueryRequest: newQuery("{foo=~\"ba.*\"}", from, from.Add(1*time.Hour), []astmapper.ShardAnnotation{{Shard: 1, Of: 5}}, nil)}) + if err != nil { t.Errorf("store.SelectLogs() error = %v", err) return @@ -956,6 +956,7 @@ func Test_PipelineWrapper_disabled(t *testing.T) { ctx = user.InjectOrgID(context.Background(), "test-user") ctx = httpreq.InjectHeader(ctx, httpreq.LokiDisablePipelineWrappersHeader, "true") logit, err := s.SelectLogs(ctx, logql.SelectLogParams{QueryRequest: newQuery("{foo=~\"ba.*\"}", from, from.Add(1*time.Hour), []astmapper.ShardAnnotation{{Shard: 1, Of: 5}}, nil)}) + if err != nil { t.Errorf("store.SelectLogs() error = %v", err) return @@ -1295,8 +1296,7 @@ func TestStore_indexPrefixChange(t *testing.T) { PeriodicTableConfig: config.PeriodicTableConfig{ Prefix: "index_", Period: time.Hour * 24, - }, - }, + }}, } schemaConfig := config.SchemaConfig{ @@ -1370,8 +1370,7 @@ func TestStore_indexPrefixChange(t *testing.T) { PeriodicTableConfig: config.PeriodicTableConfig{ Prefix: "index_tsdb_", Period: time.Hour * 24, - }, - }, + }}, RowShards: 2, } schemaConfig.Configs = append(schemaConfig.Configs, periodConfig2) @@ -1476,8 +1475,7 @@ func TestStore_MultiPeriod(t *testing.T) { PeriodicTableConfig: config.PeriodicTableConfig{ Prefix: "index_", Period: time.Hour * 24, - }, - }, + }}, } periodConfigV11 := config.PeriodConfig{ @@ -1489,8 +1487,7 @@ func TestStore_MultiPeriod(t *testing.T) { PeriodicTableConfig: config.PeriodicTableConfig{ Prefix: "index_", Period: time.Hour * 24, - }, - }, + }}, RowShards: 2, } @@ -1569,6 +1566,7 @@ func TestStore_MultiPeriod(t *testing.T) { } }) } + } func mustParseLabels(s string) []logproto.SeriesIdentifier_LabelsEntry { @@ -1835,8 +1833,7 @@ func 
TestStore_BoltdbTsdbSameIndexPrefix(t *testing.T) { PeriodicTableConfig: config.PeriodicTableConfig{ Prefix: "index_", Period: time.Hour * 24, - }, - }, + }}, RowShards: 2, }, { @@ -1849,8 +1846,7 @@ func TestStore_BoltdbTsdbSameIndexPrefix(t *testing.T) { PeriodicTableConfig: config.PeriodicTableConfig{ Prefix: "index_", Period: time.Hour * 24, - }, - }, + }}, }, }, } @@ -1988,8 +1984,7 @@ func TestStore_SyncStopInteraction(t *testing.T) { PeriodicTableConfig: config.PeriodicTableConfig{ Prefix: "index_", Period: time.Hour * 24, - }, - }, + }}, RowShards: 2, }, { @@ -2002,8 +1997,7 @@ func TestStore_SyncStopInteraction(t *testing.T) { PeriodicTableConfig: config.PeriodicTableConfig{ Prefix: "index_", Period: time.Hour * 24, - }, - }, + }}, }, }, } diff --git a/pkg/storage/stores/shipper/indexshipper/tsdb/single_file_index.go b/pkg/storage/stores/shipper/indexshipper/tsdb/single_file_index.go index a27f12382cc81..255425b286f22 100644 --- a/pkg/storage/stores/shipper/indexshipper/tsdb/single_file_index.go +++ b/pkg/storage/stores/shipper/indexshipper/tsdb/single_file_index.go @@ -74,6 +74,7 @@ func RebuildWithVersion(ctx context.Context, path string, desiredVer int) (shipp } return NewPrefixedIdentifier(id, parentDir, "") }) + if err != nil { return nil, err } @@ -194,6 +195,7 @@ func (i *TSDBIndex) ForSeries(ctx context.Context, _ string, fpFilter index.Fing } return p.Err() }) + } func (i *TSDBIndex) forPostings( @@ -218,6 +220,7 @@ func (i *TSDBIndex) GetChunkRefs(ctx context.Context, userID string, from, throu if err := i.ForSeries(ctx, "", fpFilter, from, through, func(ls labels.Labels, fp model.Fingerprint, chks []index.ChunkMeta) (stop bool) { for _, chk := range chks { + res = append(res, ChunkRef{ User: userID, // assumed to be the same, will be enforced by caller. Fingerprint: fp, @@ -292,12 +295,14 @@ func (i *TSDBIndex) Stats(ctx context.Context, _ string, from, through model.Tim var filterer chunk.Filterer by := make(map[string]struct{}) if i.chunkFilter != nil { - if filterer = i.chunkFilter.ForRequest(ctx); filterer != nil { + filterer = i.chunkFilter.ForRequest(ctx) + if filterer != nil { for _, k := range filterer.RequiredLabelNames() { by[k] = struct{}{} } } } + for p.Next() { fp, stats, err := i.reader.ChunkStats(p.At(), int64(from), int64(through), &ls, by) if err != nil { @@ -364,6 +369,10 @@ func (i *TSDBIndex) Volume( aggregateBySeries := seriesvolume.AggregateBySeries(aggregateBy) || aggregateBy == "" var by map[string]struct{} + var filterer chunk.Filterer + if i.chunkFilter != nil { + filterer = i.chunkFilter.ForRequest(ctx) + } if !includeAll && (aggregateBySeries || len(targetLabels) > 0) { by = make(map[string]struct{}, len(labelsToMatch)) for k := range labelsToMatch { @@ -371,22 +380,15 @@ func (i *TSDBIndex) Volume( } // If we are aggregating by series, we need to include all labels in the series required for filtering chunks. - if i.chunkFilter != nil { - if filterer := i.chunkFilter.ForRequest(ctx); filterer != nil { - for _, k := range filterer.RequiredLabelNames() { - by[k] = struct{}{} - } + if filterer != nil { + for _, k := range filterer.RequiredLabelNames() { + by[k] = struct{}{} } } } return i.forPostings(ctx, fpFilter, from, through, matchers, func(p index.Postings) error { var ls labels.Labels - var filterer chunk.Filterer - if i.chunkFilter != nil { - filterer = i.chunkFilter.ForRequest(ctx) - } - for p.Next() { fp, stats, err := i.reader.ChunkStats(p.At(), int64(from), int64(through), &ls, by) if err != nil {
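For reference, the label projection at the core of this series can be sketched standalone. The program below is illustrative Go, not Loki code (Label stands in for the Prometheus labels.Label type Loki uses); it shows the nil-versus-empty `by` contract the patches settle on and the slice-reuse pattern they apply to *lbls:

package main

import "fmt"

// Label stands in for the Prometheus labels.Label type used in Loki.
type Label struct{ Name, Value string }

// projectLabels keeps only the labels whose names appear in by, reusing the
// input slice's backing array the way prepSeriesBy reuses *lbls.
// A nil by means "keep everything"; an empty non-nil map keeps nothing.
func projectLabels(ls []Label, by map[string]struct{}) []Label {
	if by == nil {
		return ls
	}
	out := ls[:0]
	for _, l := range ls {
		if _, ok := by[l.Name]; ok {
			out = append(out, l)
		}
	}
	return out
}

func main() {
	series := []Label{{"foo", "bar"}, {"fizz", "buzz"}, {"log_stream", "dispatcher"}}
	by := map[string]struct{}{"log_stream": {}}
	fmt.Println(projectLabels(series, by)) // [{log_stream dispatcher}]
}

Using map[string]struct{} for `by` keeps membership tests allocation-free, and letting nil carry the "no projection" case avoids threading an extra boolean through ChunkStats.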