diff --git a/docs/sources/configuration/_index.md b/docs/sources/configuration/_index.md index 26438a48da165..598307bb5c4ea 100644 --- a/docs/sources/configuration/_index.md +++ b/docs/sources/configuration/_index.md @@ -1648,6 +1648,11 @@ logs in Loki. # CLI flag: -querier.max-query-parallelism [max_query_parallelism: | default = 14] +# Limit the maximum of unique series that is returned by a metric query. +# When the limit is reached an error is returned. +# CLI flag: -querier.max-query-series +[max_query_series: | default = 500] + # Cardinality limit for index queries. # CLI flag: -store.cardinality-limit [cardinality_limit: | default = 100000] @@ -1881,4 +1886,4 @@ multi_kv_config: mirror-enabled: false primary: consul ``` -### Generic placeholders \ No newline at end of file +### Generic placeholders diff --git a/pkg/logcli/query/query.go b/pkg/logcli/query/query.go index c1ce0a959b682..e38348182f236 100644 --- a/pkg/logcli/query/query.go +++ b/pkg/logcli/query/query.go @@ -201,7 +201,7 @@ func (q *Query) DoLocalQuery(out output.LogOutput, statistics bool, orgID string return err } - eng := logql.NewEngine(conf.Querier.Engine, querier) + eng := logql.NewEngine(conf.Querier.Engine, querier, limits) var query logql.Query if q.isInstant() { diff --git a/pkg/logcli/query/query_test.go b/pkg/logcli/query/query_test.go index 0cad99f55e5ae..347d82e90a48e 100644 --- a/pkg/logcli/query/query_test.go +++ b/pkg/logcli/query/query_test.go @@ -502,7 +502,7 @@ type testQueryClient struct { func newTestQueryClient(testStreams ...logproto.Stream) *testQueryClient { q := logql.NewMockQuerier(0, testStreams) - e := logql.NewEngine(logql.EngineOpts{}, q) + e := logql.NewEngine(logql.EngineOpts{}, q, logql.NoLimits) return &testQueryClient{ engine: e, queryRangeCalls: 0, diff --git a/pkg/logql/engine.go b/pkg/logql/engine.go index 6885a64ae8d19..584a02ebd5ad1 100644 --- a/pkg/logql/engine.go +++ b/pkg/logql/engine.go @@ -15,6 +15,7 @@ import ( 
"github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/promql" promql_parser "github.com/prometheus/prometheus/promql/parser" + "github.com/weaveworks/common/user" "github.com/grafana/loki/pkg/helpers" "github.com/grafana/loki/pkg/iter" @@ -85,14 +86,16 @@ func (opts *EngineOpts) applyDefault() { type Engine struct { timeout time.Duration evaluator Evaluator + limits Limits } // NewEngine creates a new LogQL Engine. -func NewEngine(opts EngineOpts, q Querier) *Engine { +func NewEngine(opts EngineOpts, q Querier, l Limits) *Engine { opts.applyDefault() return &Engine{ timeout: opts.Timeout, evaluator: NewDefaultEvaluator(q, opts.MaxLookBackPeriod), + limits: l, } } @@ -106,6 +109,7 @@ func (ng *Engine) Query(params Params) Query { return ParseExpr(query) }, record: true, + limits: ng.limits, } } @@ -119,6 +123,7 @@ type query struct { timeout time.Duration params Params parse func(context.Context, string) (Expr, error) + limits Limits evaluator Evaluator record bool } @@ -145,7 +150,7 @@ func (q *query) Exec(ctx context.Context) (Result, error) { status := "200" if err != nil { status = "500" - if IsParseError(err) || IsPipelineError(err) { + if errors.Is(err, ErrParse) || errors.Is(err, ErrPipeline) || errors.Is(err, ErrLimit) { status = "400" } } @@ -194,6 +199,11 @@ func (q *query) evalSample(ctx context.Context, expr SampleExpr) (promql_parser. return q.evalLiteral(ctx, lit) } + userID, err := user.ExtractOrgID(ctx) + if err != nil { + return nil, err + } + stepEvaluator, err := q.evaluator.StepEvaluator(ctx, q.evaluator, expr, q.params) if err != nil { return nil, err @@ -201,11 +211,18 @@ func (q *query) evalSample(ctx context.Context, expr SampleExpr) (promql_parser. 
defer helpers.LogErrorWithContext(ctx, "closing SampleExpr", stepEvaluator.Close) seriesIndex := map[uint64]*promql.Series{} + maxSeries := q.limits.MaxQuerySeries(userID) next, ts, vec := stepEvaluator.Next() if stepEvaluator.Error() != nil { return nil, stepEvaluator.Error() } + + // fail fast for the first step or instant query + if len(vec) > maxSeries { + return nil, newSeriesLimitError(maxSeries) + } + if GetRangeType(q.params) == InstantType { sort.Slice(vec, func(i, j int) bool { return labels.Compare(vec[i].Metric, vec[j].Metric) < 0 }) return vec, nil @@ -237,6 +254,10 @@ func (q *query) evalSample(ctx context.Context, expr SampleExpr) (promql_parser. V: p.V, }) } + // as we slowly build the full query for each steps, make sure we don't go over the limit of unique series. + if len(seriesIndex) > maxSeries { + return nil, newSeriesLimitError(maxSeries) + } next, ts, vec = stepEvaluator.Next() if stepEvaluator.Error() != nil { return nil, stepEvaluator.Error() diff --git a/pkg/logql/engine_test.go b/pkg/logql/engine_test.go index a7de24b7abf7c..343264b3eb7c4 100644 --- a/pkg/logql/engine_test.go +++ b/pkg/logql/engine_test.go @@ -16,6 +16,7 @@ import ( promql_parser "github.com/prometheus/prometheus/promql/parser" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "github.com/weaveworks/common/user" "github.com/grafana/loki/pkg/iter" "github.com/grafana/loki/pkg/logproto" @@ -460,7 +461,7 @@ func TestEngine_LogsInstantQuery(t *testing.T) { t.Run(fmt.Sprintf("%s %s", test.qs, test.direction), func(t *testing.T) { t.Parallel() - eng := NewEngine(EngineOpts{}, newQuerierRecorder(t, test.data, test.params)) + eng := NewEngine(EngineOpts{}, newQuerierRecorder(t, test.data, test.params), NoLimits) q := eng.Query(LiteralParams{ qs: test.qs, start: test.ts, @@ -468,7 +469,7 @@ func TestEngine_LogsInstantQuery(t *testing.T) { direction: test.direction, limit: test.limit, }) - res, err := q.Exec(context.Background()) + res, err := 
q.Exec(user.InjectOrgID(context.Background(), "fake")) if err != nil { t.Fatal(err) } @@ -1513,7 +1514,7 @@ func TestEngine_RangeQuery(t *testing.T) { t.Run(fmt.Sprintf("%s %s", test.qs, test.direction), func(t *testing.T) { t.Parallel() - eng := NewEngine(EngineOpts{}, newQuerierRecorder(t, test.data, test.params)) + eng := NewEngine(EngineOpts{}, newQuerierRecorder(t, test.data, test.params), NoLimits) q := eng.Query(LiteralParams{ qs: test.qs, @@ -1524,7 +1525,7 @@ func TestEngine_RangeQuery(t *testing.T) { direction: test.direction, limit: test.limit, }) - res, err := q.Exec(context.Background()) + res, err := q.Exec(user.InjectOrgID(context.Background(), "fake")) if err != nil { t.Fatal(err) } @@ -1549,7 +1550,7 @@ func (statsQuerier) SelectSamples(ctx context.Context, p SelectSampleParams) (it func TestEngine_Stats(t *testing.T) { - eng := NewEngine(EngineOpts{}, &statsQuerier{}) + eng := NewEngine(EngineOpts{}, &statsQuerier{}, NoLimits) q := eng.Query(LiteralParams{ qs: `{foo="bar"}`, @@ -1558,7 +1559,7 @@ func TestEngine_Stats(t *testing.T) { direction: logproto.BACKWARD, limit: 1000, }) - r, err := q.Exec(context.Background()) + r, err := q.Exec(user.InjectOrgID(context.Background(), "fake")) require.NoError(t, err) require.Equal(t, int64(1), r.Statistics.Store.DecompressedBytes) } @@ -1621,19 +1622,53 @@ func TestStepEvaluator_Error(t *testing.T) { t.Run(tc.name, func(t *testing.T) { t.Parallel() tc := tc - eng := NewEngine(EngineOpts{}, tc.querier) + eng := NewEngine(EngineOpts{}, tc.querier, NoLimits) q := eng.Query(LiteralParams{ qs: tc.qs, start: time.Unix(0, 0), end: time.Unix(180, 0), step: 1 * time.Second, }) - _, err := q.Exec(context.Background()) + _, err := q.Exec(user.InjectOrgID(context.Background(), "fake")) require.Equal(t, tc.err, err) }) } } +func TestEngine_MaxSeries(t *testing.T) { + eng := NewEngine(EngineOpts{}, getLocalQuerier(100000), &fakeLimits{maxSeries: 1}) + + for _, test := range []struct { + qs string + direction 
logproto.Direction + expectLimitErr bool + }{ + {`topk(1,rate(({app=~"foo|bar"})[1m]))`, logproto.FORWARD, true}, + {`{app="foo"}`, logproto.FORWARD, false}, + {`{app="bar"} |= "foo" |~ ".+bar"`, logproto.BACKWARD, false}, + {`rate({app="foo"} |~".+bar" [1m])`, logproto.BACKWARD, true}, + {`rate({app="foo"}[30s])`, logproto.FORWARD, true}, + {`count_over_time({app="foo|bar"} |~".+bar" [1m])`, logproto.BACKWARD, true}, + {`avg(count_over_time({app=~"foo|bar"} |~".+bar" [1m]))`, logproto.FORWARD, false}, + } { + q := eng.Query(LiteralParams{ + qs: test.qs, + start: time.Unix(0, 0), + end: time.Unix(100000, 0), + step: 60 * time.Second, + direction: test.direction, + limit: 1000, + }) + _, err := q.Exec(user.InjectOrgID(context.Background(), "fake")) + if test.expectLimitErr { + require.NotNil(t, err) + require.True(t, errors.Is(err, ErrLimit)) + return + } + require.Nil(t, err) + } +} + // go test -mod=vendor ./pkg/logql/ -bench=. -benchmem -memprofile memprofile.out -cpuprofile cpuprofile.out func BenchmarkRangeQuery100000(b *testing.B) { benchmarkRangeQuery(int64(100000), b) @@ -1653,7 +1688,7 @@ var result promql_parser.Value func benchmarkRangeQuery(testsize int64, b *testing.B) { b.ReportAllocs() - eng := NewEngine(EngineOpts{}, getLocalQuerier(testsize)) + eng := NewEngine(EngineOpts{}, getLocalQuerier(testsize), NoLimits) start := time.Unix(0, 0) end := time.Unix(testsize, 0) b.ResetTimer() @@ -1692,7 +1727,7 @@ func benchmarkRangeQuery(testsize int64, b *testing.B) { direction: test.direction, limit: 1000, }) - res, err := q.Exec(context.Background()) + res, err := q.Exec(user.InjectOrgID(context.Background(), "fake")) if err != nil { b.Fatal(err) } diff --git a/pkg/logql/error.go b/pkg/logql/error.go index 4e7c6d50b68ca..c7603be2110b6 100644 --- a/pkg/logql/error.go +++ b/pkg/logql/error.go @@ -1,6 +1,7 @@ package logql import ( + "errors" "fmt" "github.com/prometheus/prometheus/pkg/labels" @@ -8,6 +9,14 @@ import ( "github.com/grafana/loki/pkg/logql/log" ) 
+// These errors are useful for comparing errors returned by the engine. +// e.g. errors.Is(err, logql.ErrParse) lets you know if this is an AST parsing error. +var ( + ErrParse = errors.New("failed to parse the log query") + ErrPipeline = errors.New("failed execute pipeline") + ErrLimit = errors.New("limit reached while evaluating the query") +) + // ParseError is what is returned when we failed to parse. type ParseError struct { msg string @@ -21,6 +30,11 @@ func (p ParseError) Error() string { return fmt.Sprintf("parse error at line %d, col %d: %s", p.line, p.col, p.msg) } +// Is allows to use errors.Is(err,ErrParse) on this error. +func (p ParseError) Is(target error) bool { + return target == ErrParse +} + func newParseError(msg string, line, col int) ParseError { return ParseError{ msg: msg, @@ -37,12 +51,6 @@ func newStageError(expr Expr, err error) ParseError { } } -// IsParseError returns true if the err is a ast parsing error. -func IsParseError(err error) bool { - _, ok := err.(ParseError) - return ok -} - type pipelineError struct { metric labels.Labels errorType string @@ -64,8 +72,22 @@ func (e pipelineError) Error() string { e.errorType, e.metric, e.errorType) } -// IsPipelineError tells if the error is generated by a Pipeline. -func IsPipelineError(err error) bool { - _, ok := err.(*pipelineError) - return ok +// Is allows to use errors.Is(err,ErrPipeline) on this error. +func (e pipelineError) Is(target error) bool { + return target == ErrPipeline +} + +type limitError struct { + error +} + +func newSeriesLimitError(limit int) *limitError { + return &limitError{ + error: fmt.Errorf("maximum of series (%d) reached for a single query", limit), + } +} + +// Is allows to use errors.Is(err,ErrLimit) on this error.
+func (e limitError) Is(target error) bool { + return target == ErrLimit } diff --git a/pkg/logql/limits.go b/pkg/logql/limits.go new file mode 100644 index 0000000000000..159911941b3ae --- /dev/null +++ b/pkg/logql/limits.go @@ -0,0 +1,22 @@ +package logql + +import ( + "math" +) + +var ( + NoLimits = &fakeLimits{maxSeries: math.MaxInt32} +) + +// Limits allow the engine to fetch limits for a given users. +type Limits interface { + MaxQuerySeries(userID string) int +} + +type fakeLimits struct { + maxSeries int +} + +func (f fakeLimits) MaxQuerySeries(userID string) int { + return f.maxSeries +} diff --git a/pkg/logql/parser.go b/pkg/logql/parser.go index 9556395a9f098..e306f24b2c139 100644 --- a/pkg/logql/parser.go +++ b/pkg/logql/parser.go @@ -57,7 +57,7 @@ func ParseExpr(input string) (expr Expr, err error) { if r := recover(); r != nil { var ok bool if err, ok = r.(error); ok { - if IsParseError(err) { + if errors.Is(err, ErrParse) { return } err = newParseError(err.Error(), 0, 0) diff --git a/pkg/logql/parser_test.go b/pkg/logql/parser_test.go index ee3a02ea8bb9e..2a819871a4886 100644 --- a/pkg/logql/parser_test.go +++ b/pkg/logql/parser_test.go @@ -1975,7 +1975,7 @@ func TestIsParseError(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - if got := IsParseError(tt.errFn()); got != tt.want { + if got := errors.Is(tt.errFn(), ErrParse); got != tt.want { t.Errorf("IsParseError() = %v, want %v", got, tt.want) } }) diff --git a/pkg/logql/sharding.go b/pkg/logql/sharding.go index c108d64e59fa3..4f060d3d4d517 100644 --- a/pkg/logql/sharding.go +++ b/pkg/logql/sharding.go @@ -31,16 +31,18 @@ which can then take advantage of our sharded execution model. 
type ShardedEngine struct { timeout time.Duration downstreamable Downstreamable + limits Limits metrics *ShardingMetrics } // NewShardedEngine constructs a *ShardedEngine -func NewShardedEngine(opts EngineOpts, downstreamable Downstreamable, metrics *ShardingMetrics) *ShardedEngine { +func NewShardedEngine(opts EngineOpts, downstreamable Downstreamable, metrics *ShardingMetrics, limits Limits) *ShardedEngine { opts.applyDefault() return &ShardedEngine{ timeout: opts.Timeout, downstreamable: downstreamable, metrics: metrics, + limits: limits, } } @@ -54,6 +56,7 @@ func (ng *ShardedEngine) Query(p Params, mapped Expr) Query { parse: func(_ context.Context, _ string) (Expr, error) { return mapped, nil }, + limits: ng.limits, } } diff --git a/pkg/logql/sharding_test.go b/pkg/logql/sharding_test.go index d6b364c6a9b71..7c04f4f2113fd 100644 --- a/pkg/logql/sharding_test.go +++ b/pkg/logql/sharding_test.go @@ -8,6 +8,7 @@ import ( "github.com/prometheus/prometheus/promql" "github.com/stretchr/testify/require" + "github.com/weaveworks/common/user" "github.com/grafana/loki/pkg/logproto" ) @@ -59,8 +60,8 @@ func TestMappingEquivalence(t *testing.T) { ) opts := EngineOpts{} - regular := NewEngine(opts, q) - sharded := NewShardedEngine(opts, MockDownstreamer{regular}, nilMetrics) + regular := NewEngine(opts, q, NoLimits) + sharded := NewShardedEngine(opts, MockDownstreamer{regular}, nilMetrics, NoLimits) t.Run(tc.query, func(t *testing.T) { params := NewLiteralParams( @@ -74,7 +75,7 @@ func TestMappingEquivalence(t *testing.T) { nil, ) qry := regular.Query(params) - ctx := context.Background() + ctx := user.InjectOrgID(context.Background(), "fake") mapper, err := NewShardMapper(shards, nilMetrics) require.Nil(t, err) diff --git a/pkg/loki/modules.go b/pkg/loki/modules.go index c348d2864dbae..b3b363927be5e 100644 --- a/pkg/loki/modules.go +++ b/pkg/loki/modules.go @@ -464,7 +464,7 @@ func (t *Loki) initRuler() (_ services.Service, err error) { return nil, err } - engine := 
logql.NewEngine(t.cfg.Querier.Engine, q) + engine := logql.NewEngine(t.cfg.Querier.Engine, q, t.overrides) t.ruler, err = ruler.NewRuler( t.cfg.Ruler, diff --git a/pkg/querier/querier.go b/pkg/querier/querier.go index 5f16fb1176b17..6da917a7fd67e 100644 --- a/pkg/querier/querier.go +++ b/pkg/querier/querier.go @@ -72,7 +72,7 @@ func New(cfg Config, store storage.Store, ingesterQuerier *IngesterQuerier, limi limits: limits, } - querier.engine = logql.NewEngine(cfg.Engine, &querier) + querier.engine = logql.NewEngine(cfg.Engine, &querier, limits) return &querier, nil } diff --git a/pkg/querier/queryrange/limits.go b/pkg/querier/queryrange/limits.go index 05434ce597aa7..c3483688569ad 100644 --- a/pkg/querier/queryrange/limits.go +++ b/pkg/querier/queryrange/limits.go @@ -1,16 +1,26 @@ package queryrange import ( + "context" "fmt" + "net/http" + "sync" "time" + "github.com/cortexproject/cortex/pkg/ingester/client" "github.com/cortexproject/cortex/pkg/querier/queryrange" + "github.com/weaveworks/common/httpgrpc" +) + +const ( + limitErrTmpl = "maximum of series (%d) reached for a single query" ) // Limits extends the cortex limits interface with support for per tenant splitby parameters type Limits interface { queryrange.Limits QuerySplitDuration(string) time.Duration + MaxQuerySeries(string) int MaxEntriesLimitPerQuery(string) int } @@ -71,3 +81,66 @@ func (l cacheKeyLimits) GenerateCacheKey(userID string, r queryrange.Request) st // a cache key can't be reused when an interval changes return fmt.Sprintf("%s:%s:%d:%d:%d", userID, r.GetQuery(), r.GetStep(), currentInterval, split) } + +type seriesLimiter struct { + hashes map[uint64]struct{} + rw sync.RWMutex + buf []byte // buf used for hashing to avoid allocations. + + maxSeries int + next queryrange.Handler +} + +type seriesLimiterMiddleware int + +// newSeriesLimiter creates a new series limiter middleware for use for a single request. 
+func newSeriesLimiter(maxSeries int) queryrange.Middleware { + return seriesLimiterMiddleware(maxSeries) +} + +// Wrap wraps a global handler and returns a per request limited handler. +// The handler returned is thread safe. +func (slm seriesLimiterMiddleware) Wrap(next queryrange.Handler) queryrange.Handler { + return &seriesLimiter{ + hashes: make(map[uint64]struct{}), + maxSeries: int(slm), + buf: make([]byte, 0, 1024), + next: next, + } +} + +func (sl *seriesLimiter) Do(ctx context.Context, req queryrange.Request) (queryrange.Response, error) { + // no need to fire a request if the limit is already reached. + if sl.isLimitReached() { + return nil, httpgrpc.Errorf(http.StatusBadRequest, limitErrTmpl, sl.maxSeries) + } + res, err := sl.next.Do(ctx, req) + if err != nil { + return res, err + } + promResponse, ok := res.(*LokiPromResponse) + if !ok { + return res, nil + } + if promResponse.Response == nil { + return res, nil + } + sl.rw.Lock() + var hash uint64 + for _, s := range promResponse.Response.Data.Result { + lbs := client.FromLabelAdaptersToLabels(s.Labels) + hash, sl.buf = lbs.HashWithoutLabels(sl.buf, []string(nil)...) 
+ sl.hashes[hash] = struct{}{} + } + sl.rw.Unlock() + if sl.isLimitReached() { + return nil, httpgrpc.Errorf(http.StatusBadRequest, limitErrTmpl, sl.maxSeries) + } + return res, nil +} + +func (sl *seriesLimiter) isLimitReached() bool { + sl.rw.RLock() + defer sl.rw.RUnlock() + return len(sl.hashes) > sl.maxSeries +} diff --git a/pkg/querier/queryrange/limits_test.go b/pkg/querier/queryrange/limits_test.go index 8e5fc04043398..cbdaaa562d436 100644 --- a/pkg/querier/queryrange/limits_test.go +++ b/pkg/querier/queryrange/limits_test.go @@ -1,12 +1,24 @@ package queryrange import ( + "context" "fmt" + "net/http" + "sync" "testing" "time" + "github.com/cortexproject/cortex/pkg/chunk" "github.com/cortexproject/cortex/pkg/querier/queryrange" + "github.com/cortexproject/cortex/pkg/util" + "github.com/prometheus/prometheus/pkg/labels" + "github.com/prometheus/prometheus/promql" "github.com/stretchr/testify/require" + "github.com/weaveworks/common/user" + + "github.com/grafana/loki/pkg/logproto" + "github.com/grafana/loki/pkg/logql" + "github.com/grafana/loki/pkg/logql/marshal" ) func TestLimits(t *testing.T) { @@ -44,3 +56,92 @@ func TestLimits(t *testing.T) { cacheKeyLimits{wrapped}.GenerateCacheKey("a", r), ) } + +func Test_seriesLimiter(t *testing.T) { + cfg := testConfig + cfg.SplitQueriesByInterval = time.Hour + cfg.CacheResults = false + // split in 6 with 4 in // max. 
+ tpw, stopper, err := NewTripperware(cfg, util.Logger, fakeLimits{maxSeries: 1, maxQueryParallelism: 2}, chunk.SchemaConfig{}, 0, nil) + if stopper != nil { + defer stopper.Stop() + } + require.NoError(t, err) + + lreq := &LokiRequest{ + Query: `rate({app="foo"} |= "foo"[1m])`, + Limit: 1000, + Step: 30000, //30sec + StartTs: testTime.Add(-6 * time.Hour), + EndTs: testTime, + Direction: logproto.FORWARD, + Path: "/query_range", + } + + ctx := user.InjectOrgID(context.Background(), "1") + req, err := lokiCodec.EncodeRequest(ctx, lreq) + require.NoError(t, err) + + req = req.WithContext(ctx) + err = user.InjectOrgIDIntoHTTPRequest(ctx, req) + require.NoError(t, err) + + rt, err := newfakeRoundTripper() + require.NoError(t, err) + defer rt.Close() + + count, h := promqlResult(matrix) + rt.setHandler(h) + + _, err = tpw(rt).RoundTrip(req) + require.NoError(t, err) + require.Equal(t, 6, *count) + + // 2 series should not be allowed. + c := new(int) + m := &sync.Mutex{} + h = http.HandlerFunc(func(rw http.ResponseWriter, r *http.Request) { + m.Lock() + defer m.Unlock() + defer func() { + *c++ + }() + // first time returns a single series + if *c == 0 { + if err := marshal.WriteQueryResponseJSON(logql.Result{Data: matrix}, rw); err != nil { + panic(err) + } + return + } + // second time returns a different series. 
+ if err := marshal.WriteQueryResponseJSON(logql.Result{ + Data: promql.Matrix{ + { + Points: []promql.Point{ + { + T: toMs(testTime.Add(-4 * time.Hour)), + V: 0.013333333333333334, + }, + }, + Metric: []labels.Label{ + { + Name: "filename", + Value: `/var/hostlog/apport.log`, + }, + { + Name: "job", + Value: "anotherjob", + }, + }, + }, + }, + }, rw); err != nil { + panic(err) + } + }) + rt.setHandler(h) + + _, err = tpw(rt).RoundTrip(req) + require.Error(t, err) + require.LessOrEqual(t, *c, 4) +} diff --git a/pkg/querier/queryrange/querysharding.go b/pkg/querier/queryrange/querysharding.go index cf1c3234ac260..3adfd5f5583ac 100644 --- a/pkg/querier/queryrange/querysharding.go +++ b/pkg/querier/queryrange/querysharding.go @@ -24,6 +24,7 @@ func NewQueryShardMiddleware( minShardingLookback time.Duration, middlewareMetrics *queryrange.InstrumentMiddlewareMetrics, shardingMetrics *logql.ShardingMetrics, + limits logql.Limits, ) queryrange.Middleware { noshards := !hasShards(confs) @@ -38,7 +39,7 @@ func NewQueryShardMiddleware( } mapperware := queryrange.MiddlewareFunc(func(next queryrange.Handler) queryrange.Handler { - return newASTMapperware(confs, next, logger, shardingMetrics) + return newASTMapperware(confs, next, logger, shardingMetrics, limits) }) return queryrange.MiddlewareFunc(func(next queryrange.Handler) queryrange.Handler { @@ -60,13 +61,14 @@ func newASTMapperware( next queryrange.Handler, logger log.Logger, metrics *logql.ShardingMetrics, + limits logql.Limits, ) *astMapperware { return &astMapperware{ confs: confs, logger: log.With(logger, "middleware", "QueryShard.astMapperware"), next: next, - ng: logql.NewShardedEngine(logql.EngineOpts{}, DownstreamHandler{next}, metrics), + ng: logql.NewShardedEngine(logql.EngineOpts{}, DownstreamHandler{next}, metrics, limits), metrics: metrics, } } diff --git a/pkg/querier/queryrange/querysharding_test.go b/pkg/querier/queryrange/querysharding_test.go index 5c6ac12138b23..05f169724529f 100644 --- 
a/pkg/querier/queryrange/querysharding_test.go +++ b/pkg/querier/queryrange/querysharding_test.go @@ -3,6 +3,7 @@ package queryrange import ( "context" "fmt" + "math" "sort" "sync" "testing" @@ -146,6 +147,7 @@ func Test_astMapper(t *testing.T) { handler, log.NewNopLogger(), nilShardingMetrics, + fakeLimits{maxSeries: math.MaxInt32}, ) resp, err := mware.Do(context.Background(), defaultReq().WithQuery(`{food="bar"}`)) @@ -175,6 +177,7 @@ func Test_ShardingByPass(t *testing.T) { handler, log.NewNopLogger(), nilShardingMetrics, + fakeLimits{maxSeries: math.MaxInt32}, ) _, err := mware.Do(context.Background(), defaultReq().WithQuery(`1+1`)) diff --git a/pkg/querier/queryrange/roundtrip.go b/pkg/querier/queryrange/roundtrip.go index ae2a850f2505d..5cd4e366e2a97 100644 --- a/pkg/querier/queryrange/roundtrip.go +++ b/pkg/querier/queryrange/roundtrip.go @@ -239,6 +239,7 @@ func NewLogFilterTripperware( minShardingLookback, instrumentMetrics, // instrumentation is included in the sharding middleware shardingMetrics, + limits, ), ) } @@ -388,6 +389,7 @@ func NewMetricTripperware( minShardingLookback, instrumentMetrics, // instrumentation is included in the sharding middleware shardingMetrics, + limits, ), ) } diff --git a/pkg/querier/queryrange/roundtrip_test.go b/pkg/querier/queryrange/roundtrip_test.go index e05e524dde8e7..d308964cc0fe0 100644 --- a/pkg/querier/queryrange/roundtrip_test.go +++ b/pkg/querier/queryrange/roundtrip_test.go @@ -4,6 +4,7 @@ import ( "bytes" "context" "io/ioutil" + "math" "net/http" "net/http/httptest" "net/url" @@ -92,7 +93,7 @@ var ( // those tests are mostly for testing the glue between all component and make sure they activate correctly. 
func TestMetricsTripperware(t *testing.T) { - tpw, stopper, err := NewTripperware(testConfig, util.Logger, fakeLimits{}, chunk.SchemaConfig{}, 0, nil) + tpw, stopper, err := NewTripperware(testConfig, util.Logger, fakeLimits{maxSeries: math.MaxInt32}, chunk.SchemaConfig{}, 0, nil) if stopper != nil { defer stopper.Stop() } @@ -493,6 +494,7 @@ func TestEntriesLimitWithZeroTripperware(t *testing.T) { type fakeLimits struct { maxQueryParallelism int maxEntriesLimitPerQuery int + maxSeries int splits map[string]time.Duration } @@ -518,6 +520,10 @@ func (f fakeLimits) MaxEntriesLimitPerQuery(string) int { return f.maxEntriesLimitPerQuery } +func (f fakeLimits) MaxQuerySeries(string) int { + return f.maxSeries +} + func (f fakeLimits) MaxCacheFreshness(string) time.Duration { return 1 * time.Minute } diff --git a/pkg/querier/queryrange/split_by_interval.go b/pkg/querier/queryrange/split_by_interval.go index f6eb9d0723bf4..975ea79d5578d 100644 --- a/pkg/querier/queryrange/split_by_interval.go +++ b/pkg/querier/queryrange/split_by_interval.go @@ -83,6 +83,7 @@ func (h *splitByInterval) Process( parallelism int, threshold int64, input []*lokiResult, + userID string, ) ([]queryrange.Response, error) { var responses []queryrange.Response ctx, cancel := context.WithCancel(ctx) @@ -102,8 +103,10 @@ func (h *splitByInterval) Process( p = len(input) } + // per request wrapped handler for limiting the amount of series. 
+ next := newSeriesLimiter(h.limits.MaxQuerySeries(userID)).Wrap(h.next) for i := 0; i < p; i++ { - go h.loop(ctx, ch) + go h.loop(ctx, ch, next) } for _, x := range input { @@ -134,14 +137,14 @@ func (h *splitByInterval) Process( return responses, nil } -func (h *splitByInterval) loop(ctx context.Context, ch <-chan *lokiResult) { +func (h *splitByInterval) loop(ctx context.Context, ch <-chan *lokiResult, next queryrange.Handler) { for data := range ch { sp, ctx := opentracing.StartSpanFromContext(ctx, "interval") data.req.LogToSpan(sp) - resp, err := h.next.Do(ctx, data.req) + resp, err := next.Do(ctx, data.req) select { case <-ctx.Done(): @@ -203,7 +206,7 @@ func (h *splitByInterval) Do(ctx context.Context, r queryrange.Request) (queryra }) } - resps, err := h.Process(ctx, h.limits.MaxQueryParallelism(userid), limit, input) + resps, err := h.Process(ctx, h.limits.MaxQueryParallelism(userid), limit, input, userid) if err != nil { return nil, err } diff --git a/pkg/util/server/error.go b/pkg/util/server/error.go index 1d8487c92eef0..5e2db28cc0d01 100644 --- a/pkg/util/server/error.go +++ b/pkg/util/server/error.go @@ -7,6 +7,7 @@ import ( "github.com/cortexproject/cortex/pkg/chunk" "github.com/weaveworks/common/httpgrpc" + "github.com/weaveworks/common/user" "github.com/grafana/loki/pkg/logql" ) @@ -30,7 +31,9 @@ func WriteError(err error, w http.ResponseWriter) { http.Error(w, ErrDeadlineExceeded, http.StatusGatewayTimeout) case errors.As(err, &queryErr): http.Error(w, err.Error(), http.StatusBadRequest) - case logql.IsParseError(err) || logql.IsPipelineError(err): + case errors.Is(err, logql.ErrLimit) || errors.Is(err, logql.ErrParse) || errors.Is(err, logql.ErrPipeline): + http.Error(w, err.Error(), http.StatusBadRequest) + case errors.Is(err, user.ErrNoOrgID): http.Error(w, err.Error(), http.StatusBadRequest) default: if grpcErr, ok := httpgrpc.HTTPResponseFromError(err); ok { diff --git a/pkg/util/server/error_test.go b/pkg/util/server/error_test.go index 
b557ffa309e53..5ea16c7fa0d41 100644 --- a/pkg/util/server/error_test.go +++ b/pkg/util/server/error_test.go @@ -12,6 +12,7 @@ import ( "github.com/cortexproject/cortex/pkg/chunk" "github.com/stretchr/testify/require" "github.com/weaveworks/common/httpgrpc" + "github.com/weaveworks/common/user" "github.com/grafana/loki/pkg/logql" ) @@ -25,6 +26,7 @@ func Test_writeError(t *testing.T) { expectedStatus int }{ {"cancelled", context.Canceled, ErrClientCanceled, StatusClientClosedRequest}, + {"orgid", user.ErrNoOrgID, user.ErrNoOrgID.Error(), http.StatusBadRequest}, {"deadline", context.DeadlineExceeded, ErrDeadlineExceeded, http.StatusGatewayTimeout}, {"parse error", logql.ParseError{}, "parse error : ", http.StatusBadRequest}, {"httpgrpc", httpgrpc.Errorf(http.StatusBadRequest, errors.New("foo").Error()), "foo", http.StatusBadRequest}, diff --git a/pkg/util/validation/limits.go b/pkg/util/validation/limits.go index 28a4676f3d339..d381aaaaead04 100644 --- a/pkg/util/validation/limits.go +++ b/pkg/util/validation/limits.go @@ -39,6 +39,7 @@ type Limits struct { // Querier enforced limits. MaxChunksPerQuery int `yaml:"max_chunks_per_query"` + MaxQuerySeries int `yaml:"max_query_series"` MaxQueryLength time.Duration `yaml:"max_query_length"` MaxQueryParallelism int `yaml:"max_query_parallelism"` CardinalityLimit int `yaml:"cardinality_limit"` @@ -75,6 +76,7 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) { f.IntVar(&l.MaxChunksPerQuery, "store.query-chunk-limit", 2e6, "Maximum number of chunks that can be fetched in a single query.") f.DurationVar(&l.MaxQueryLength, "store.max-query-length", 0, "Limit to length of chunk store queries, 0 to disable.") + f.IntVar(&l.MaxQuerySeries, "querier.max-query-series", 500, "Limit the maximum of unique series returned by a metric query. 
When the limit is reached an error is returned.") f.IntVar(&l.MaxQueryParallelism, "querier.max-query-parallelism", 14, "Maximum number of queries will be scheduled in parallel by the frontend.") f.IntVar(&l.CardinalityLimit, "store.cardinality-limit", 1e5, "Cardinality limit for index queries.") f.IntVar(&l.MaxStreamsMatchersPerQuery, "querier.max-streams-matcher-per-query", 1000, "Limit the number of streams matchers per query") @@ -204,6 +206,11 @@ func (o *Overrides) MaxQueryLength(userID string) time.Duration { return o.getOverridesForUser(userID).MaxQueryLength } +// MaxQuerySeries returns the maximum number of unique series a metric query may return. +func (o *Overrides) MaxQuerySeries(userID string) int { + return o.getOverridesForUser(userID).MaxQuerySeries +} + // MaxQueryParallelism returns the limit to the number of sub-queries the // frontend will process in parallel. func (o *Overrides) MaxQueryParallelism(userID string) int {