Skip to content

Commit

Permalink
fix: Propagate headers/warnings/stats from quantile downstreams (graf…
Browse files Browse the repository at this point in the history
  • Loading branch information
benclive authored and pascal-sochacki committed Aug 29, 2024
1 parent c2b53c9 commit 2ca7ed1
Show file tree
Hide file tree
Showing 2 changed files with 67 additions and 3 deletions.
47 changes: 45 additions & 2 deletions pkg/logql/accumulator.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,19 @@ func (a *BufferedAccumulator) Result() []logqlmodel.Result {

type QuantileSketchAccumulator struct {
matrix ProbabilisticQuantileMatrix

stats stats.Result // for accumulating statistics from downstream requests
headers map[string][]string // for accumulating headers from downstream requests
warnings map[string]struct{} // for accumulating warnings from downstream requests}
}

// newQuantileSketchAccumulator returns an accumulator for sharded
// probabilistic quantile queries that merges results as they come in.
func newQuantileSketchAccumulator() *QuantileSketchAccumulator {
return &QuantileSketchAccumulator{}
return &QuantileSketchAccumulator{
headers: make(map[string][]string),
warnings: make(map[string]struct{}),
}
}

func (a *QuantileSketchAccumulator) Accumulate(_ context.Context, res logqlmodel.Result, _ int) error {
Expand All @@ -57,6 +64,21 @@ func (a *QuantileSketchAccumulator) Accumulate(_ context.Context, res logqlmodel
if !ok {
return fmt.Errorf("unexpected matrix type: got (%T), want (ProbabilisticQuantileMatrix)", res.Data)
}

// TODO(owen-d/ewelch): Shard counts should be set by the querier
// so we don't have to do it in tricky ways in multiple places.
// See pkg/logql/downstream.go:DownstreamEvaluator.Downstream
// for another example.
if res.Statistics.Summary.Shards == 0 {
res.Statistics.Summary.Shards = 1
}
a.stats.Merge(res.Statistics)
metadata.ExtendHeaders(a.headers, res.Headers)

for _, w := range res.Warnings {
a.warnings[w] = struct{}{}
}

if a.matrix == nil {
a.matrix = data
return nil
Expand All @@ -68,7 +90,28 @@ func (a *QuantileSketchAccumulator) Accumulate(_ context.Context, res logqlmodel
}

func (a *QuantileSketchAccumulator) Result() []logqlmodel.Result {
return []logqlmodel.Result{{Data: a.matrix}}
headers := make([]*definitions.PrometheusResponseHeader, 0, len(a.headers))
for name, vals := range a.headers {
headers = append(
headers,
&definitions.PrometheusResponseHeader{
Name: name,
Values: vals,
},
)
}

warnings := maps.Keys(a.warnings)
sort.Strings(warnings)

return []logqlmodel.Result{
{
Data: a.matrix,
Headers: headers,
Warnings: warnings,
Statistics: a.stats,
},
}
}

// heap impl for keeping only the top n results across m streams
Expand Down
23 changes: 22 additions & 1 deletion pkg/logql/accumulator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ import (
"github.com/grafana/loki/v3/pkg/logproto"
"github.com/grafana/loki/v3/pkg/logql/sketch"
"github.com/grafana/loki/v3/pkg/logqlmodel"
"github.com/grafana/loki/v3/pkg/logqlmodel/stats"
"github.com/grafana/loki/v3/pkg/querier/queryrange/queryrangebase/definitions"
)

func TestAccumulatedStreams(t *testing.T) {
Expand Down Expand Up @@ -149,6 +151,22 @@ func TestDownstreamAccumulatorMultiMerge(t *testing.T) {
}
}

func TestQuantileSketchDownstreamAccumulatorSimple(t *testing.T) {
acc := newQuantileSketchAccumulator()
downstreamResult := newQuantileSketchResults()[0]

require.Nil(t, acc.Accumulate(context.Background(), downstreamResult, 0))

res := acc.Result()[0]
got, ok := res.Data.(ProbabilisticQuantileMatrix)
require.Equal(t, true, ok)
require.Equal(t, 10, len(got), "correct number of vectors")

require.Equal(t, res.Headers[0].Name, "HeaderA")
require.Equal(t, res.Warnings, []string{"warning"})
require.Equal(t, int64(33), res.Statistics.Summary.Shards)
}

func BenchmarkAccumulator(b *testing.B) {

// dummy params. Only need to populate direction & limit
Expand Down Expand Up @@ -218,6 +236,9 @@ func newStreamResults() []logqlmodel.Result {

func newQuantileSketchResults() []logqlmodel.Result {
results := make([]logqlmodel.Result, 100)
statistics := stats.Result{
Summary: stats.Summary{Shards: 33},
}

for r := range results {
vectors := make([]ProbabilisticQuantileVector, 10)
Expand All @@ -231,7 +252,7 @@ func newQuantileSketchResults() []logqlmodel.Result {
}
}
}
results[r] = logqlmodel.Result{Data: ProbabilisticQuantileMatrix(vectors)}
results[r] = logqlmodel.Result{Data: ProbabilisticQuantileMatrix(vectors), Headers: []*definitions.PrometheusResponseHeader{{Name: "HeaderA", Values: []string{"ValueA"}}}, Warnings: []string{"warning"}, Statistics: statistics}
}

return results
Expand Down

0 comments on commit 2ca7ed1

Please sign in to comment.