From a7795e57b00c6e266e49bc75d0fd0095c3a2f59b Mon Sep 17 00:00:00 2001 From: Cyril Tovena Date: Mon, 10 Jan 2022 09:15:28 +0100 Subject: [PATCH] Fixes a possible cancellation issue (#5075) Signed-off-by: Cyril Tovena --- pkg/querier/queryrange/limits.go | 36 ++++++++++++++++---------------- 1 file changed, 18 insertions(+), 18 deletions(-) diff --git a/pkg/querier/queryrange/limits.go b/pkg/querier/queryrange/limits.go index f97c391191bd4..8a00780a98bee 100644 --- a/pkg/querier/queryrange/limits.go +++ b/pkg/querier/queryrange/limits.go @@ -257,16 +257,16 @@ func newWork(ctx context.Context, req queryrange.Request) work { } func (rt limitedRoundTripper) RoundTrip(r *http.Request) (*http.Response, error) { - var wg sync.WaitGroup - intermediate := make(chan work) + var ( + wg sync.WaitGroup + intermediate = make(chan work) + ctx, cancel = context.WithCancel(r.Context()) + ) defer func() { + cancel() wg.Wait() - close(intermediate) }() - ctx, cancel := context.WithCancel(r.Context()) - defer cancel() - // Do not forward any request header. request, err := rt.codec.DecodeRequest(ctx, r, nil) if err != nil { @@ -284,29 +284,29 @@ func (rt limitedRoundTripper) RoundTrip(r *http.Request) (*http.Response, error) parallelism := rt.limits.MaxQueryParallelism(userid) for i := 0; i < parallelism; i++ { + wg.Add(1) go func() { - for w := range intermediate { - resp, err := rt.do(w.ctx, w.req) + defer wg.Done() + for { select { - case w.result <- result{response: resp, err: err}: - case <-w.ctx.Done(): - w.result <- result{err: w.ctx.Err()} + case w := <-intermediate: + resp, err := rt.do(w.ctx, w.req) + w.result <- result{response: resp, err: err} + case <-ctx.Done(): + return } - } }() } response, err := rt.middleware.Wrap( queryrange.HandlerFunc(func(ctx context.Context, r queryrange.Request) (queryrange.Response, error) { - wg.Add(1) - defer wg.Done() - - if ctx.Err() != nil { + w := newWork(ctx, r) + select { + case intermediate <- w: + case <-ctx.Done(): return nil, ctx.Err() } - w := newWork(ctx, r) - intermediate <- w select { case response := <-w.result: return response.response, response.err