Skip to content

Commit

Permalink
Logql/parallel binop (grafana#5317)
Browse files Browse the repository at this point in the history
* adds justification for keeping Downstreamer parallelism

* loads binop legs in parallel

* increases downstreamer default concurrency

* astmapper spanlogger

* always clone expr during mapping to prevent mutability bugs

* Revert "astmapper spanlogger"

This reverts commit 23f6b55.

* cleanup + use errgroup
  • Loading branch information
owen-d authored and KMiller-Grafana committed Feb 4, 2022
1 parent 5da69a1 commit d55efed
Show file tree
Hide file tree
Showing 4 changed files with 44 additions and 12 deletions.
4 changes: 4 additions & 0 deletions pkg/logql/ast.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@ type Expr interface {
fmt.Stringer
}

func Clone(e Expr) (Expr, error) {
return ParseExpr(e.String())
}

type QueryParams interface {
LogSelector() (LogSelectorExpr, error)
GetStart() time.Time
Expand Down
36 changes: 26 additions & 10 deletions pkg/logql/evaluator.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/pkg/errors"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/promql"
"golang.org/x/sync/errgroup"

"github.com/grafana/loki/pkg/iter"
"github.com/grafana/loki/pkg/logproto"
Expand Down Expand Up @@ -547,16 +548,31 @@ func binOpStepEvaluator(
)
}

// we have two non literal legs
lse, err := ev.StepEvaluator(ctx, ev, expr.SampleExpr, q)
if err != nil {
return nil, err
}
rse, err := ev.StepEvaluator(ctx, ev, expr.RHS, q)
if err != nil {
var lse, rse StepEvaluator
g, ctx := errgroup.WithContext(ctx)

// We have two non literal legs,
// load them in parallel
g.Go(func() error {
var err error
lse, err = ev.StepEvaluator(ctx, ev, expr.SampleExpr, q)
return err
})
g.Go(func() error {
var err error
rse, err = ev.StepEvaluator(ctx, ev, expr.RHS, q)
return err
})

// ensure both sides are loaded before returning the combined evaluator
if err := g.Wait(); err != nil {
return nil, err
}

// keep a scoped reference to err as it's referenced in the Error()
// implementation of this StepEvaluator
var scopedErr error

return newStepEvaluator(func() (bool, int64, promql.Vector) {
var (
ts int64
Expand Down Expand Up @@ -593,7 +609,7 @@ func binOpStepEvaluator(
case OpTypeUnless:
results = vectorUnless(lhs, rhs, lsigs, rsigs)
default:
results, err = vectorBinop(expr.Op, expr.Opts, lhs, rhs, lsigs, rsigs)
results, scopedErr = vectorBinop(expr.Op, expr.Opts, lhs, rhs, lsigs, rsigs)
}
return true, ts, results
}, func() (lastError error) {
Expand All @@ -605,8 +621,8 @@ func binOpStepEvaluator(
return lastError
}, func() error {
var errs []error
if err != nil {
errs = append(errs, err)
if scopedErr != nil {
errs = append(errs, scopedErr)
}
for _, ev := range []StepEvaluator{lse, rse} {
if err := ev.Error(); err != nil {
Expand Down
8 changes: 7 additions & 1 deletion pkg/logql/shardmapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,8 +111,8 @@ func (m ShardMapper) Parse(query string) (noop bool, expr Expr, err error) {
return false, nil, err
}

mappedStr := mapped.String()
originalStr := parsed.String()
mappedStr := mapped.String()
noop = originalStr == mappedStr
if noop {
m.metrics.parsed.WithLabelValues(NoopKey).Inc()
Expand All @@ -126,6 +126,12 @@ func (m ShardMapper) Parse(query string) (noop bool, expr Expr, err error) {
}

func (m ShardMapper) Map(expr Expr, r *shardRecorder) (Expr, error) {
// immediately clone the passed expr to avoid mutating the original
expr, err := Clone(expr)
if err != nil {
return nil, err
}

switch e := expr.(type) {
case *LiteralExpr:
return e, nil
Expand Down
8 changes: 7 additions & 1 deletion pkg/querier/queryrange/downstreamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import (
)

const (
DefaultDownstreamConcurrency = 32
DefaultDownstreamConcurrency = 128
)

type DownstreamHandler struct {
Expand Down Expand Up @@ -48,6 +48,12 @@ func ParamsToLokiRequest(params logql.Params, shards logql.Shards) queryrangebas
}
}

// Note: After the introduction of the LimitedRoundTripper,
// bounding concurrency in the downstreamer is mostly redundant
// The reason we don't remove it is to prevent malicious queries
// from creating an unreasonably large number of goroutines, such as
// the case of a query like `a / a / a / a / a ..etc`, which could try
// to shard each leg, quickly dispatching an unreasonable number of goroutines.
func (h DownstreamHandler) Downstreamer() logql.Downstreamer {
p := DefaultDownstreamConcurrency
locks := make(chan struct{}, p)
Expand Down

0 comments on commit d55efed

Please sign in to comment.