Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf: Avoid looking up unnecessary TSDB symbols during Volume API #13960

Merged
merged 6 commits into from
Aug 28, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 16 additions & 12 deletions pkg/ingester/instance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -658,6 +658,10 @@ func (t *testFilter) ShouldFilter(lbs labels.Labels) bool {
return lbs.Get("log_stream") == "dispatcher"
}

// RequiredLabelNames reports the label names this test filter consults in
// ShouldFilter, so callers can skip resolving labels it never reads.
func (t *testFilter) RequiredLabelNames() []string {
	required := []string{"log_stream"}
	return required
}

func Test_ChunkFilter(t *testing.T) {
instance := defaultInstance(t)
instance.chunkFilter = &testFilter{}
Expand Down Expand Up @@ -1173,7 +1177,7 @@ func TestInstance_Volume(t *testing.T) {

volumes, err := instance.GetVolume(context.Background(), &logproto.VolumeRequest{
From: 0,
Through: 1.1 * 1e3, //milliseconds
Through: 1.1 * 1e3, // milliseconds
Matchers: "{}",
Limit: 5,
AggregateBy: seriesvolume.Series,
Expand All @@ -1192,7 +1196,7 @@ func TestInstance_Volume(t *testing.T) {
instance := prepareInstance(t)
volumes, err := instance.GetVolume(context.Background(), &logproto.VolumeRequest{
From: 0,
Through: 1.1 * 1e3, //milliseconds
Through: 1.1 * 1e3, // milliseconds
Matchers: `{log_stream="dispatcher"}`,
Limit: 5,
AggregateBy: seriesvolume.Series,
Expand All @@ -1208,7 +1212,7 @@ func TestInstance_Volume(t *testing.T) {
instance := prepareInstance(t)
volumes, err := instance.GetVolume(context.Background(), &logproto.VolumeRequest{
From: 5,
Through: 1.1 * 1e3, //milliseconds
Through: 1.1 * 1e3, // milliseconds
Matchers: "{}",
Limit: 5,
AggregateBy: seriesvolume.Series,
Expand Down Expand Up @@ -1242,7 +1246,7 @@ func TestInstance_Volume(t *testing.T) {
instance := prepareInstance(t)
volumes, err := instance.GetVolume(context.Background(), &logproto.VolumeRequest{
From: 0,
Through: 1.1 * 1e3, //milliseconds
Through: 1.1 * 1e3, // milliseconds
Matchers: `{}`,
Limit: 5,
TargetLabels: []string{"log_stream"},
Expand All @@ -1260,7 +1264,7 @@ func TestInstance_Volume(t *testing.T) {
instance := prepareInstance(t)
volumes, err := instance.GetVolume(context.Background(), &logproto.VolumeRequest{
From: 0,
Through: 1.1 * 1e3, //milliseconds
Through: 1.1 * 1e3, // milliseconds
Matchers: `{log_stream="dispatcher"}`,
Limit: 5,
TargetLabels: []string{"host"},
Expand All @@ -1277,7 +1281,7 @@ func TestInstance_Volume(t *testing.T) {
instance := prepareInstance(t)
volumes, err := instance.GetVolume(context.Background(), &logproto.VolumeRequest{
From: 0,
Through: 1.1 * 1e3, //milliseconds
Through: 1.1 * 1e3, // milliseconds
Matchers: `{log_stream=~".+"}`,
Limit: 5,
TargetLabels: []string{"host", "job"},
Expand All @@ -1297,7 +1301,7 @@ func TestInstance_Volume(t *testing.T) {
instance := prepareInstance(t)
volumes, err := instance.GetVolume(context.Background(), &logproto.VolumeRequest{
From: 0,
Through: 1.1 * 1e3, //milliseconds
Through: 1.1 * 1e3, // milliseconds
Matchers: "{}",
Limit: 5,
AggregateBy: seriesvolume.Labels,
Expand All @@ -1317,7 +1321,7 @@ func TestInstance_Volume(t *testing.T) {
instance := prepareInstance(t)
volumes, err := instance.GetVolume(context.Background(), &logproto.VolumeRequest{
From: 0,
Through: 1.1 * 1e3, //milliseconds
Through: 1.1 * 1e3, // milliseconds
Matchers: `{log_stream="worker"}`,
Limit: 5,
AggregateBy: seriesvolume.Labels,
Expand All @@ -1338,7 +1342,7 @@ func TestInstance_Volume(t *testing.T) {
instance := prepareInstance(t)
volumes, err := instance.GetVolume(context.Background(), &logproto.VolumeRequest{
From: 5,
Through: 1.1 * 1e3, //milliseconds
Through: 1.1 * 1e3, // milliseconds
Matchers: "{}",
Limit: 5,
AggregateBy: seriesvolume.Labels,
Expand Down Expand Up @@ -1373,7 +1377,7 @@ func TestInstance_Volume(t *testing.T) {
instance := prepareInstance(t)
volumes, err := instance.GetVolume(context.Background(), &logproto.VolumeRequest{
From: 0,
Through: 1.1 * 1e3, //milliseconds
Through: 1.1 * 1e3, // milliseconds
Matchers: `{}`,
Limit: 5,
TargetLabels: []string{"host"},
Expand All @@ -1390,7 +1394,7 @@ func TestInstance_Volume(t *testing.T) {
instance := prepareInstance(t)
volumes, err := instance.GetVolume(context.Background(), &logproto.VolumeRequest{
From: 0,
Through: 1.1 * 1e3, //milliseconds
Through: 1.1 * 1e3, // milliseconds
Matchers: `{log_stream="dispatcher"}`,
Limit: 5,
TargetLabels: []string{"host"},
Expand All @@ -1407,7 +1411,7 @@ func TestInstance_Volume(t *testing.T) {
instance := prepareInstance(t)
volumes, err := instance.GetVolume(context.Background(), &logproto.VolumeRequest{
From: 0,
Through: 1.1 * 1e3, //milliseconds
Through: 1.1 * 1e3, // milliseconds
Matchers: `{log_stream=~".+"}`,
Limit: 5,
TargetLabels: []string{"host", "job"},
Expand Down
1 change: 1 addition & 0 deletions pkg/storage/chunk/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,4 +67,5 @@ type RequestChunkFilterer interface {
// Filterer filters chunks based on the metric.
type Filterer interface {
	// ShouldFilter reports whether chunks carrying the given label set
	// should be excluded from results.
	ShouldFilter(metric labels.Labels) bool
	// RequiredLabelNames returns the label names this filter needs in order
	// to evaluate ShouldFilter, allowing callers to avoid looking up label
	// values the filter never inspects.
	RequiredLabelNames() []string
}
34 changes: 22 additions & 12 deletions pkg/storage/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,8 @@ func getLocalStore(path string, cm ClientMetrics) Store {
PeriodicTableConfig: config.PeriodicTableConfig{
Prefix: "index_",
Period: time.Hour * 168,
}},
},
},
RowShards: 16,
},
},
Expand Down Expand Up @@ -866,6 +867,10 @@ func (f fakeChunkFilterer) ShouldFilter(metric labels.Labels) bool {
return metric.Get("foo") == "bazz"
}

// RequiredLabelNames reports the label names this fake filterer consults in
// ShouldFilter, so callers can skip resolving labels it never reads.
func (f fakeChunkFilterer) RequiredLabelNames() []string {
	required := []string{"foo"}
	return required
}

func Test_ChunkFilterer(t *testing.T) {
s := &LokiStore{
Store: storeFixture,
Expand Down Expand Up @@ -921,7 +926,6 @@ func Test_PipelineWrapper(t *testing.T) {
s.SetPipelineWrapper(wrapper)
ctx = user.InjectOrgID(context.Background(), "test-user")
logit, err := s.SelectLogs(ctx, logql.SelectLogParams{QueryRequest: newQuery("{foo=~\"ba.*\"}", from, from.Add(1*time.Hour), []astmapper.ShardAnnotation{{Shard: 1, Of: 5}}, nil)})

if err != nil {
t.Errorf("store.SelectLogs() error = %v", err)
return
Expand Down Expand Up @@ -952,7 +956,6 @@ func Test_PipelineWrapper_disabled(t *testing.T) {
ctx = user.InjectOrgID(context.Background(), "test-user")
ctx = httpreq.InjectHeader(ctx, httpreq.LokiDisablePipelineWrappersHeader, "true")
logit, err := s.SelectLogs(ctx, logql.SelectLogParams{QueryRequest: newQuery("{foo=~\"ba.*\"}", from, from.Add(1*time.Hour), []astmapper.ShardAnnotation{{Shard: 1, Of: 5}}, nil)})

if err != nil {
t.Errorf("store.SelectLogs() error = %v", err)
return
Expand Down Expand Up @@ -1292,7 +1295,8 @@ func TestStore_indexPrefixChange(t *testing.T) {
PeriodicTableConfig: config.PeriodicTableConfig{
Prefix: "index_",
Period: time.Hour * 24,
}},
},
},
}

schemaConfig := config.SchemaConfig{
Expand Down Expand Up @@ -1366,7 +1370,8 @@ func TestStore_indexPrefixChange(t *testing.T) {
PeriodicTableConfig: config.PeriodicTableConfig{
Prefix: "index_tsdb_",
Period: time.Hour * 24,
}},
},
},
RowShards: 2,
}
schemaConfig.Configs = append(schemaConfig.Configs, periodConfig2)
Expand Down Expand Up @@ -1471,7 +1476,8 @@ func TestStore_MultiPeriod(t *testing.T) {
PeriodicTableConfig: config.PeriodicTableConfig{
Prefix: "index_",
Period: time.Hour * 24,
}},
},
},
}

periodConfigV11 := config.PeriodConfig{
Expand All @@ -1483,7 +1489,8 @@ func TestStore_MultiPeriod(t *testing.T) {
PeriodicTableConfig: config.PeriodicTableConfig{
Prefix: "index_",
Period: time.Hour * 24,
}},
},
},
RowShards: 2,
}

Expand Down Expand Up @@ -1562,7 +1569,6 @@ func TestStore_MultiPeriod(t *testing.T) {
}
})
}

}

func mustParseLabels(s string) []logproto.SeriesIdentifier_LabelsEntry {
Expand Down Expand Up @@ -1829,7 +1835,8 @@ func TestStore_BoltdbTsdbSameIndexPrefix(t *testing.T) {
PeriodicTableConfig: config.PeriodicTableConfig{
Prefix: "index_",
Period: time.Hour * 24,
}},
},
},
RowShards: 2,
},
{
Expand All @@ -1842,7 +1849,8 @@ func TestStore_BoltdbTsdbSameIndexPrefix(t *testing.T) {
PeriodicTableConfig: config.PeriodicTableConfig{
Prefix: "index_",
Period: time.Hour * 24,
}},
},
},
},
},
}
Expand Down Expand Up @@ -1980,7 +1988,8 @@ func TestStore_SyncStopInteraction(t *testing.T) {
PeriodicTableConfig: config.PeriodicTableConfig{
Prefix: "index_",
Period: time.Hour * 24,
}},
},
},
RowShards: 2,
},
{
Expand All @@ -1993,7 +2002,8 @@ func TestStore_SyncStopInteraction(t *testing.T) {
PeriodicTableConfig: config.PeriodicTableConfig{
Prefix: "index_",
Period: time.Hour * 24,
}},
},
},
},
},
}
Expand Down
13 changes: 11 additions & 2 deletions pkg/storage/stores/shipper/indexshipper/tsdb/head_read.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,14 +146,23 @@ func (h *headIndexReader) Series(ref storage.SeriesRef, from int64, through int6
return s.fp, nil
}

func (h *headIndexReader) ChunkStats(ref storage.SeriesRef, from, through int64, lbls *labels.Labels) (uint64, index.ChunkStats, error) {
func (h *headIndexReader) ChunkStats(ref storage.SeriesRef, from, through int64, lbls *labels.Labels, by map[string]struct{}) (uint64, index.ChunkStats, error) {
s := h.head.series.getByID(uint64(ref))

if s == nil {
h.head.metrics.seriesNotFound.Inc()
return 0, index.ChunkStats{}, storage.ErrNotFound
}
*lbls = append((*lbls)[:0], s.ls...)
if len(by) == 0 {
*lbls = append((*lbls)[:0], s.ls...)
} else {
*lbls = (*lbls)[:0]
for _, l := range s.ls {
if _, ok := by[l.Name]; ok {
*lbls = append(*lbls, l)
}
}
}

queryBounds := newBounds(model.Time(from), model.Time(through))

Expand Down
50 changes: 46 additions & 4 deletions pkg/storage/stores/shipper/indexshipper/tsdb/index/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -1832,7 +1832,7 @@ func (r *Reader) Series(id storage.SeriesRef, from int64, through int64, lbls *l
return fprint, nil
}

func (r *Reader) ChunkStats(id storage.SeriesRef, from, through int64, lbls *labels.Labels) (uint64, ChunkStats, error) {
func (r *Reader) ChunkStats(id storage.SeriesRef, from, through int64, lbls *labels.Labels, by map[string]struct{}) (uint64, ChunkStats, error) {
offset := id
// In version 2+ series IDs are no longer exact references but series are 16-byte padded
// and the ID is the multiple of 16 of the actual position.
Expand All @@ -1844,7 +1844,7 @@ func (r *Reader) ChunkStats(id storage.SeriesRef, from, through int64, lbls *lab
return 0, ChunkStats{}, d.Err()
}

return r.dec.ChunkStats(r.version, d.Get(), id, from, through, lbls)
return r.dec.ChunkStats(r.version, d.Get(), id, from, through, lbls, by)
}

func (r *Reader) Postings(name string, fpFilter FingerprintFilter, values ...string) (Postings, error) {
Expand Down Expand Up @@ -2216,11 +2216,53 @@ func (dec *Decoder) prepSeries(b []byte, lbls *labels.Labels, chks *[]ChunkMeta)
if d.Err() != nil {
return nil, 0, errors.Wrap(d.Err(), "read series label offsets")
}
// todo(cyriltovena): we could cache this by user requests spanning multiple prepSeries calls.
ln, err := dec.LookupSymbol(lno)
if err != nil {
return nil, 0, errors.Wrap(err, "lookup label name")
}
lv, err := dec.LookupSymbol(lvo)
if err != nil {
return nil, 0, errors.Wrap(err, "lookup label value")
}

*lbls = append(*lbls, labels.Label{Name: ln, Value: lv})
}
return &d, fprint, nil
}

// prepSeriesBy returns series labels and chunks for a series and only returning selected `by` label names.
// If `by` is empty, it returns all labels for the series.
func (dec *Decoder) prepSeriesBy(b []byte, lbls *labels.Labels, chks *[]ChunkMeta, by map[string]struct{}) (*encoding.Decbuf, uint64, error) {
if by == nil {
return dec.prepSeries(b, lbls, chks)
}
*lbls = (*lbls)[:0]
if chks != nil {
*chks = (*chks)[:0]
}

d := encoding.DecWrap(tsdb_enc.Decbuf{B: b})

fprint := d.Be64()
k := d.Uvarint()

for i := 0; i < k; i++ {
lno := uint32(d.Uvarint())
lvo := uint32(d.Uvarint())

if d.Err() != nil {
return nil, 0, errors.Wrap(d.Err(), "read series label offsets")
}
// todo(cyriltovena): we could cache this by user requests spanning multiple prepSeries calls.
ln, err := dec.LookupSymbol(lno)
if err != nil {
return nil, 0, errors.Wrap(err, "lookup label name")
}
if _, ok := by[ln]; !ok {
continue
}

lv, err := dec.LookupSymbol(lvo)
if err != nil {
return nil, 0, errors.Wrap(err, "lookup label value")
Expand All @@ -2231,8 +2273,8 @@ func (dec *Decoder) prepSeries(b []byte, lbls *labels.Labels, chks *[]ChunkMeta)
return &d, fprint, nil
}

func (dec *Decoder) ChunkStats(version int, b []byte, seriesRef storage.SeriesRef, from, through int64, lbls *labels.Labels) (uint64, ChunkStats, error) {
d, fp, err := dec.prepSeries(b, lbls, nil)
func (dec *Decoder) ChunkStats(version int, b []byte, seriesRef storage.SeriesRef, from, through int64, lbls *labels.Labels, by map[string]struct{}) (uint64, ChunkStats, error) {
d, fp, err := dec.prepSeriesBy(b, lbls, nil, by)
if err != nil {
return 0, ChunkStats{}, err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/stores/shipper/indexshipper/tsdb/querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ type IndexReader interface {
Series(ref storage.SeriesRef, from int64, through int64, lset *labels.Labels, chks *[]index.ChunkMeta) (uint64, error)

// ChunkStats returns the stats for the chunks in the given series.
ChunkStats(ref storage.SeriesRef, from, through int64, lset *labels.Labels) (uint64, index.ChunkStats, error)
ChunkStats(ref storage.SeriesRef, from, through int64, lset *labels.Labels, by map[string]struct{}) (uint64, index.ChunkStats, error)

// LabelNames returns all the unique label names present in the index in sorted order.
LabelNames(matchers ...*labels.Matcher) ([]string, error)
Expand Down
Loading
Loading