From 31706aa162e13f38d1df869541bc6f36995e3317 Mon Sep 17 00:00:00 2001 From: Cyril Tovena Date: Fri, 6 Nov 2020 18:01:33 +0100 Subject: [PATCH 01/10] Work in progress limit series. Signed-off-by: Cyril Tovena --- pkg/logql/engine.go | 30 ++++++++- pkg/logql/engine_test.go | 64 +++++++++++++++++--- pkg/logql/error.go | 40 +++++++++--- pkg/logql/parser.go | 2 +- pkg/logql/parser_test.go | 2 +- pkg/logql/sharding.go | 5 +- pkg/logql/sharding_test.go | 7 ++- pkg/querier/querier.go | 2 +- pkg/querier/queryrange/limits.go | 26 ++++++++ pkg/querier/queryrange/querysharding.go | 6 +- pkg/querier/queryrange/querysharding_test.go | 3 + pkg/querier/queryrange/roundtrip.go | 2 + pkg/querier/queryrange/roundtrip_test.go | 5 ++ pkg/querier/queryrange/split_by_interval.go | 6 ++ pkg/util/server/error.go | 5 +- pkg/util/server/error_test.go | 2 + pkg/util/validation/limits.go | 7 +++ 17 files changed, 182 insertions(+), 32 deletions(-) diff --git a/pkg/logql/engine.go b/pkg/logql/engine.go index af161d94f47ff..35b753bd27c8f 100644 --- a/pkg/logql/engine.go +++ b/pkg/logql/engine.go @@ -10,6 +10,7 @@ import ( "github.com/go-kit/kit/log/level" "github.com/prometheus/prometheus/promql/parser" + "github.com/weaveworks/common/user" "github.com/cortexproject/cortex/pkg/util/spanlogger" "github.com/prometheus/client_golang/prometheus" @@ -86,14 +87,21 @@ func (opts *EngineOpts) applyDefault() { type Engine struct { timeout time.Duration evaluator Evaluator + limits Limits +} + +// Limits allow the engine to fetch limits for a given users. +type Limits interface { + MaxQuerySeries(userID string) int } // NewEngine creates a new LogQL Engine. -func NewEngine(opts EngineOpts, q Querier) *Engine { +func NewEngine(opts EngineOpts, q Querier, l Limits) *Engine { opts.applyDefault() return &Engine{ timeout: opts.Timeout, evaluator: NewDefaultEvaluator(q, opts.MaxLookBackPeriod), + limits: l, } } @@ -107,6 +115,7 @@ func (ng *Engine) Query(params Params) Query { return ParseExpr(query) }, record: true, + limits: ng.limits, } } @@ -120,6 +129,7 @@ type query struct { timeout time.Duration params Params parse func(context.Context, string) (Expr, error) + limits Limits evaluator Evaluator record bool } @@ -146,7 +156,7 @@ func (q *query) Exec(ctx context.Context) (Result, error) { status := "200" if err != nil { status = "500" - if IsParseError(err) || IsPipelineError(err) { + if errors.Is(err, ErrParse) || errors.Is(err, ErrPipeline) || errors.Is(err, ErrLimit) { status = "400" } } @@ -195,6 +205,11 @@ func (q *query) evalSample(ctx context.Context, expr SampleExpr) (parser.Value, return q.evalLiteral(ctx, lit) } + userID, err := user.ExtractOrgID(ctx) + if err != nil { + return nil, err + } + stepEvaluator, err := q.evaluator.StepEvaluator(ctx, q.evaluator, expr, q.params) if err != nil { return nil, err @@ -202,11 +217,18 @@ func (q *query) evalSample(ctx context.Context, expr SampleExpr) (parser.Value, defer helpers.LogErrorWithContext(ctx, "closing SampleExpr", stepEvaluator.Close) seriesIndex := map[uint64]*promql.Series{} + maxSeries := q.limits.MaxQuerySeries(userID) next, ts, vec := stepEvaluator.Next() if stepEvaluator.Error() != nil { return nil, stepEvaluator.Error() } + + // fail fast for the first step or instant query + if len(vec) > maxSeries { + return nil, newSeriesLimitError(maxSeries) + } + if GetRangeType(q.params) == InstantType { sort.Slice(vec, func(i, j int) bool { return labels.Compare(vec[i].Metric, vec[j].Metric) < 0 }) return vec, nil @@ -238,6 +260,10 @@ func (q *query) evalSample(ctx context.Context, expr SampleExpr) (parser.Value, V: p.V, }) } + // as we slowly build the full query for each steps, make sure we don't go over the limit of unique series. + if len(seriesIndex) > maxSeries { + return nil, newSeriesLimitError(maxSeries) + } next, ts, vec = stepEvaluator.Next() if stepEvaluator.Error() != nil { return nil, stepEvaluator.Error() diff --git a/pkg/logql/engine_test.go b/pkg/logql/engine_test.go index 5acd40eb762e5..3e67e34f7a6e0 100644 --- a/pkg/logql/engine_test.go +++ b/pkg/logql/engine_test.go @@ -16,6 +16,7 @@ import ( "github.com/prometheus/prometheus/promql/parser" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "github.com/weaveworks/common/user" "github.com/grafana/loki/pkg/iter" "github.com/grafana/loki/pkg/logproto" @@ -26,8 +27,17 @@ var ( testSize = int64(300) ErrMock = errors.New("mock error") ErrMockMultiple = errors.New("Multiple errors: [mock error mock error]") + NoLimits = &fakeLimits{maxSeries: math.MaxInt32} ) +type fakeLimits struct { + maxSeries int +} + +func (f fakeLimits) MaxQuerySeries(userID string) int { + return f.maxSeries +} + func TestEngine_LogsInstantQuery(t *testing.T) { t.Parallel() for _, test := range []struct { @@ -460,7 +470,7 @@ func TestEngine_LogsInstantQuery(t *testing.T) { t.Run(fmt.Sprintf("%s %s", test.qs, test.direction), func(t *testing.T) { t.Parallel() - eng := NewEngine(EngineOpts{}, newQuerierRecorder(t, test.data, test.params)) + eng := NewEngine(EngineOpts{}, newQuerierRecorder(t, test.data, test.params), NoLimits) q := eng.Query(LiteralParams{ qs: test.qs, start: test.ts, @@ -468,7 +478,7 @@ func TestEngine_LogsInstantQuery(t *testing.T) { direction: test.direction, limit: test.limit, }) - res, err := q.Exec(context.Background()) + res, err := q.Exec(user.InjectOrgID(context.Background(), "fake")) if err != nil { t.Fatal(err) } @@ -1513,7 +1523,7 @@ func TestEngine_RangeQuery(t *testing.T) { t.Run(fmt.Sprintf("%s %s", test.qs, test.direction), func(t *testing.T) { t.Parallel() - eng := NewEngine(EngineOpts{}, newQuerierRecorder(t, test.data, test.params)) + eng := NewEngine(EngineOpts{}, newQuerierRecorder(t, test.data, test.params), NoLimits) q := eng.Query(LiteralParams{ qs: test.qs, @@ -1524,7 +1534,7 @@ func TestEngine_RangeQuery(t *testing.T) { direction: test.direction, limit: test.limit, }) - res, err := q.Exec(context.Background()) + res, err := q.Exec(user.InjectOrgID(context.Background(), "fake")) if err != nil { t.Fatal(err) } @@ -1549,7 +1559,7 @@ func (statsQuerier) SelectSamples(ctx context.Context, p SelectSampleParams) (it func TestEngine_Stats(t *testing.T) { - eng := NewEngine(EngineOpts{}, &statsQuerier{}) + eng := NewEngine(EngineOpts{}, &statsQuerier{}, NoLimits) q := eng.Query(LiteralParams{ qs: `{foo="bar"}`, @@ -1558,7 +1568,7 @@ func TestEngine_Stats(t *testing.T) { direction: logproto.BACKWARD, limit: 1000, }) - r, err := q.Exec(context.Background()) + r, err := q.Exec(user.InjectOrgID(context.Background(), "fake")) require.NoError(t, err) require.Equal(t, int64(1), r.Statistics.Store.DecompressedBytes) } @@ -1621,19 +1631,53 @@ func TestStepEvaluator_Error(t *testing.T) { t.Run(tc.name, func(t *testing.T) { t.Parallel() tc := tc - eng := NewEngine(EngineOpts{}, tc.querier) + eng := NewEngine(EngineOpts{}, tc.querier, NoLimits) q := eng.Query(LiteralParams{ qs: tc.qs, start: time.Unix(0, 0), end: time.Unix(180, 0), step: 1 * time.Second, }) - _, err := q.Exec(context.Background()) + _, err := q.Exec(user.InjectOrgID(context.Background(), "fake")) require.Equal(t, tc.err, err) }) } } +func TestEngine_MaxSeries(t *testing.T) { + eng := NewEngine(EngineOpts{}, getLocalQuerier(100000), &fakeLimits{maxSeries: 1}) + + for _, test := range []struct { + qs string + direction logproto.Direction + expectLimitErr bool + }{ + {`topk(1,rate(({app=~"foo|bar"})[1m]))`, logproto.FORWARD, true}, + {`{app="foo"}`, logproto.FORWARD, false}, + {`{app="bar"} |= "foo" |~ ".+bar"`, logproto.BACKWARD, false}, + {`rate({app="foo"} |~".+bar" [1m])`, logproto.BACKWARD, true}, + {`rate({app="foo"}[30s])`, logproto.FORWARD, true}, + {`count_over_time({app="foo|bar"} |~".+bar" [1m])`, logproto.BACKWARD, true}, + {`avg(count_over_time({app=~"foo|bar"} |~".+bar" [1m]))`, logproto.FORWARD, false}, + } { + q := eng.Query(LiteralParams{ + qs: test.qs, + start: time.Unix(0, 0), + end: time.Unix(100000, 0), + step: 60 * time.Second, + direction: test.direction, + limit: 1000, + }) + _, err := q.Exec(user.InjectOrgID(context.Background(), "fake")) + if test.expectLimitErr { + require.NotNil(t, err) + require.True(t, errors.Is(err, ErrLimit)) + return + } + require.Nil(t, err) + } +} + // go test -mod=vendor ./pkg/logql/ -bench=. -benchmem -memprofile memprofile.out -cpuprofile cpuprofile.out func BenchmarkRangeQuery100000(b *testing.B) { benchmarkRangeQuery(int64(100000), b) @@ -1653,7 +1697,7 @@ var result parser.Value func benchmarkRangeQuery(testsize int64, b *testing.B) { b.ReportAllocs() - eng := NewEngine(EngineOpts{}, getLocalQuerier(testsize)) + eng := NewEngine(EngineOpts{}, getLocalQuerier(testsize), NoLimits) start := time.Unix(0, 0) end := time.Unix(testsize, 0) b.ResetTimer() @@ -1692,7 +1736,7 @@ func benchmarkRangeQuery(testsize int64, b *testing.B) { direction: test.direction, limit: 1000, }) - res, err := q.Exec(context.Background()) + res, err := q.Exec(user.InjectOrgID(context.Background(), "fake")) if err != nil { b.Fatal(err) } diff --git a/pkg/logql/error.go b/pkg/logql/error.go index 4e7c6d50b68ca..d1d52220fcd64 100644 --- a/pkg/logql/error.go +++ b/pkg/logql/error.go @@ -1,6 +1,7 @@ package logql import ( + "errors" "fmt" "github.com/prometheus/prometheus/pkg/labels" @@ -8,6 +9,12 @@ import ( "github.com/grafana/loki/pkg/logql/log" ) +var ( + ErrParse = errors.New("failed to parse the log query") + ErrPipeline = errors.New("failed execute pipeline") + ErrLimit = errors.New("limit reached while evaluating the query") +) + // ParseError is what is returned when we failed to parse. type ParseError struct { msg string @@ -21,6 +28,11 @@ func (p ParseError) Error() string { return fmt.Sprintf("parse error at line %d, col %d: %s", p.line, p.col, p.msg) } +// Is allows to use errors.Is(err,ErrParse) on this error. +func (p ParseError) Is(target error) bool { + return target == ErrParse +} + func newParseError(msg string, line, col int) ParseError { return ParseError{ msg: msg, @@ -37,12 +49,6 @@ func newStageError(expr Expr, err error) ParseError { } } -// IsParseError returns true if the err is a ast parsing error. -func IsParseError(err error) bool { - _, ok := err.(ParseError) - return ok -} - type pipelineError struct { metric labels.Labels errorType string @@ -64,8 +70,22 @@ func (e pipelineError) Error() string { e.errorType, e.metric, e.errorType) } -// IsPipelineError tells if the error is generated by a Pipeline. -func IsPipelineError(err error) bool { - _, ok := err.(*pipelineError) - return ok +// Is allows to use errors.Is(err,ErrPipeline) on this error. +func (e pipelineError) Is(target error) bool { + return target == ErrPipeline +} + +type limitError struct { + error +} + +func newSeriesLimitError(limit int) *limitError { + return &limitError{ + error: fmt.Errorf("maximum of series (%d) reached for a single query", limit), + } +} + +// Is allows to use errors.Is(err,ErrLimit) on this error. +func (e limitError) Is(target error) bool { + return target == ErrLimit } diff --git a/pkg/logql/parser.go b/pkg/logql/parser.go index 135cfeea14851..2dd8c29467adc 100644 --- a/pkg/logql/parser.go +++ b/pkg/logql/parser.go @@ -25,7 +25,7 @@ func ParseExpr(input string) (expr Expr, err error) { if r != nil { var ok bool if err, ok = r.(error); ok { - if IsParseError(err) { + if errors.Is(err, ErrParse) { return } err = newParseError(err.Error(), 0, 0) diff --git a/pkg/logql/parser_test.go b/pkg/logql/parser_test.go index ffbabe2506fbe..a3ea3c255896e 100644 --- a/pkg/logql/parser_test.go +++ b/pkg/logql/parser_test.go @@ -1975,7 +1975,7 @@ func TestIsParseError(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - if got := IsParseError(tt.errFn()); got != tt.want { + if got := errors.Is(tt.errFn(), ErrParse); got != tt.want { t.Errorf("IsParseError() = %v, want %v", got, tt.want) } }) diff --git a/pkg/logql/sharding.go b/pkg/logql/sharding.go index c108d64e59fa3..4f060d3d4d517 100644 --- a/pkg/logql/sharding.go +++ b/pkg/logql/sharding.go @@ -31,16 +31,18 @@ which can then take advantage of our sharded execution model. type ShardedEngine struct { timeout time.Duration downstreamable Downstreamable + limits Limits metrics *ShardingMetrics } // NewShardedEngine constructs a *ShardedEngine -func NewShardedEngine(opts EngineOpts, downstreamable Downstreamable, metrics *ShardingMetrics) *ShardedEngine { +func NewShardedEngine(opts EngineOpts, downstreamable Downstreamable, metrics *ShardingMetrics, limits Limits) *ShardedEngine { opts.applyDefault() return &ShardedEngine{ timeout: opts.Timeout, downstreamable: downstreamable, metrics: metrics, + limits: limits, } } @@ -54,6 +56,7 @@ func (ng *ShardedEngine) Query(p Params, mapped Expr) Query { parse: func(_ context.Context, _ string) (Expr, error) { return mapped, nil }, + limits: ng.limits, } } diff --git a/pkg/logql/sharding_test.go b/pkg/logql/sharding_test.go index d6b364c6a9b71..7c04f4f2113fd 100644 --- a/pkg/logql/sharding_test.go +++ b/pkg/logql/sharding_test.go @@ -8,6 +8,7 @@ import ( "github.com/prometheus/prometheus/promql" "github.com/stretchr/testify/require" + "github.com/weaveworks/common/user" "github.com/grafana/loki/pkg/logproto" ) @@ -59,8 +60,8 @@ func TestMappingEquivalence(t *testing.T) { ) opts := EngineOpts{} - regular := NewEngine(opts, q) - sharded := NewShardedEngine(opts, MockDownstreamer{regular}, nilMetrics) + regular := NewEngine(opts, q, NoLimits) + sharded := NewShardedEngine(opts, MockDownstreamer{regular}, nilMetrics, NoLimits) t.Run(tc.query, func(t *testing.T) { params := NewLiteralParams( @@ -74,7 +75,7 @@ func TestMappingEquivalence(t *testing.T) { nil, ) qry := regular.Query(params) - ctx := context.Background() + ctx := user.InjectOrgID(context.Background(), "fake") mapper, err := NewShardMapper(shards, nilMetrics) require.Nil(t, err) diff --git a/pkg/querier/querier.go b/pkg/querier/querier.go index 5f16fb1176b17..6da917a7fd67e 100644 --- a/pkg/querier/querier.go +++ b/pkg/querier/querier.go @@ -72,7 +72,7 @@ func New(cfg Config, store storage.Store, ingesterQuerier *IngesterQuerier, limi limits: limits, } - querier.engine = logql.NewEngine(cfg.Engine, &querier) + querier.engine = logql.NewEngine(cfg.Engine, &querier, limits) return &querier, nil } diff --git a/pkg/querier/queryrange/limits.go b/pkg/querier/queryrange/limits.go index 05434ce597aa7..4b6b36d0ac5a8 100644 --- a/pkg/querier/queryrange/limits.go +++ b/pkg/querier/queryrange/limits.go @@ -2,6 +2,7 @@ package queryrange import ( "fmt" + "sync" "time" "github.com/cortexproject/cortex/pkg/querier/queryrange" @@ -11,6 +12,7 @@ import ( type Limits interface { queryrange.Limits QuerySplitDuration(string) time.Duration + MaxQuerySeries(string) int MaxEntriesLimitPerQuery(string) int } @@ -71,3 +73,27 @@ func (l cacheKeyLimits) GenerateCacheKey(userID string, r queryrange.Request) st // a cache key can't be reused when an interval changes return fmt.Sprintf("%s:%s:%d:%d:%d", userID, r.GetQuery(), r.GetStep(), currentInterval, split) } + +type seriesLimiter struct { + hashes map[uint64]struct{} + rw sync.RWMutex + + maxSeries int +} + +func newSeriesLimiter(maxSeries int) *seriesLimiter { + return *seriesLimiter{ + hashes: make(map[uint64]struct{}), + maxSeries: maxSeries, + } +} + +func (sl *seriesLimiter) Add(res queryrange.Response) bool { + if sl.IsLimitReached() { + return true + } +} + +func (sl *seriesLimiter) IsLimitReached() bool { + +} diff --git a/pkg/querier/queryrange/querysharding.go b/pkg/querier/queryrange/querysharding.go index cf1c3234ac260..6201cfbcd521b 100644 --- a/pkg/querier/queryrange/querysharding.go +++ b/pkg/querier/queryrange/querysharding.go @@ -24,6 +24,7 @@ func NewQueryShardMiddleware( minShardingLookback time.Duration, middlewareMetrics *queryrange.InstrumentMiddlewareMetrics, shardingMetrics *logql.ShardingMetrics, + limits Limits, ) queryrange.Middleware { noshards := !hasShards(confs) @@ -38,7 +39,7 @@ func NewQueryShardMiddleware( } mapperware := queryrange.MiddlewareFunc(func(next queryrange.Handler) queryrange.Handler { - return newASTMapperware(confs, next, logger, shardingMetrics) + return newASTMapperware(confs, next, logger, shardingMetrics, limits) }) return queryrange.MiddlewareFunc(func(next queryrange.Handler) queryrange.Handler { @@ -60,13 +61,14 @@ func newASTMapperware( next queryrange.Handler, logger log.Logger, metrics *logql.ShardingMetrics, + limits Limits, ) *astMapperware { return &astMapperware{ confs: confs, logger: log.With(logger, "middleware", "QueryShard.astMapperware"), next: next, - ng: logql.NewShardedEngine(logql.EngineOpts{}, DownstreamHandler{next}, metrics), + ng: logql.NewShardedEngine(logql.EngineOpts{}, DownstreamHandler{next}, metrics, limits), metrics: metrics, } } diff --git a/pkg/querier/queryrange/querysharding_test.go b/pkg/querier/queryrange/querysharding_test.go index 5c6ac12138b23..05f169724529f 100644 --- a/pkg/querier/queryrange/querysharding_test.go +++ b/pkg/querier/queryrange/querysharding_test.go @@ -3,6 +3,7 @@ package queryrange import ( "context" "fmt" + "math" "sort" "sync" "testing" @@ -146,6 +147,7 @@ func Test_astMapper(t *testing.T) { handler, log.NewNopLogger(), nilShardingMetrics, + fakeLimits{maxSeries: math.MaxInt32}, ) resp, err := mware.Do(context.Background(), defaultReq().WithQuery(`{food="bar"}`)) @@ -175,6 +177,7 @@ func Test_ShardingByPass(t *testing.T) { handler, log.NewNopLogger(), nilShardingMetrics, + fakeLimits{maxSeries: math.MaxInt32}, ) _, err := mware.Do(context.Background(), defaultReq().WithQuery(`1+1`)) diff --git a/pkg/querier/queryrange/roundtrip.go b/pkg/querier/queryrange/roundtrip.go index ae2a850f2505d..5cd4e366e2a97 100644 --- a/pkg/querier/queryrange/roundtrip.go +++ b/pkg/querier/queryrange/roundtrip.go @@ -239,6 +239,7 @@ func NewLogFilterTripperware( minShardingLookback, instrumentMetrics, // instrumentation is included in the sharding middleware shardingMetrics, + limits, ), ) } @@ -388,6 +389,7 @@ func NewMetricTripperware( minShardingLookback, instrumentMetrics, // instrumentation is included in the sharding middleware shardingMetrics, + limits, ), ) } diff --git a/pkg/querier/queryrange/roundtrip_test.go b/pkg/querier/queryrange/roundtrip_test.go index e05e524dde8e7..21c6c1d6dd742 100644 --- a/pkg/querier/queryrange/roundtrip_test.go +++ b/pkg/querier/queryrange/roundtrip_test.go @@ -493,6 +493,7 @@ func TestEntriesLimitWithZeroTripperware(t *testing.T) { type fakeLimits struct { maxQueryParallelism int maxEntriesLimitPerQuery int + maxSeries int splits map[string]time.Duration } @@ -518,6 +519,10 @@ func (f fakeLimits) MaxEntriesLimitPerQuery(string) int { return f.maxEntriesLimitPerQuery } +func (f fakeLimits) MaxQuerySeries(string) int { + return f.maxEntriesLimitPerQuery +} + func (f fakeLimits) MaxCacheFreshness(string) time.Duration { return 1 * time.Minute } diff --git a/pkg/querier/queryrange/split_by_interval.go b/pkg/querier/queryrange/split_by_interval.go index f6eb9d0723bf4..7f61d172722d8 100644 --- a/pkg/querier/queryrange/split_by_interval.go +++ b/pkg/querier/queryrange/split_by_interval.go @@ -143,6 +143,12 @@ func (h *splitByInterval) loop(ctx context.Context, ch <-chan *lokiResult) { resp, err := h.next.Do(ctx, data.req) + // check that we're not going over the series budget. + // if err == nil { + // if promRes, ok := resp.(*LokiPromResponse); ok { + + // } + // } select { case <-ctx.Done(): sp.Finish() diff --git a/pkg/util/server/error.go b/pkg/util/server/error.go index 1d8487c92eef0..5e2db28cc0d01 100644 --- a/pkg/util/server/error.go +++ b/pkg/util/server/error.go @@ -7,6 +7,7 @@ import ( "github.com/cortexproject/cortex/pkg/chunk" "github.com/weaveworks/common/httpgrpc" + "github.com/weaveworks/common/user" "github.com/grafana/loki/pkg/logql" ) @@ -30,7 +31,9 @@ func WriteError(err error, w http.ResponseWriter) { http.Error(w, ErrDeadlineExceeded, http.StatusGatewayTimeout) case errors.As(err, &queryErr): http.Error(w, err.Error(), http.StatusBadRequest) - case logql.IsParseError(err) || logql.IsPipelineError(err): + case errors.Is(err, logql.ErrLimit) || errors.Is(err, logql.ErrParse) || errors.Is(err, logql.ErrPipeline): + http.Error(w, err.Error(), http.StatusBadRequest) + case errors.Is(err, user.ErrNoOrgID): http.Error(w, err.Error(), http.StatusBadRequest) default: if grpcErr, ok := httpgrpc.HTTPResponseFromError(err); ok { diff --git a/pkg/util/server/error_test.go b/pkg/util/server/error_test.go index b557ffa309e53..5ea16c7fa0d41 100644 --- a/pkg/util/server/error_test.go +++ b/pkg/util/server/error_test.go @@ -12,6 +12,7 @@ import ( "github.com/cortexproject/cortex/pkg/chunk" "github.com/stretchr/testify/require" "github.com/weaveworks/common/httpgrpc" + "github.com/weaveworks/common/user" "github.com/grafana/loki/pkg/logql" ) @@ -25,6 +26,7 @@ func Test_writeError(t *testing.T) { expectedStatus int }{ {"cancelled", context.Canceled, ErrClientCanceled, StatusClientClosedRequest}, + {"orgid", user.ErrNoOrgID, user.ErrNoOrgID.Error(), http.StatusBadRequest}, {"deadline", context.DeadlineExceeded, ErrDeadlineExceeded, http.StatusGatewayTimeout}, {"parse error", logql.ParseError{}, "parse error : ", http.StatusBadRequest}, {"httpgrpc", httpgrpc.Errorf(http.StatusBadRequest, errors.New("foo").Error()), "foo", http.StatusBadRequest}, diff --git a/pkg/util/validation/limits.go b/pkg/util/validation/limits.go index 28a4676f3d339..d381aaaaead04 100644 --- a/pkg/util/validation/limits.go +++ b/pkg/util/validation/limits.go @@ -39,6 +39,7 @@ type Limits struct { // Querier enforced limits. MaxChunksPerQuery int `yaml:"max_chunks_per_query"` + MaxQuerySeries int `yaml:"max_query_series"` MaxQueryLength time.Duration `yaml:"max_query_length"` MaxQueryParallelism int `yaml:"max_query_parallelism"` CardinalityLimit int `yaml:"cardinality_limit"` @@ -75,6 +76,7 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) { f.IntVar(&l.MaxChunksPerQuery, "store.query-chunk-limit", 2e6, "Maximum number of chunks that can be fetched in a single query.") f.DurationVar(&l.MaxQueryLength, "store.max-query-length", 0, "Limit to length of chunk store queries, 0 to disable.") + f.IntVar(&l.MaxQuerySeries, "querier.max-query-series", 500, "Limit the maximum of unique series returned by a metric query. When the limit is reached an error is returned.") f.IntVar(&l.MaxQueryParallelism, "querier.max-query-parallelism", 14, "Maximum number of queries will be scheduled in parallel by the frontend.") f.IntVar(&l.CardinalityLimit, "store.cardinality-limit", 1e5, "Cardinality limit for index queries.") f.IntVar(&l.MaxStreamsMatchersPerQuery, "querier.max-streams-matcher-per-query", 1000, "Limit the number of streams matchers per query") @@ -204,6 +206,11 @@ func (o *Overrides) MaxQueryLength(userID string) time.Duration { return o.getOverridesForUser(userID).MaxQueryLength } +// MaxQueryLength returns the limit of the series of metric queries. +func (o *Overrides) MaxQuerySeries(userID string) int { + return o.getOverridesForUser(userID).MaxQuerySeries +} + // MaxQueryParallelism returns the limit to the number of sub-queries the // frontend will process in parallel. func (o *Overrides) MaxQueryParallelism(userID string) int { From 67d6ac31e0bd08b43bfae41763b67a58112b9e0a Mon Sep 17 00:00:00 2001 From: Cyril Tovena Date: Fri, 6 Nov 2020 20:55:58 +0100 Subject: [PATCH 02/10] First draft of the series limiter. Signed-off-by: Cyril Tovena --- pkg/querier/queryrange/limits.go | 48 ++++++++++++++++++++++++++++---- 1 file changed, 42 insertions(+), 6 deletions(-) diff --git a/pkg/querier/queryrange/limits.go b/pkg/querier/queryrange/limits.go index 4b6b36d0ac5a8..c181008936afd 100644 --- a/pkg/querier/queryrange/limits.go +++ b/pkg/querier/queryrange/limits.go @@ -2,10 +2,17 @@ package queryrange import ( "fmt" + "net/http" "sync" "time" + "github.com/cortexproject/cortex/pkg/ingester/client" "github.com/cortexproject/cortex/pkg/querier/queryrange" + "github.com/weaveworks/common/httpgrpc" +) + +const ( + limitErrTmpl = "maximum of series (%d) reached for a single query" ) // Limits extends the cortex limits interface with support for per tenant splitby parameters @@ -77,23 +84,52 @@ func (l cacheKeyLimits) GenerateCacheKey(userID string, r queryrange.Request) st type seriesLimiter struct { hashes map[uint64]struct{} rw sync.RWMutex + buf []byte maxSeries int } func newSeriesLimiter(maxSeries int) *seriesLimiter { - return *seriesLimiter{ + return &seriesLimiter{ hashes: make(map[uint64]struct{}), maxSeries: maxSeries, + buf: make([]byte, 0, 1024), } } -func (sl *seriesLimiter) Add(res queryrange.Response) bool { - if sl.IsLimitReached() { - return true +func (sl *seriesLimiter) Add(res queryrange.Response, resErr error) (queryrange.Response, error) { + if resErr != nil { + return res, resErr + } + promResponse, ok := res.(*LokiPromResponse) + if !ok { + return res, nil + } + if err := sl.IsLimitReached(); err != nil { + return nil, err + } + if promResponse.Response == nil { + return res, nil } + sl.rw.Lock() + var hash uint64 + for _, s := range promResponse.Response.Data.Result { + lbs := client.FromLabelAdaptersToLabels(s.Labels) + hash, sl.buf = lbs.HashWithoutLabels(sl.buf, []string(nil)...) + sl.hashes[hash] = struct{}{} + } + sl.rw.Unlock() + if err := sl.IsLimitReached(); err != nil { + return nil, err + } + return res, nil } -func (sl *seriesLimiter) IsLimitReached() bool { - +func (sl *seriesLimiter) IsLimitReached() error { + sl.rw.RLock() + defer sl.rw.RUnlock() + if len(sl.hashes) > sl.maxSeries { + return httpgrpc.Errorf(http.StatusBadRequest, limitErrTmpl, sl.maxSeries) + } + return nil } From 072e88359c9e2af178528c71265f5ed79347f029 Mon Sep 17 00:00:00 2001 From: Cyril Tovena Date: Mon, 9 Nov 2020 10:49:20 +0100 Subject: [PATCH 03/10] Add limiter to query-frontend. Signed-off-by: Cyril Tovena --- pkg/querier/queryrange/limits.go | 41 +++++--- pkg/querier/queryrange/limits_test.go | 100 ++++++++++++++++++++ pkg/querier/queryrange/roundtrip_test.go | 5 +- pkg/querier/queryrange/split_by_interval.go | 17 ++-- 4 files changed, 136 insertions(+), 27 deletions(-) diff --git a/pkg/querier/queryrange/limits.go b/pkg/querier/queryrange/limits.go index c181008936afd..1a20da18c8002 100644 --- a/pkg/querier/queryrange/limits.go +++ b/pkg/querier/queryrange/limits.go @@ -1,6 +1,7 @@ package queryrange import ( + "context" "fmt" "net/http" "sync" @@ -87,27 +88,40 @@ type seriesLimiter struct { buf []byte maxSeries int + next queryrange.Handler } -func newSeriesLimiter(maxSeries int) *seriesLimiter { +type seriesLimiterMiddleware int + +// newSeriesLimiter creates a new series limiter middleware for use for a single request. +func newSeriesLimiter(maxSeries int) queryrange.Middleware { + return seriesLimiterMiddleware(maxSeries) +} + +// Wrap wraps a global handler and returns a per request limited handler. +// The handler returned is thread safe. +func (slm seriesLimiterMiddleware) Wrap(next queryrange.Handler) queryrange.Handler { return &seriesLimiter{ hashes: make(map[uint64]struct{}), - maxSeries: maxSeries, + maxSeries: int(slm), buf: make([]byte, 0, 1024), + next: next, } } -func (sl *seriesLimiter) Add(res queryrange.Response, resErr error) (queryrange.Response, error) { - if resErr != nil { - return res, resErr +func (sl *seriesLimiter) Do(ctx context.Context, req queryrange.Request) (queryrange.Response, error) { + // no need to fire a request if the limit is already reached. + if sl.isLimitReached() { + return nil, httpgrpc.Errorf(http.StatusBadRequest, limitErrTmpl, sl.maxSeries) + } + res, err := sl.next.Do(ctx, req) + if err != nil { + return res, err } promResponse, ok := res.(*LokiPromResponse) if !ok { return res, nil } - if err := sl.IsLimitReached(); err != nil { - return nil, err - } if promResponse.Response == nil { return res, nil } @@ -119,17 +133,14 @@ func (sl *seriesLimiter) Add(res queryrange.Response, resErr error) (queryrange. sl.hashes[hash] = struct{}{} } sl.rw.Unlock() - if err := sl.IsLimitReached(); err != nil { - return nil, err + if sl.isLimitReached() { + return nil, httpgrpc.Errorf(http.StatusBadRequest, limitErrTmpl, sl.maxSeries) } return res, nil } -func (sl *seriesLimiter) IsLimitReached() error { +func (sl *seriesLimiter) isLimitReached() bool { sl.rw.RLock() defer sl.rw.RUnlock() - if len(sl.hashes) > sl.maxSeries { - return httpgrpc.Errorf(http.StatusBadRequest, limitErrTmpl, sl.maxSeries) - } - return nil + return len(sl.hashes) > sl.maxSeries } diff --git a/pkg/querier/queryrange/limits_test.go b/pkg/querier/queryrange/limits_test.go index 8e5fc04043398..9ffc33aca90da 100644 --- a/pkg/querier/queryrange/limits_test.go +++ b/pkg/querier/queryrange/limits_test.go @@ -1,12 +1,23 @@ package queryrange import ( + "context" "fmt" + "net/http" + "sync" "testing" "time" + "github.com/cortexproject/cortex/pkg/chunk" "github.com/cortexproject/cortex/pkg/querier/queryrange" + "github.com/cortexproject/cortex/pkg/util" + "github.com/grafana/loki/pkg/logproto" + "github.com/grafana/loki/pkg/logql" + "github.com/grafana/loki/pkg/logql/marshal" + "github.com/prometheus/prometheus/pkg/labels" + "github.com/prometheus/prometheus/promql" "github.com/stretchr/testify/require" + "github.com/weaveworks/common/user" ) func TestLimits(t *testing.T) { @@ -44,3 +55,92 @@ func TestLimits(t *testing.T) { cacheKeyLimits{wrapped}.GenerateCacheKey("a", r), ) } + +func Test_seriesLimiter(t *testing.T) { + cfg := testConfig + cfg.SplitQueriesByInterval = time.Hour + cfg.CacheResults = false + // split in 6 with 4 in // max. + tpw, stopper, err := NewTripperware(cfg, util.Logger, fakeLimits{maxSeries: 1, maxQueryParallelism: 4}, chunk.SchemaConfig{}, 0, nil) + if stopper != nil { + defer stopper.Stop() + } + require.NoError(t, err) + + lreq := &LokiRequest{ + Query: `rate({app="foo"} |= "foo"[1m])`, + Limit: 1000, + Step: 30000, //30sec + StartTs: testTime.Add(-6 * time.Hour), + EndTs: testTime, + Direction: logproto.FORWARD, + Path: "/query_range", + } + + ctx := user.InjectOrgID(context.Background(), "1") + req, err := lokiCodec.EncodeRequest(ctx, lreq) + require.NoError(t, err) + + req = req.WithContext(ctx) + err = user.InjectOrgIDIntoHTTPRequest(ctx, req) + require.NoError(t, err) + + rt, err := newfakeRoundTripper() + require.NoError(t, err) + defer rt.Close() + + count, h := promqlResult(matrix) + rt.setHandler(h) + + _, err = tpw(rt).RoundTrip(req) + require.NoError(t, err) + require.Equal(t, 6, *count) + + // 2 series should not be allowed. + c := new(int) + m := &sync.Mutex{} + h = http.HandlerFunc(func(rw http.ResponseWriter, r *http.Request) { + m.Lock() + defer m.Unlock() + defer func() { + *c++ + }() + // first time returns a single series + if *c == 0 { + if err := marshal.WriteQueryResponseJSON(logql.Result{Data: matrix}, rw); err != nil { + panic(err) + } + return + } + // second time returns a different series. + if err := marshal.WriteQueryResponseJSON(logql.Result{ + Data: promql.Matrix{ + { + Points: []promql.Point{ + { + T: toMs(testTime.Add(-4 * time.Hour)), + V: 0.013333333333333334, + }, + }, + Metric: []labels.Label{ + { + Name: "filename", + Value: `/var/hostlog/apport.log`, + }, + { + Name: "job", + Value: "anotherjob", + }, + }, + }, + }, + }, rw); err != nil { + panic(err) + } + }) + rt.setHandler(h) + + _, err = tpw(rt).RoundTrip(req) + require.Error(t, err) + require.LessOrEqual(t, *c, 4) +} diff --git a/pkg/querier/queryrange/roundtrip_test.go b/pkg/querier/queryrange/roundtrip_test.go index 21c6c1d6dd742..d308964cc0fe0 100644 --- a/pkg/querier/queryrange/roundtrip_test.go +++ b/pkg/querier/queryrange/roundtrip_test.go @@ -4,6 +4,7 @@ import ( "bytes" "context" "io/ioutil" + "math" "net/http" "net/http/httptest" "net/url" @@ -92,7 +93,7 @@ var ( // those tests are mostly for testing the glue between all component and make sure they activate correctly. func TestMetricsTripperware(t *testing.T) { - tpw, stopper, err := NewTripperware(testConfig, util.Logger, fakeLimits{}, chunk.SchemaConfig{}, 0, nil) + tpw, stopper, err := NewTripperware(testConfig, util.Logger, fakeLimits{maxSeries: math.MaxInt32}, chunk.SchemaConfig{}, 0, nil) if stopper != nil { defer stopper.Stop() } @@ -520,7 +521,7 @@ func (f fakeLimits) MaxEntriesLimitPerQuery(string) int { } func (f fakeLimits) MaxQuerySeries(string) int { - return f.maxEntriesLimitPerQuery + return f.maxSeries } func (f fakeLimits) MaxCacheFreshness(string) time.Duration { diff --git a/pkg/querier/queryrange/split_by_interval.go b/pkg/querier/queryrange/split_by_interval.go index 7f61d172722d8..975ea79d5578d 100644 --- a/pkg/querier/queryrange/split_by_interval.go +++ b/pkg/querier/queryrange/split_by_interval.go @@ -83,6 +83,7 @@ func (h *splitByInterval) Process( parallelism int, threshold int64, input []*lokiResult, + userID string, ) ([]queryrange.Response, error) { var responses []queryrange.Response ctx, cancel := context.WithCancel(ctx) @@ -102,8 +103,10 @@ func (h *splitByInterval) Process( p = len(input) } + // per request wrapped handler for limiting the amount of series. + next := newSeriesLimiter(h.limits.MaxQuerySeries(userID)).Wrap(h.next) for i := 0; i < p; i++ { - go h.loop(ctx, ch) + go h.loop(ctx, ch, next) } for _, x := range input { @@ -134,21 +137,15 @@ func (h *splitByInterval) Process( return responses, nil } -func (h *splitByInterval) loop(ctx context.Context, ch <-chan *lokiResult) { +func (h *splitByInterval) loop(ctx context.Context, ch <-chan *lokiResult, next queryrange.Handler) { for data := range ch { sp, ctx := opentracing.StartSpanFromContext(ctx, "interval") data.req.LogToSpan(sp) - resp, err := h.next.Do(ctx, data.req) + resp, err := next.Do(ctx, data.req) - // check that we're not going over the series budget. - // if err == nil { - // if promRes, ok := resp.(*LokiPromResponse); ok { - - // } - // } select { case <-ctx.Done(): sp.Finish() @@ -209,7 +206,7 @@ func (h *splitByInterval) Do(ctx context.Context, r queryrange.Request) (queryra }) } - resps, err := h.Process(ctx, h.limits.MaxQueryParallelism(userid), limit, input) + resps, err := h.Process(ctx, h.limits.MaxQueryParallelism(userid), limit, input, userid) if err != nil { return nil, err } From 1b013b91801020e3a558f9fc6e2d84a7725fd3b6 Mon Sep 17 00:00:00 2001 From: Cyril Tovena Date: Mon, 9 Nov 2020 10:52:32 +0100 Subject: [PATCH 04/10] Add documentation. Signed-off-by: Cyril Tovena --- docs/sources/configuration/_index.md | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/docs/sources/configuration/_index.md b/docs/sources/configuration/_index.md index ef7bc4b75818b..4d874ac7cb772 100644 --- a/docs/sources/configuration/_index.md +++ b/docs/sources/configuration/_index.md @@ -1648,6 +1648,11 @@ logs in Loki. # CLI flag: -querier.max-query-parallelism [max_query_parallelism: | default = 14] +# Limit the maximum of unique series returned by a metric query. +# When the limit is reached an error is returned. +# CLI flag: -querier.max-query-series +[max_query_series: | default = 500] + # Cardinality limit for index queries. # CLI flag: -store.cardinality-limit [cardinality_limit: | default = 100000] @@ -1877,4 +1882,4 @@ multi_kv_config: mirror-enabled: false primary: consul ``` -### Generic placeholders \ No newline at end of file +### Generic placeholders From 94ee1f5df1c0eaf234b8320307250b448c10d060 Mon Sep 17 00:00:00 2001 From: Cyril Tovena Date: Mon, 9 Nov 2020 11:01:00 +0100 Subject: [PATCH 05/10] Fix build Signed-off-by: Cyril Tovena --- pkg/loki/modules.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/loki/modules.go b/pkg/loki/modules.go index 0de2559c146ad..43e5df50b2e57 100644 --- a/pkg/loki/modules.go +++ b/pkg/loki/modules.go @@ -460,7 +460,7 @@ func (t *Loki) initRuler() (_ services.Service, err error) { return nil, err } - engine := logql.NewEngine(t.cfg.Querier.Engine, q) + engine := logql.NewEngine(t.cfg.Querier.Engine, q, t.overrides) t.ruler, err = ruler.NewRuler( t.cfg.Ruler, From ff1ba6ff84140bcfa25e136a80abe405fb89328f Mon Sep 17 00:00:00 2001 From: Cyril Tovena Date: Mon, 9 Nov 2020 11:02:25 +0100 Subject: [PATCH 06/10] fmted Signed-off-by: Cyril Tovena --- pkg/loki/modules.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/loki/modules.go b/pkg/loki/modules.go index 43e5df50b2e57..5323062d746fe 100644 --- a/pkg/loki/modules.go +++ b/pkg/loki/modules.go @@ -460,7 +460,7 @@ func (t *Loki) initRuler() (_ services.Service, err error) { return nil, err } - engine := logql.NewEngine(t.cfg.Querier.Engine, q, t.overrides) + engine := logql.NewEngine(t.cfg.Querier.Engine, q, t.overrides) t.ruler, err = ruler.NewRuler( t.cfg.Ruler, From 0c7df78fe1449876efe647801118313485c65fec Mon Sep 17 00:00:00 2001 From: Cyril Tovena Date: Mon, 9 Nov 2020 11:23:30 +0100 Subject: [PATCH 07/10] lint and build fix Signed-off-by: Cyril Tovena --- pkg/logcli/query/query.go | 2 +- pkg/logcli/query/query_test.go | 2 +- pkg/logql/engine.go | 5 ----- pkg/logql/engine_test.go | 9 --------- pkg/logql/limits.go | 22 ++++++++++++++++++++++ pkg/querier/queryrange/limits_test.go | 7 ++++--- pkg/querier/queryrange/querysharding.go | 4 ++-- 7 files changed, 30 insertions(+), 21 deletions(-) create mode 100644 pkg/logql/limits.go diff --git a/pkg/logcli/query/query.go b/pkg/logcli/query/query.go index c1ce0a959b682..e38348182f236 100644 --- a/pkg/logcli/query/query.go +++ b/pkg/logcli/query/query.go @@ -201,7 +201,7 @@ func (q *Query) DoLocalQuery(out output.LogOutput, statistics bool, orgID string return err } - eng := logql.NewEngine(conf.Querier.Engine, querier) + eng := logql.NewEngine(conf.Querier.Engine, querier, limits) var query logql.Query if q.isInstant() { diff --git a/pkg/logcli/query/query_test.go b/pkg/logcli/query/query_test.go index 0cad99f55e5ae..347d82e90a48e 100644 --- a/pkg/logcli/query/query_test.go +++ b/pkg/logcli/query/query_test.go @@ -502,7 +502,7 @@ type testQueryClient struct { func newTestQueryClient(testStreams ...logproto.Stream) *testQueryClient { q := logql.NewMockQuerier(0, testStreams) - e := logql.NewEngine(logql.EngineOpts{}, q) + e := logql.NewEngine(logql.EngineOpts{}, q, logql.NoLimits) return &testQueryClient{ engine: e, queryRangeCalls: 0, diff --git a/pkg/logql/engine.go b/pkg/logql/engine.go index 35b753bd27c8f..b081929e607be 100644 --- a/pkg/logql/engine.go +++ b/pkg/logql/engine.go @@ -90,11 +90,6 @@ type Engine struct { limits Limits } -// Limits allow the engine to fetch limits for a given users. -type Limits interface { - MaxQuerySeries(userID string) int -} - // NewEngine creates a new LogQL Engine. func NewEngine(opts EngineOpts, q Querier, l Limits) *Engine { opts.applyDefault() diff --git a/pkg/logql/engine_test.go b/pkg/logql/engine_test.go index 3e67e34f7a6e0..370ae5d37466d 100644 --- a/pkg/logql/engine_test.go +++ b/pkg/logql/engine_test.go @@ -27,17 +27,8 @@ var ( testSize = int64(300) ErrMock = errors.New("mock error") ErrMockMultiple = errors.New("Multiple errors: [mock error mock error]") - NoLimits = &fakeLimits{maxSeries: math.MaxInt32} ) -type fakeLimits struct { - maxSeries int -} - -func (f fakeLimits) MaxQuerySeries(userID string) int { - return f.maxSeries -} - func TestEngine_LogsInstantQuery(t *testing.T) { t.Parallel() for _, test := range []struct { diff --git a/pkg/logql/limits.go b/pkg/logql/limits.go new file mode 100644 index 0000000000000..159911941b3ae --- /dev/null +++ b/pkg/logql/limits.go @@ -0,0 +1,22 @@ +package logql + +import ( + "math" +) + +var ( + NoLimits = &fakeLimits{maxSeries: math.MaxInt32} +) + +// Limits allow the engine to fetch limits for a given users. +type Limits interface { + MaxQuerySeries(userID string) int +} + +type fakeLimits struct { + maxSeries int +} + +func (f fakeLimits) MaxQuerySeries(userID string) int { + return f.maxSeries +} diff --git a/pkg/querier/queryrange/limits_test.go b/pkg/querier/queryrange/limits_test.go index 9ffc33aca90da..73b24bb4b757a 100644 --- a/pkg/querier/queryrange/limits_test.go +++ b/pkg/querier/queryrange/limits_test.go @@ -11,13 +11,14 @@ import ( "github.com/cortexproject/cortex/pkg/chunk" "github.com/cortexproject/cortex/pkg/querier/queryrange" "github.com/cortexproject/cortex/pkg/util" - "github.com/grafana/loki/pkg/logproto" - "github.com/grafana/loki/pkg/logql" - "github.com/grafana/loki/pkg/logql/marshal" "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/promql" "github.com/stretchr/testify/require" "github.com/weaveworks/common/user" + + "github.com/grafana/loki/pkg/logproto" + "github.com/grafana/loki/pkg/logql" + "github.com/grafana/loki/pkg/logql/marshal" ) func TestLimits(t *testing.T) { diff --git a/pkg/querier/queryrange/querysharding.go b/pkg/querier/queryrange/querysharding.go index 6201cfbcd521b..3adfd5f5583ac 100644 --- a/pkg/querier/queryrange/querysharding.go +++ b/pkg/querier/queryrange/querysharding.go @@ -24,7 +24,7 @@ func NewQueryShardMiddleware( minShardingLookback time.Duration, middlewareMetrics *queryrange.InstrumentMiddlewareMetrics, shardingMetrics *logql.ShardingMetrics, - limits Limits, + limits logql.Limits, ) queryrange.Middleware { noshards := !hasShards(confs) @@ -61,7 +61,7 @@ func newASTMapperware( next queryrange.Handler, logger log.Logger, metrics *logql.ShardingMetrics, - limits Limits, + limits logql.Limits, ) *astMapperware { return &astMapperware{ From c5dadbe7c0bde3daee6b1030def963cea64feca2 Mon Sep 17 00:00:00 2001 From: Cyril Tovena Date: Wed, 18 Nov 2020 04:49:36 -0500 Subject: [PATCH 08/10] Update docs/sources/configuration/_index.md Co-authored-by: achatterjee-grafana <70489351+achatterjee-grafana@users.noreply.github.com> --- docs/sources/configuration/_index.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/sources/configuration/_index.md b/docs/sources/configuration/_index.md index 4d874ac7cb772..6042440b952bb 100644 --- a/docs/sources/configuration/_index.md +++ b/docs/sources/configuration/_index.md @@ -1648,7 +1648,7 @@ logs in Loki. # CLI flag: -querier.max-query-parallelism [max_query_parallelism: | default = 14] -# Limit the maximum of unique series returned by a metric query. +# Limit the maximum of unique series that is returned by a metric query. # When the limit is reached an error is returned. # CLI flag: -querier.max-query-series [max_query_series: | default = 500] From c578bedb7b180c301b7f7a6a63c319705172f50e Mon Sep 17 00:00:00 2001 From: Cyril Tovena Date: Wed, 18 Nov 2020 11:07:50 +0100 Subject: [PATCH 09/10] Review feedback. Signed-off-by: Cyril Tovena --- pkg/logql/error.go | 2 ++ pkg/querier/queryrange/limits.go | 2 +- 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/pkg/logql/error.go b/pkg/logql/error.go index d1d52220fcd64..c7603be2110b6 100644 --- a/pkg/logql/error.go +++ b/pkg/logql/error.go @@ -9,6 +9,8 @@ import ( "github.com/grafana/loki/pkg/logql/log" ) +// Those errors are useful for comparing error returned by the engine. +// e.g. errors.Is(err,logql.ErrParse) let you know if this is a ast parsing error. var ( ErrParse = errors.New("failed to parse the log query") ErrPipeline = errors.New("failed execute pipeline") diff --git a/pkg/querier/queryrange/limits.go b/pkg/querier/queryrange/limits.go index 1a20da18c8002..c3483688569ad 100644 --- a/pkg/querier/queryrange/limits.go +++ b/pkg/querier/queryrange/limits.go @@ -85,7 +85,7 @@ func (l cacheKeyLimits) GenerateCacheKey(userID string, r queryrange.Request) st type seriesLimiter struct { hashes map[uint64]struct{} rw sync.RWMutex - buf []byte + buf []byte // buf used for hashing to avoid allocations. maxSeries int next queryrange.Handler From 935c83c6820603585bba5e48af2482e69735bd36 Mon Sep 17 00:00:00 2001 From: Cyril Tovena Date: Wed, 18 Nov 2020 11:09:35 +0100 Subject: [PATCH 10/10] Reduce // for to fix flaky test. Signed-off-by: Cyril Tovena --- pkg/querier/queryrange/limits_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/querier/queryrange/limits_test.go b/pkg/querier/queryrange/limits_test.go index 73b24bb4b757a..cbdaaa562d436 100644 --- a/pkg/querier/queryrange/limits_test.go +++ b/pkg/querier/queryrange/limits_test.go @@ -62,7 +62,7 @@ func Test_seriesLimiter(t *testing.T) { cfg.SplitQueriesByInterval = time.Hour cfg.CacheResults = false // split in 6 with 4 in // max. - tpw, stopper, err := NewTripperware(cfg, util.Logger, fakeLimits{maxSeries: 1, maxQueryParallelism: 4}, chunk.SchemaConfig{}, 0, nil) + tpw, stopper, err := NewTripperware(cfg, util.Logger, fakeLimits{maxSeries: 1, maxQueryParallelism: 2}, chunk.SchemaConfig{}, 0, nil) if stopper != nil { defer stopper.Stop() }