Skip to content

Commit

Permalink
refactors splitby to not require buffered channels (#1569)
Browse files Browse the repository at this point in the history
  • Loading branch information
owen-d authored and slim-bean committed Jan 23, 2020
1 parent 5026dfe commit 1269c92
Showing 1 changed file with 22 additions and 16 deletions.
38 changes: 22 additions & 16 deletions pkg/querier/queryrange/split_by_interval.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,13 @@ func SplitByIntervalMiddleware(interval time.Duration, limits queryrange.Limits,
}

type lokiResult struct {
req queryrange.Request
resp chan queryrange.Response
err chan error
req queryrange.Request
ch chan *packedResp
}

type packedResp struct {
resp queryrange.Response
err error
}

type splitByInterval struct {
Expand Down Expand Up @@ -79,14 +83,15 @@ func (h *splitByInterval) Process(
select {
case <-ctx.Done():
return nil, ctx.Err()
case err := <-x.err:
return nil, err
case resp := <-x.resp:
case data := <-x.ch:
if data.err != nil {
return nil, err
}

responses = append(responses, resp)
responses = append(responses, data.resp)

// see if we can exit early if a limit has been reached
threshold -= resp.(*LokiResponse).Count()
threshold -= data.resp.(*LokiResponse).Count()
if threshold <= 0 {
return responses, nil
}
Expand All @@ -105,12 +110,14 @@ func (h *splitByInterval) loop(ctx context.Context, ch <-chan *lokiResult) {
queryrange.LogToSpan(ctx, data.req)

resp, err := h.next.Do(ctx, data.req)
if err != nil {
data.err <- err
} else {
data.resp <- resp

select {
case <-ctx.Done():
sp.Finish()
return
case data.ch <- &packedResp{resp, err}:
sp.Finish()
}
sp.Finish()
}
}

Expand Down Expand Up @@ -138,9 +145,8 @@ func (h *splitByInterval) Do(ctx context.Context, r queryrange.Request) (queryra
input := make([]*lokiResult, 0, len(intervals))
for _, interval := range intervals {
input = append(input, &lokiResult{
req: interval,
resp: make(chan queryrange.Response, 1),
err: make(chan error, 1),
req: interval,
ch: make(chan *packedResp),
})
}

Expand Down

0 comments on commit 1269c92

Please sign in to comment.