Skip to content

Commit

Permalink
fix: enforce query timeouts and limits
Browse files Browse the repository at this point in the history
  • Loading branch information
trevorwhitney committed Sep 20, 2024
1 parent 2ef80ec commit a23ea8b
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 5 deletions.
9 changes: 9 additions & 0 deletions pkg/querier/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -388,6 +388,15 @@ func (q *QuerierAPI) VolumeHandler(ctx context.Context, req *logproto.VolumeRequ
}

func (q *QuerierAPI) DetectedFieldsHandler(ctx context.Context, req *logproto.DetectedFieldsRequest) (*logproto.DetectedFieldsResponse, error) {
expr, err := syntax.ParseLogSelector(req.Query, true)
if err != nil {
return nil, err
}

if err := q.validateMaxEntriesLimits(ctx, expr, req.LineLimit); err != nil {
return nil, err
}

resp, err := q.querier.DetectedFields(ctx, req)
if err != nil {
return nil, err
Expand Down
19 changes: 14 additions & 5 deletions pkg/querier/querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ import (
listutil "github.com/grafana/loki/v3/pkg/util"
"github.com/grafana/loki/v3/pkg/util/httpreq"
"github.com/grafana/loki/v3/pkg/util/spanlogger"
"github.com/grafana/loki/v3/pkg/util/validation"
util_validation "github.com/grafana/loki/v3/pkg/util/validation"

"github.com/grafana/loki/pkg/push"
Expand Down Expand Up @@ -1073,6 +1074,17 @@ func containsAllIDTypes(values []string) bool {
}

func (q *SingleTenantQuerier) DetectedFields(ctx context.Context, req *logproto.DetectedFieldsRequest) (*logproto.DetectedFieldsResponse, error) {
empty := logproto.DetectedFieldsResponse{
Fields: []*logproto.DetectedField{},
FieldLimit: req.GetFieldLimit(),
}

tenants, _ := tenant.TenantIDs(ctx)
timeoutCapture := func(id string) time.Duration { return q.limits.QueryTimeout(ctx, id) }
queryTimeout := validation.SmallestPositiveNonZeroDurationPerTenant(tenants, timeoutCapture)
ctx, cancel := context.WithTimeout(ctx, queryTimeout)
defer cancel()

expr, err := syntax.ParseLogSelector(req.Query, true)
if err != nil {
return nil, err
Expand Down Expand Up @@ -1134,13 +1146,10 @@ func (q *SingleTenantQuerier) DetectedFields(ctx context.Context, req *logproto.
}
}

//return an empty response instead of an error, so if there are results in other splits, they can be returned
//return an empty response instead of an error, so if there are results in other splits, they can be returned
if len(iters) == 0 {
level.Warn(q.logger).Log("msg", "no detected field results from store or ingester", "error", errs.Err())
return &logproto.DetectedFieldsResponse{
Fields: []*logproto.DetectedField{},
FieldLimit: req.GetFieldLimit(),
}, nil
return &empty, nil
}

mergedIter := iter.NewMergeEntryIterator(ctx, iters, logproto.BACKWARD)
Expand Down

0 comments on commit a23ea8b

Please sign in to comment.