Skip to content

Commit

Permalink
feat: guard aggregation behavior behind a feature flag
Browse files Browse the repository at this point in the history
  • Loading branch information
trevorwhitney committed May 23, 2024
1 parent f0d6a92 commit 68aa188
Show file tree
Hide file tree
Showing 10 changed files with 161 additions and 88 deletions.
6 changes: 5 additions & 1 deletion pkg/pattern/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/grafana/loki/v3/pkg/logql/syntax"
"github.com/grafana/loki/v3/pkg/pattern/clientpool"
"github.com/grafana/loki/v3/pkg/pattern/iter"
"github.com/grafana/loki/v3/pkg/pattern/metric"
"github.com/grafana/loki/v3/pkg/util"
util_log "github.com/grafana/loki/v3/pkg/util/log"
)
Expand All @@ -38,6 +39,7 @@ type Config struct {
ConcurrentFlushes int `yaml:"concurrent_flushes"`
FlushCheckPeriod time.Duration `yaml:"flush_check_period"`

MetricAggregation metric.AggregationConfig `yaml:"metric_aggregation,omitempty" doc:"description=Configures the metric aggregation and storage behavior of the pattern ingester."`
// For testing.
factory ring_client.PoolFactory `yaml:"-"`
}
Expand All @@ -49,6 +51,8 @@ func (cfg *Config) RegisterFlags(fs *flag.FlagSet) {
fs.BoolVar(&cfg.Enabled, "pattern-ingester.enabled", false, "Flag to enable or disable the usage of the pattern-ingester component.")
fs.IntVar(&cfg.ConcurrentFlushes, "pattern-ingester.concurrent-flushes", 32, "How many flushes can happen concurrently from each stream.")
fs.DurationVar(&cfg.FlushCheckPeriod, "pattern-ingester.flush-check-period", 30*time.Second, "How often should the ingester see if there are any blocks to flush. The first flush check is delayed by a random time up to 0.8x the flush check period. Additionally, there is +/- 1% jitter added to the interval.")

cfg.MetricAggregation.RegisterFlagsWithPrefix(fs, "pattern-ingester.")
}

func (cfg *Config) Validate() error {
Expand Down Expand Up @@ -335,7 +339,7 @@ func (i *Ingester) GetOrCreateInstance(instanceID string) (*instance, error) { /
inst, ok = i.instances[instanceID]
if !ok {
var err error
inst, err = newInstance(instanceID, i.logger, i.metrics)
inst, err = newInstance(instanceID, i.logger, i.metrics, i.cfg.MetricAggregation)
if err != nil {
return nil, err
}
Expand Down
92 changes: 57 additions & 35 deletions pkg/pattern/ingester_querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,34 +45,70 @@ func NewIngesterQuerier(
}

func (q *IngesterQuerier) Patterns(ctx context.Context, req *logproto.QueryPatternsRequest) (*logproto.QueryPatternsResponse, error) {
// validate that a supported query was provided
var expr syntax.Expr
_, err := syntax.ParseMatchers(req.Query, true)
if err != nil {
expr, err = syntax.ParseSampleExpr(req.Query)
if err != nil {
return nil, ErrParseQuery
// not a pattern query, so either a metric query or an error
if q.cfg.MetricAggregation.Enabled {
return q.queryMetricSamples(ctx, req)
}

var selector syntax.LogSelectorExpr
switch expr.(type) {
case *syntax.VectorAggregationExpr:
selector, err = expr.(*syntax.VectorAggregationExpr).Selector()
case *syntax.RangeAggregationExpr:
selector, err = expr.(*syntax.RangeAggregationExpr).Selector()
default:
return nil, ErrParseQuery
}
return nil, err
}

if err != nil {
return nil, err
}
return q.queryPatternSamples(ctx, req)
}

if selector == nil || selector.HasFilter() {
return nil, ErrParseQuery
}
func (q *IngesterQuerier) queryPatternSamples(ctx context.Context, req *logproto.QueryPatternsRequest) (*logproto.QueryPatternsResponse, error) {
iterators, err := q.query(ctx, req)
if err != nil {
return nil, err
}

// TODO(kolesnikovae): Incorporate with pruning
resp, err := iter.ReadPatternsBatch(iter.NewMerge(iterators...), math.MaxInt32)
if err != nil {
return nil, err
}
return prunePatterns(resp, minClusterSize), nil
}

func (q *IngesterQuerier) queryMetricSamples(ctx context.Context, req *logproto.QueryPatternsRequest) (*logproto.QueryPatternsResponse, error) {
expr, err := syntax.ParseSampleExpr(req.Query)
if err != nil {
return nil, err
}

var selector syntax.LogSelectorExpr
switch expr.(type) {

Check failure on line 82 in pkg/pattern/ingester_querier.go

View workflow job for this annotation

GitHub Actions / check / golangciLint

S1034: assigning the result of this type assertion to a variable (switch expr := expr.(type)) could eliminate type assertions in switch cases (gosimple)
case *syntax.VectorAggregationExpr:
selector, err = expr.(*syntax.VectorAggregationExpr).Selector()

Check failure on line 84 in pkg/pattern/ingester_querier.go

View workflow job for this annotation

GitHub Actions / check / golangciLint

S1034(related information): could eliminate this type assertion (gosimple)
case *syntax.RangeAggregationExpr:
selector, err = expr.(*syntax.RangeAggregationExpr).Selector()

Check failure on line 86 in pkg/pattern/ingester_querier.go

View workflow job for this annotation

GitHub Actions / check / golangciLint

S1034(related information): could eliminate this type assertion (gosimple)
default:
return nil, ErrParseQuery
}

if err != nil {
return nil, err
}

if selector == nil || selector.HasFilter() {
return nil, ErrParseQuery
}

iterators, err := q.query(ctx, req)
if err != nil {
return nil, err
}

resp, err := iter.ReadMetricsBatch(iter.NewMerge(iterators...), math.MaxInt32)
if err != nil {
return nil, err
}
return resp, nil
}

func (q *IngesterQuerier) query(ctx context.Context, req *logproto.QueryPatternsRequest) ([]iter.Iterator, error) {
resps, err := q.forAllIngesters(ctx, func(_ context.Context, client logproto.PatternClient) (interface{}, error) {
return client.Query(ctx, req)
})
Expand All @@ -83,21 +119,7 @@ func (q *IngesterQuerier) Patterns(ctx context.Context, req *logproto.QueryPatte
for i := range resps {
iterators[i] = iter.NewQueryClientIterator(resps[i].response.(logproto.Pattern_QueryClient))
}
switch expr.(type) {
case *syntax.VectorAggregationExpr, *syntax.RangeAggregationExpr:
resp, err := iter.ReadMetricsBatch(iter.NewMerge(iterators...), math.MaxInt32)
if err != nil {
return nil, err
}
return resp, nil
default:
// TODO(kolesnikovae): Incorporate with pruning
resp, err := iter.ReadPatternsBatch(iter.NewMerge(iterators...), math.MaxInt32)
if err != nil {
return nil, err
}
return prunePatterns(resp, minClusterSize), nil
}
return iterators, nil
}

func prunePatterns(
Expand Down
21 changes: 15 additions & 6 deletions pkg/pattern/ingester_querier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
ring_client "github.com/grafana/dskit/ring/client"
"github.com/grafana/dskit/services"
"github.com/grafana/loki/v3/pkg/logproto"
"github.com/grafana/loki/v3/pkg/pattern/metric"
)

func Test_prunePatterns(t *testing.T) {
Expand Down Expand Up @@ -49,19 +50,23 @@ func Test_prunePatterns(t *testing.T) {
func Test_Patterns(t *testing.T) {
t.Run("it rejects metric queries with filters", func(t *testing.T) {
q := &IngesterQuerier{
cfg: Config{},
cfg: Config{
MetricAggregation: metric.AggregationConfig{
Enabled: true,
},
},
logger: log.NewNopLogger(),
ringClient: &fakeRingClient{},
registerer: nil,
}
for _, query := range []string{
`count_over_time({foo="bar"} |= "baz" [5m])`,
`count_over_time({foo="bar"} != "baz" [5m])`,
`count_over_time({foo="bar"} =~ "baz" [5m])`,
`count_over_time({foo="bar"} |~ "baz" [5m])`,
`count_over_time({foo="bar"} !~ "baz" [5m])`,
`count_over_time({foo="bar"} | logfmt | color=blue [5m])`,
`count_over_time({foo="bar"} | logfmt | color="blue" [5m])`,
`sum(count_over_time({foo="bar"} |= "baz" [5m]))`,
`sum by label(count_over_time({foo="bar"} |= "baz" [5m]))`,
`sum by (label)(count_over_time({foo="bar"} |= "baz" [5m]))`,
`bytes_over_time({foo="bar"} |= "baz" [5m])`,
} {
_, err := q.Patterns(
Expand All @@ -71,14 +76,18 @@ func Test_Patterns(t *testing.T) {
},
)
require.Error(t, err, query)
require.ErrorIs(t, err, ErrParseQuery)
require.ErrorIs(t, err, ErrParseQuery, query)

}
})

t.Run("accepts log selector queries and count and bytes metric queries", func(t *testing.T) {
q := &IngesterQuerier{
cfg: Config{},
cfg: Config{
MetricAggregation: metric.AggregationConfig{
Enabled: true,
},
},
logger: log.NewNopLogger(),
ringClient: &fakeRingClient{},
registerer: nil,
Expand Down
29 changes: 15 additions & 14 deletions pkg/pattern/ingester_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,24 @@ import (
"github.com/grafana/loki/v3/pkg/logproto"
"github.com/grafana/loki/v3/pkg/logql/syntax"
"github.com/grafana/loki/v3/pkg/pattern/iter"
"github.com/grafana/loki/v3/pkg/pattern/metric"

"github.com/grafana/loki/pkg/push"
)

func TestInstancePushQuery(t *testing.T) {
t.Run("test pattern samples", func(t *testing.T) {
lbs := labels.New(labels.Label{Name: "test", Value: "test"})
inst, err := newInstance("foo", log.NewNopLogger(), newIngesterMetrics(nil, "test"))
lbs := labels.New(labels.Label{Name: "test", Value: "test"})
setup := func() *instance {
inst, err := newInstance("foo", log.NewNopLogger(), newIngesterMetrics(nil, "test"), metric.AggregationConfig{
Enabled: true,
})
require.NoError(t, err)

err = inst.Push(context.Background(), &push.PushRequest{
return inst
}
t.Run("test pattern samples", func(t *testing.T) {
inst := setup()
err := inst.Push(context.Background(), &push.PushRequest{

Check failure on line 33 in pkg/pattern/ingester_test.go

View workflow job for this annotation

GitHub Actions / check / golangciLint

ineffectual assignment to err (ineffassign)
Streams: []push.Stream{
{
Labels: lbs.String(),
Expand Down Expand Up @@ -92,11 +99,8 @@ func TestInstancePushQuery(t *testing.T) {
})

t.Run("test count_over_time samples", func(t *testing.T) {
lbs := labels.New(labels.Label{Name: "test", Value: "test"})
inst, err := newInstance("foo", log.NewNopLogger(), nil)
require.NoError(t, err)

err = inst.Push(context.Background(), &push.PushRequest{
inst := setup()
err := inst.Push(context.Background(), &push.PushRequest{
Streams: []push.Stream{
{
Labels: lbs.String(),
Expand Down Expand Up @@ -184,11 +188,8 @@ func TestInstancePushQuery(t *testing.T) {
})

t.Run("test bytes_over_time samples", func(t *testing.T) {
lbs := labels.New(labels.Label{Name: "test", Value: "test"})
inst, err := newInstance("foo", log.NewNopLogger(), nil)
require.NoError(t, err)

err = inst.Push(context.Background(), &push.PushRequest{
inst := setup()
err := inst.Push(context.Background(), &push.PushRequest{
Streams: []push.Stream{
{
Labels: lbs.String(),
Expand Down
41 changes: 24 additions & 17 deletions pkg/pattern/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,27 +25,29 @@ const indexShards = 32

// instance is a tenant instance of the pattern ingester.
type instance struct {
instanceID string
buf []byte // buffer used to compute fps.
mapper *ingester.FpMapper // using of mapper no longer needs mutex because reading from streams is lock-free
streams *streamsMap
index *index.BitPrefixInvertedIndex
logger log.Logger
metrics *ingesterMetrics
instanceID string
buf []byte // buffer used to compute fps.
mapper *ingester.FpMapper // using of mapper no longer needs mutex because reading from streams is lock-free
streams *streamsMap
index *index.BitPrefixInvertedIndex
logger log.Logger
metrics *ingesterMetrics
aggregationCfg metric.AggregationConfig
}

func newInstance(instanceID string, logger log.Logger, metrics *ingesterMetrics) (*instance, error) {
func newInstance(instanceID string, logger log.Logger, metrics *ingesterMetrics, aggCfg metric.AggregationConfig) (*instance, error) {
index, err := index.NewBitPrefixWithShards(indexShards)
if err != nil {
return nil, err
}
i := &instance{
buf: make([]byte, 0, 1024),
logger: logger,
instanceID: instanceID,
streams: newStreamsMap(),
index: index,
metrics: metrics,
buf: make([]byte, 0, 1024),
logger: logger,
instanceID: instanceID,
streams: newStreamsMap(),
index: index,
metrics: metrics,
aggregationCfg: aggCfg,
}
i.mapper = ingester.NewFPMapper(i.getLabelsFromFingerprint)
return i, nil
Expand All @@ -60,7 +62,7 @@ func (i *instance) Push(ctx context.Context, req *logproto.PushRequest) error {
s, _, err := i.streams.LoadOrStoreNew(reqStream.Labels,
func() (*stream, error) {
// add stream
return i.createStream(ctx, reqStream)
return i.createStream(ctx, reqStream, i.aggregationCfg.Enabled)
}, nil)
if err != nil {
appendErr.Add(err)
Expand Down Expand Up @@ -107,6 +109,11 @@ func (i *instance) QuerySample(
expr syntax.SampleExpr,
req *logproto.QueryPatternsRequest,
) (iter.Iterator, error) {
if !i.aggregationCfg.Enabled {
// Should never get here, but this will prevent nil pointer panics in test
return iter.Empty, nil
}

from, through := util.RoundToMilliseconds(req.Start, req.End)
step := model.Time(req.Step)
if step < chunk.TimeResolution {
Expand Down Expand Up @@ -184,14 +191,14 @@ outer:
return nil
}

func (i *instance) createStream(_ context.Context, pushReqStream logproto.Stream) (*stream, error) {
func (i *instance) createStream(_ context.Context, pushReqStream logproto.Stream, aggregateMetrics bool) (*stream, error) {

Check warning on line 194 in pkg/pattern/instance.go

View workflow job for this annotation

GitHub Actions / check / golangciLint

unused-parameter: parameter 'aggregateMetrics' seems to be unused, consider removing or renaming it as _ (revive)
labels, err := syntax.ParseLabels(pushReqStream.Labels)
if err != nil {
return nil, httpgrpc.Errorf(http.StatusBadRequest, err.Error())
}
fp := i.getHashForLabels(labels)
sortedLabels := i.index.Add(logproto.FromLabelsToLabelAdapters(labels), fp)
s, err := newStream(fp, sortedLabels, i.metrics)
s, err := newStream(fp, sortedLabels, i.metrics, i.aggregationCfg.Enabled)
if err != nil {
return nil, fmt.Errorf("failed to create stream: %w", err)
}
Expand Down
5 changes: 3 additions & 2 deletions pkg/pattern/instance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/grafana/loki/pkg/push"
"github.com/grafana/loki/v3/pkg/logproto"
"github.com/grafana/loki/v3/pkg/logql/syntax"
"github.com/grafana/loki/v3/pkg/pattern/metric"
"github.com/prometheus/common/model"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand All @@ -28,7 +29,7 @@ func TestInstance_QuerySample(t *testing.T) {
Step: oneMin,
}

instance, err := newInstance("test", log.NewNopLogger(), nil)
instance, err := newInstance("test", log.NewNopLogger(), nil, metric.AggregationConfig{})
require.NoError(t, err)

labels := model.LabelSet{
Expand All @@ -37,7 +38,7 @@ func TestInstance_QuerySample(t *testing.T) {

lastTsMilli := (then + oneMin + oneMin) // 1715964095000

//TODO(twhitney): Add a few more pushes to this or another test
// TODO(twhitney): Add a few more pushes to this or another test
instance.Push(ctx, &logproto.PushRequest{

Check failure on line 42 in pkg/pattern/instance_test.go

View workflow job for this annotation

GitHub Actions / check / golangciLint

Error return value of `instance.Push` is not checked (errcheck)
Streams: []push.Stream{
{
Expand Down
16 changes: 16 additions & 0 deletions pkg/pattern/metric/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package metric

import "flag"

type AggregationConfig struct {
Enabled bool `yaml:"enabled,omitempty" doc:"description=Whether the pattern ingester metric aggregation is enabled."`
}

// RegisterFlags registers pattern ingester related flags.
func (cfg *AggregationConfig) RegisterFlags(fs *flag.FlagSet) {
cfg.RegisterFlagsWithPrefix(fs, "")
}

func (cfg *AggregationConfig) RegisterFlagsWithPrefix(fs *flag.FlagSet, prefix string) {
fs.BoolVar(&cfg.Enabled, prefix+"metric-aggregation.enabled", false, "Flag to enable or disable metric aggregation.")
}
Loading

0 comments on commit 68aa188

Please sign in to comment.