From 258765772e7815d07849c70f8c774a0902079145 Mon Sep 17 00:00:00 2001 From: Trevor Whitney Date: Fri, 24 May 2024 17:35:15 -0600 Subject: [PATCH] fix: grouping --- cmd/loki/loki-local-config.yaml | 2 + pkg/pattern/ingester.go | 4 +- pkg/pattern/instance.go | 13 +--- pkg/pattern/metric/chunk.go | 19 ++++- pkg/pattern/metric/chunk_test.go | 110 +++++++++++++++++++++++++++ pkg/pattern/metric/evaluator.go | 22 +++--- pkg/pattern/metric/evaluator_test.go | 4 - pkg/pattern/stream.go | 73 +++++------------- 8 files changed, 163 insertions(+), 84 deletions(-) diff --git a/cmd/loki/loki-local-config.yaml b/cmd/loki/loki-local-config.yaml index 5f717a3d6a81c..913b9a4c18731 100644 --- a/cmd/loki/loki-local-config.yaml +++ b/cmd/loki/loki-local-config.yaml @@ -35,6 +35,8 @@ schema_config: pattern_ingester: enabled: true + metric_aggregation: + enabled: true ruler: alertmanager_url: http://localhost:9093 diff --git a/pkg/pattern/ingester.go b/pkg/pattern/ingester.go index a3d6c4ef67a0f..5d3da8ce1227c 100644 --- a/pkg/pattern/ingester.go +++ b/pkg/pattern/ingester.go @@ -48,11 +48,11 @@ type Config struct { func (cfg *Config) RegisterFlags(fs *flag.FlagSet) { cfg.LifecyclerConfig.RegisterFlagsWithPrefix("pattern-ingester.", fs, util_log.Logger) cfg.ClientConfig.RegisterFlags(fs) + cfg.MetricAggregation.RegisterFlagsWithPrefix(fs, "pattern-ingester.") + fs.BoolVar(&cfg.Enabled, "pattern-ingester.enabled", false, "Flag to enable or disable the usage of the pattern-ingester component.") fs.IntVar(&cfg.ConcurrentFlushes, "pattern-ingester.concurrent-flushes", 32, "How many flushes can happen concurrently from each stream.") fs.DurationVar(&cfg.FlushCheckPeriod, "pattern-ingester.flush-check-period", 30*time.Second, "How often should the ingester see if there are any blocks to flush. The first flush check is delayed by a random time up to 0.8x the flush check period. 
Additionally, there is +/- 1% jitter added to the interval.") - - cfg.MetricAggregation.RegisterFlagsWithPrefix(fs, "pattern-ingester.") } func (cfg *Config) Validate() error { diff --git a/pkg/pattern/instance.go b/pkg/pattern/instance.go index d98b412a85e24..c5b724e0f6fc3 100644 --- a/pkg/pattern/instance.go +++ b/pkg/pattern/instance.go @@ -125,24 +125,13 @@ func (i *instance) QuerySample( return nil, err } - typ, err := metric.ExtractMetricType(expr) - if err != nil || typ == metric.Unsupported { - return nil, err - } - var iters []iter.Iterator err = i.forMatchingStreams( selector.Matchers(), func(stream *stream) error { var iter iter.Iterator var err error - if typ == metric.Bytes { - iter, err = stream.BytesIterator(ctx, expr, from, through, step) - } else if typ == metric.Count { - iter, err = stream.CountIterator(ctx, expr, from, through, step) - } else { - return fmt.Errorf("unsupported query operation") - } + iter, err = stream.SampleIterator(ctx, expr, from, through, step) if err != nil { return err diff --git a/pkg/pattern/metric/chunk.go b/pkg/pattern/metric/chunk.go index c4716ba532dd1..cbc688461849d 100644 --- a/pkg/pattern/metric/chunk.go +++ b/pkg/pattern/metric/chunk.go @@ -7,6 +7,7 @@ import ( "time" "github.com/grafana/loki/v3/pkg/logproto" + "github.com/grafana/loki/v3/pkg/logql/syntax" "github.com/grafana/loki/v3/pkg/pattern/chunk" "github.com/grafana/loki/v3/pkg/pattern/iter" "github.com/prometheus/common/model" @@ -51,12 +52,23 @@ func (c *Chunks) Observe(bytes, count uint64, ts model.Time) { func (c *Chunks) Iterator( ctx context.Context, typ MetricType, + grouping *syntax.Grouping, from, through, step model.Time, ) (iter.Iterator, error) { if typ == Unsupported { return nil, fmt.Errorf("unsupported metric type") } + lbls := c.labels + if grouping != nil { + sort.Strings(grouping.Groups) + lbls = make(labels.Labels, 0, len(grouping.Groups)) + for _, group := range grouping.Groups { + value := c.labels.Get(group) + lbls = append(lbls, labels.Label{Name: group, Value: value}) + } + } + iters := make([]iter.Iterator, 0, len(c.chunks)) for _, chunk := range c.chunks { samples, err := chunk.ForRangeAndType(typ, from, through, step) @@ -68,9 +80,10 @@ func (c *Chunks) Iterator( continue } - iters = append(iters, iter.NewLabelsSlice(c.labels, samples)) + iters = append(iters, iter.NewLabelsSlice(lbls, samples)) } - return iter.NewNonOverlappingLabelsIterator(c.labels, iters), nil + + return iter.NewNonOverlappingLabelsIterator(lbls, iters), nil } // TODO(twhitney): These values should be float64s (to match prometheus samples) or int64s (to match pattern samples) @@ -127,7 +140,7 @@ func (c *Chunk) spaceFor(ts model.Time) bool { return ts.Sub(c.Samples[0].Timestamp) < chunk.MaxChunkTime } -//TODO(twhitney): any way to remove the duplication between this and the drain chunk ForRange method? +// TODO(twhitney): any way to remove the duplication between this and the drain chunk ForRange method? // ForRangeAndType returns samples with only the values // in the given range [start:end] and aggregates them by step duration. // start and end are in milliseconds since epoch. step is a duration in milliseconds. 
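For reference, the grouping support added to Chunks.Iterator above amounts to projecting the chunk's stream labels onto the requested group names before the per-chunk iterators are built. The standalone Go sketch below illustrates that projection with the prometheus labels package; the groupLabels helper and the main function are illustrative only and are not part of this patch.

package main

import (
	"fmt"
	"sort"

	"github.com/prometheus/prometheus/model/labels"
)

// groupLabels is an illustrative helper (not code from this patch): it mirrors
// the projection done inside Chunks.Iterator when a `by (...)` grouping is
// present. Group names are sorted in place, as the patch does with
// grouping.Groups, and a group the stream does not carry is kept with an
// empty value.
func groupLabels(series labels.Labels, groups []string) labels.Labels {
	sort.Strings(groups)
	out := make(labels.Labels, 0, len(groups))
	for _, g := range groups {
		out = append(out, labels.Label{Name: g, Value: series.Get(g)})
	}
	return out
}

func main() {
	series := labels.Labels{
		{Name: "container", Value: "jar"},
		{Name: "foo", Value: "bar"},
	}

	// sum by (container) (...): only the grouped label survives.
	fmt.Println(groupLabels(series, []string{"container"})) // {container="jar"}

	// Grouping by a label the stream does not have yields an empty value.
	fmt.Println(groupLabels(series, []string{"missing"})) // {missing=""}
}

Sorting the group names first gives the projected label set a stable order, and keeping a missing group with an empty value is the behaviour the new chunk_test.go cases below assert.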
diff --git a/pkg/pattern/metric/chunk_test.go b/pkg/pattern/metric/chunk_test.go index 32b746b0d20d0..4ae6a05753584 100644 --- a/pkg/pattern/metric/chunk_test.go +++ b/pkg/pattern/metric/chunk_test.go @@ -1,11 +1,15 @@ package metric import ( + "context" "reflect" "testing" "github.com/grafana/loki/v3/pkg/logproto" + "github.com/grafana/loki/v3/pkg/logql/syntax" + "github.com/grafana/loki/v3/pkg/pattern/iter" "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/model/labels" "github.com/stretchr/testify/require" ) @@ -327,3 +331,109 @@ func TestForRangeAndType(t *testing.T) { }) } } + +func Test_Chunks_Iterator(t *testing.T) { + ctx := context.Background() + lbls := labels.Labels{ + labels.Label{Name: "foo", Value: "bar"}, + labels.Label{Name: "container", Value: "jar"}, + } + chunks := Chunks{ + chunks: []Chunk{ + { + Samples: []MetricSample{ + {Timestamp: 2, Bytes: 2, Count: 1}, + {Timestamp: 4, Bytes: 4, Count: 3}, + {Timestamp: 6, Bytes: 6, Count: 5}, + }, + mint: 2, + maxt: 6, + }, + }, + labels: lbls, + } + + t.Run("without grouping", func(t *testing.T) { + it, err := chunks.Iterator(ctx, Bytes, nil, 0, 10, 2) + require.NoError(t, err) + + res, err := iter.ReadAllWithLabels(it) + require.NoError(t, err) + + require.Equal(t, 1, len(res.Series)) + require.Equal(t, lbls.String(), res.Series[0].GetLabels()) + + it, err = chunks.Iterator(ctx, Count, nil, 0, 10, 2) + require.NoError(t, err) + + res, err = iter.ReadAllWithLabels(it) + require.NoError(t, err) + + require.Equal(t, 1, len(res.Series)) + require.Equal(t, lbls.String(), res.Series[0].GetLabels()) + }) + + t.Run("grouping", func(t *testing.T) { + grouping := &syntax.Grouping{ + Groups: []string{"container"}, + Without: false, + } + + expectedLabels := labels.Labels{ + labels.Label{ + Name: "container", + Value: "jar", + }, + } + + it, err := chunks.Iterator(ctx, Bytes, grouping, 0, 10, 2) + require.NoError(t, err) + + res, err := iter.ReadAllWithLabels(it) + require.NoError(t, err) + + require.Equal(t, 1, len(res.Series)) + require.Equal(t, expectedLabels.String(), res.Series[0].GetLabels()) + + it, err = chunks.Iterator(ctx, Count, grouping, 0, 10, 2) + require.NoError(t, err) + + res, err = iter.ReadAllWithLabels(it) + require.NoError(t, err) + + require.Equal(t, 1, len(res.Series)) + require.Equal(t, expectedLabels.String(), res.Series[0].GetLabels()) + }) + + t.Run("grouping by a missing label", func(t *testing.T) { + grouping := &syntax.Grouping{ + Groups: []string{"missing"}, + Without: false, + } + + expectedLabels := labels.Labels{ + labels.Label{ + Name: "missing", + Value: "", + }, + } + + it, err := chunks.Iterator(ctx, Bytes, grouping, 0, 10, 2) + require.NoError(t, err) + + res, err := iter.ReadAllWithLabels(it) + require.NoError(t, err) + + require.Equal(t, 1, len(res.Series)) + require.Equal(t, expectedLabels.String(), res.Series[0].GetLabels()) + + it, err = chunks.Iterator(ctx, Count, grouping, 0, 10, 2) + require.NoError(t, err) + + res, err = iter.ReadAllWithLabels(it) + require.NoError(t, err) + + require.Equal(t, 1, len(res.Series)) + require.Equal(t, expectedLabels.String(), res.Series[0].GetLabels()) + }) +} diff --git a/pkg/pattern/metric/evaluator.go b/pkg/pattern/metric/evaluator.go index 86a4c83870b97..e4c7368a959eb 100644 --- a/pkg/pattern/metric/evaluator.go +++ b/pkg/pattern/metric/evaluator.go @@ -19,7 +19,7 @@ import ( ) // TODO(twhitney): duplication with code in NewStepEvaluator -func ExtractMetricType(expr syntax.SampleExpr) 
(MetricType, error) { +func extractMetricType(expr syntax.SampleExpr) (MetricType, error) { var typ MetricType switch e := expr.(type) { case *syntax.VectorAggregationExpr: @@ -57,7 +57,6 @@ type SampleEvaluatorFactory interface { ctx context.Context, nextEvaluatorFactory SampleEvaluatorFactory, expr syntax.SampleExpr, - typ MetricType, from, through, step model.Time, ) (logql.StepEvaluator, error) } @@ -66,7 +65,6 @@ type SampleEvaluatorFunc func( ctx context.Context, nextEvaluatorFactory SampleEvaluatorFactory, expr syntax.SampleExpr, - typ MetricType, from, through, step model.Time, ) (logql.StepEvaluator, error) @@ -74,10 +72,9 @@ func (s SampleEvaluatorFunc) NewStepEvaluator( ctx context.Context, nextEvaluatorFactory SampleEvaluatorFactory, expr syntax.SampleExpr, - typ MetricType, from, through, step model.Time, ) (logql.StepEvaluator, error) { - return s(ctx, nextEvaluatorFactory, expr, typ, from, through, step) + return s(ctx, nextEvaluatorFactory, expr, from, through, step) } type DefaultEvaluatorFactory struct { @@ -94,9 +91,13 @@ func (ev *DefaultEvaluatorFactory) NewStepEvaluator( ctx context.Context, evFactory SampleEvaluatorFactory, expr syntax.SampleExpr, - typ MetricType, from, through, step model.Time, ) (logql.StepEvaluator, error) { + metricType, err := extractMetricType(expr) + if err != nil || metricType == Unsupported { + return nil, err + } + switch e := expr.(type) { case *syntax.VectorAggregationExpr: if rangExpr, ok := e.Left.(*syntax.RangeAggregationExpr); ok && e.Operation == syntax.OpTypeSum { @@ -106,12 +107,11 @@ func (ev *DefaultEvaluatorFactory) NewStepEvaluator( func(ctx context.Context, _ SampleEvaluatorFactory, _ syntax.SampleExpr, - typ MetricType, from, through, step model.Time, ) (logql.StepEvaluator, error) { fromWithRangeAndOffset := from.Add(-rangExpr.Left.Interval).Add(-rangExpr.Left.Offset) throughWithOffset := through.Add(-rangExpr.Left.Offset) - it, err := ev.chunks.Iterator(ctx, typ, fromWithRangeAndOffset, throughWithOffset, step) + it, err := ev.chunks.Iterator(ctx, metricType, e.Grouping, fromWithRangeAndOffset, throughWithOffset, step) if err != nil { return nil, err } @@ -129,7 +129,7 @@ func (ev *DefaultEvaluatorFactory) NewStepEvaluator( if e.Grouping == nil { return nil, errors.Errorf("aggregation operator '%q' without grouping", e.Operation) } - nextEvaluator, err := evFactory.NewStepEvaluator(ctx, evFactory, e.Left, typ, from, through, step) + nextEvaluator, err := evFactory.NewStepEvaluator(ctx, evFactory, e.Left, from, through, step) if err != nil { return nil, err } @@ -145,7 +145,7 @@ func (ev *DefaultEvaluatorFactory) NewStepEvaluator( case *syntax.RangeAggregationExpr: fromWithRangeAndOffset := from.Add(-e.Left.Interval).Add(-e.Left.Offset) throughWithOffset := through.Add(-e.Left.Offset) - it, err := ev.chunks.Iterator(ctx, typ, fromWithRangeAndOffset, throughWithOffset, step) + it, err := ev.chunks.Iterator(ctx, metricType, e.Grouping, fromWithRangeAndOffset, throughWithOffset, step) if err != nil { return nil, err } @@ -250,6 +250,8 @@ type SeriesToSampleIterator struct { lbls labels.Labels } +// TODO: could this me a matrix iterator that returned multiple samples with +// different labels for the same timestamp? 
func NewSeriesToSampleIterator(series *promql.Series) *SeriesToSampleIterator { return &SeriesToSampleIterator{ floats: series.Floats, diff --git a/pkg/pattern/metric/evaluator_test.go b/pkg/pattern/metric/evaluator_test.go index 9f23cb5546e6f..62440a354a64f 100644 --- a/pkg/pattern/metric/evaluator_test.go +++ b/pkg/pattern/metric/evaluator_test.go @@ -30,14 +30,10 @@ func Test_SampleEvaluator(t *testing.T) { expr, err := syntax.ParseSampleExpr(query) require.NoError(t, err) - typ, err := ExtractMetricType(expr) - require.NoError(t, err) - evaluator, err := factory.NewStepEvaluator( context.Background(), factory, expr.(syntax.SampleExpr), - typ, model.Time(now-fiveMin), model.Time(now), model.Time(fiveMin), ) diff --git a/pkg/pattern/stream.go b/pkg/pattern/stream.go index 0c08790dd7087..de3538d7b05b7 100644 --- a/pkg/pattern/stream.go +++ b/pkg/pattern/stream.go @@ -2,6 +2,7 @@ package pattern import ( "context" + "errors" "math" "sync" "time" @@ -105,8 +106,7 @@ func (s *stream) Iterator(_ context.Context, from, through, step model.Time) (it return iter.NewMerge(iters...), nil } -// TODO(twhitney): duplication between bytes and count iterators -func (s *stream) BytesIterator( +func (s *stream) SampleIterator( ctx context.Context, expr syntax.SampleExpr, from, through, step model.Time, @@ -118,7 +118,6 @@ func (s *stream) BytesIterator( ctx, s.evaluator, expr, - metric.Bytes, from, through, step, @@ -150,6 +149,7 @@ func (s *stream) BytesIterator( return metric.NewSeriesToSampleIterator(series), nil } +//TODO: should this join multiple series into a matrix, so we don't have the weird hack? func (s *stream) JoinSampleVector( next bool, ts int64, @@ -163,11 +163,6 @@ func (s *stream) JoinSampleVector( stepCount = 1 } - series := &promql.Series{ - Metric: s.labels, - Floats: make([]promql.FPoint, 0, stepCount), - } - vec := promql.Vector{} if next { vec = r.SampleVector() @@ -178,10 +173,22 @@ func (s *stream) JoinSampleVector( return nil, logqlmodel.NewSeriesLimitError(maxSeries) } + var seriesHash string + series := map[string]*promql.Series{} for next { vec = r.SampleVector() for _, p := range vec { - series.Floats = append(series.Floats, promql.FPoint{ + seriesHash = p.Metric.String() + s, ok := series[seriesHash] + if !ok { + s = &promql.Series{ + Metric: p.Metric, + Floats: make([]promql.FPoint, 0, stepCount), + } + series[p.Metric.String()] = s + } + + s.Floats = append(s.Floats, promql.FPoint{ T: ts, F: p.F, }) @@ -193,52 +200,12 @@ func (s *stream) JoinSampleVector( } } - return series, stepEvaluator.Error() -} - -// TODO(twhitney): duplication between bytes and count iterators -func (s *stream) CountIterator( - ctx context.Context, - expr syntax.SampleExpr, - from, through, step model.Time, -) (iter.Iterator, error) { - s.mtx.Lock() - defer s.mtx.Unlock() - - stepEvaluator, err := s.evaluator.NewStepEvaluator( - ctx, - s.evaluator, - expr, - metric.Count, - from, - through, - step, - ) - if err != nil { - return nil, err - } - - next, ts, r := stepEvaluator.Next() - if stepEvaluator.Error() != nil { - return nil, stepEvaluator.Error() + if len(series) > 1 { + // TODO: is this actually a problem? 
Should this just become a Matrix?
+		return nil, errors.New("multiple series found in a single stream")
 	}
 
-	// TODO(twhitney): actually get max series from limits
-	// this is only 1 series since we're already on a stream
-	// this this limit needs to also be enforced higher up
-	maxSeries := 1000
-	series, err := s.JoinSampleVector(
-		next,
-		ts,
-		r,
-		stepEvaluator,
-		maxSeries,
-		from, through, step)
-	if err != nil {
-		return nil, err
-	}
-
-	return metric.NewSeriesToSampleIterator(series), nil
+	return series[seriesHash], stepEvaluator.Error()
 }
 
 func (s *stream) prune(olderThan time.Duration) bool {
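To summarize the JoinSampleVector rewrite in stream.go: instead of appending every point to a single promql.Series built from the stream's labels, samples are now bucketed into a map of series keyed by each sample's own (possibly grouped) label set. The sketch below shows that bucketing pattern in isolation, assuming the prometheus promql and labels packages; the accumulate helper and the main function are illustrative and not code from this patch.

package main

import (
	"fmt"

	"github.com/prometheus/prometheus/model/labels"
	"github.com/prometheus/prometheus/promql"
)

// accumulate is an illustrative stand-in (not code from this patch) for the
// loop in JoinSampleVector: every sample in a step vector is appended to a
// per-series promql.Series, keyed by the string form of its label set, so
// grouped results keep their own label identity instead of being flattened
// onto the stream's labels.
func accumulate(series map[string]*promql.Series, ts int64, vec promql.Vector) {
	for _, p := range vec {
		key := p.Metric.String()
		s, ok := series[key]
		if !ok {
			s = &promql.Series{Metric: p.Metric}
			series[key] = s
		}
		s.Floats = append(s.Floats, promql.FPoint{T: ts, F: p.F})
	}
}

func main() {
	series := map[string]*promql.Series{}
	lbls := labels.Labels{{Name: "container", Value: "jar"}}

	accumulate(series, 1000, promql.Vector{{T: 1000, F: 2, Metric: lbls}})
	accumulate(series, 2000, promql.Vector{{T: 2000, F: 4, Metric: lbls}})

	for key, s := range series {
		fmt.Println(key, s.Floats) // one series with two points
	}
}

As the new TODO in the diff suggests, returning a promql.Matrix instead of erroring when len(series) > 1 would let a single stream produce several grouped series rather than rejecting the query.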