Support block planning for SelectSeries API. (#2683)
* Support block planning for SelectSeries API.

* Fixes tests

* Avoids panics when no blocks

* Fixes gap in metrics

* Update pkg/querier/ingester_querier.go

Co-authored-by: Anton Kolesnikov <[email protected]>

* Update pkg/querier/store_gateway_querier.go

Co-authored-by: Anton Kolesnikov <[email protected]>

* Fixes mix-up with iterators

* Fixes querier downstream queries

---------

Co-authored-by: Anton Kolesnikov <[email protected]>
cyriltovena and kolesnikovae committed Nov 15, 2023
1 parent fcd112e commit e434f50
Showing 5 changed files with 256 additions and 153 deletions.
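
For context on what "block planning" means here: the querier first discovers which blocks each replica holds, builds a plan that assigns every block to exactly one replica, and then ships that plan to the replicas as query hints. Because no block is read twice, the replicas can also be told that cross-replica deduplication is unnecessary. The sketch below only illustrates the shape of such a plan; BlockHints and Hints are simplified stand-ins for the generated ingestv1 types, and the Ulids field name is an assumption made for illustration.

package main

import "fmt"

// Simplified stand-ins for the generated ingestv1 hint messages (not the real API).
type BlockHints struct {
	Ulids         []string // blocks this replica should read (assumed field name)
	Deduplication bool     // false: blocks are disjoint across replicas, merge without dedup
}

type Hints struct {
	Block *BlockHints
}

func main() {
	// A block plan maps a replica address to the blocks it is responsible for.
	plan := map[string]*BlockHints{
		"ingester-0:4100": {Ulids: []string{"block-a", "block-b"}, Deduplication: false},
		"ingester-1:4100": {Ulids: []string{"block-c"}, Deduplication: false},
	}
	for addr, hints := range plan {
		fmt.Printf("%s -> %v (dedup=%v)\n", addr, hints.Ulids, hints.Deduplication)
	}
}
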
123 changes: 83 additions & 40 deletions pkg/phlaredb/block_querier.go
@@ -696,7 +696,6 @@ func (queriers Queriers) MergeSpanProfile(ctx context.Context, stream *connect.B
 type BlockGetter func(ctx context.Context, start, end model.Time, hints *ingestv1.Hints) (Queriers, error)
 
 func (queriers Queriers) forTimeRange(_ context.Context, start, end model.Time, hints *ingestv1.Hints) (Queriers, error) {
-
 	skipBlock := HintsToBlockSkipper(hints)
 
 	result := make(Queriers, 0, len(queriers))
@@ -739,6 +738,7 @@ func SelectMatchingProfiles(ctx context.Context, request *ingestv1.SelectProfile
 
 	for i, querier := range queriers {
 		if skipBlock(querier.BlockID()) {
+			iters[i] = iter.NewEmptyIterator[Profile]()
 			continue
 		}
 		i := i
@@ -814,7 +814,6 @@ func MergeProfilesStacktraces(ctx context.Context, stream *connect.BidiStream[in
 	for _, querier := range queriers {
 		querier := querier
 		g.Go(util.RecoverPanic(func() error {
-
 			iters, err := querier.SelectMatchingProfiles(ctx, request)
 			if err != nil {
 				return err
@@ -1049,54 +1048,98 @@ func MergeProfilesLabels(ctx context.Context, stream *connect.BidiStream[ingestv
 		otlog.String("by", strings.Join(by, ",")),
 	)
 
-	queriers, err := blockGetter(ctx, model.Time(request.Start), model.Time(request.End), nil)
+	queriers, err := blockGetter(ctx, model.Time(request.Start), model.Time(request.End), request.Hints)
 	if err != nil {
 		return err
 	}
-
-	iters, err := SelectMatchingProfiles(ctx, request, queriers)
-	if err != nil {
-		return err
-	}
-	// send batches of profiles to client and filter via bidi stream.
-	selectedProfiles, err := filterProfiles[
-		BidiServerMerge[*ingestv1.MergeProfilesLabelsResponse, *ingestv1.MergeProfilesLabelsRequest],
-		*ingestv1.MergeProfilesLabelsResponse,
-		*ingestv1.MergeProfilesLabelsRequest](ctx, iters, defaultBatchSize, stream)
-	if err != nil {
-		return err
-	}
-
-	// Signals the end of the profile streaming by sending an empty request.
-	// This allows the client to not block other streaming ingesters.
-	if err := stream.Send(&ingestv1.MergeProfilesLabelsResponse{}); err != nil {
-		return err
-	}
-
 	result := make([][]*typesv1.Series, 0, len(queriers))
 	g, ctx := errgroup.WithContext(ctx)
 	sync := lo.Synchronize()
-	for i, querier := range queriers {
-		i := i
-		querier := querier
-		if len(selectedProfiles[i]) == 0 {
-			continue
-		}
-		// Sort profiles for better read locality.
-		// And merge async the result for each queriers.
-		g.Go(util.RecoverPanic(func() error {
-			merge, err := querier.MergeByLabels(ctx,
-				iter.NewSliceIterator(querier.Sort(selectedProfiles[i])),
-				by...)
-			if err != nil {
-				return err
-			}
-			sync.Do(func() {
-				result = append(result, merge)
-			})
-
-			return nil
-		}))
+
+	deduplicationNeeded := true
+	if request.Hints != nil && request.Hints.Block != nil {
+		deduplicationNeeded = request.Hints.Block.Deduplication
+	}
+
+	if !deduplicationNeeded {
+		// signal the end of the profile streaming by sending an empty response.
+		sp.LogFields(otlog.String("msg", "no profile streaming as no deduplication needed"))
+		if err = stream.Send(&ingestv1.MergeProfilesLabelsResponse{}); err != nil {
+			return err
+		}
+		// in this path we can just merge the profiles from each block and send the result to the client.
+		for _, querier := range queriers {
+			querier := querier
+			g.Go(util.RecoverPanic(func() error {
+				iters, err := querier.SelectMatchingProfiles(ctx, request)
+				if err != nil {
+					return err
+				}
+				defer func() {
+					iters.Close()
+				}()
+
+				profiles, err := iter.Slice(iters)
+				if err != nil {
+					return err
+				}
+
+				if len(profiles) == 0 {
+					return nil
+				}
+
+				merge, err := querier.MergeByLabels(ctx, iter.NewSliceIterator(querier.Sort(profiles)), by...)
+				if err != nil {
+					return err
+				}
+
+				sync.Do(func() {
+					result = append(result, merge)
+				})
+				return nil
+			}))
+		}
+	} else {
+		iters, err := SelectMatchingProfiles(ctx, request, queriers)
+		if err != nil {
+			return err
+		}
+		// send batches of profiles to client and filter via bidi stream.
+		selectedProfiles, err := filterProfiles[
+			BidiServerMerge[*ingestv1.MergeProfilesLabelsResponse, *ingestv1.MergeProfilesLabelsRequest],
+			*ingestv1.MergeProfilesLabelsResponse,
+			*ingestv1.MergeProfilesLabelsRequest](ctx, iters, defaultBatchSize, stream)
+		if err != nil {
+			return err
+		}
+
+		// Signals the end of the profile streaming by sending an empty request.
+		// This allows the client to not block other streaming ingesters.
+		if err := stream.Send(&ingestv1.MergeProfilesLabelsResponse{}); err != nil {
+			return err
+		}
+		for i, querier := range queriers {
+			i := i
+			querier := querier
+			if len(selectedProfiles[i]) == 0 {
+				continue
+			}
+			// Sort profiles for better read locality.
+			// And merge async the result for each queriers.
+			g.Go(util.RecoverPanic(func() error {
+				merge, err := querier.MergeByLabels(ctx,
+					iter.NewSliceIterator(querier.Sort(selectedProfiles[i])),
+					by...)
+				if err != nil {
+					return err
+				}
+				sync.Do(func() {
+					result = append(result, merge)
+				})
+
+				return nil
+			}))
+		}
 	}
 
 	if err := g.Wait(); err != nil {
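
A note on the iters[i] = iter.NewEmptyIterator[Profile]() line added to SelectMatchingProfiles above: the iterators are stored positionally, one per querier, so a block skipped by the plan must still occupy its slot. Otherwise that entry is left nil or later indices shift, and downstream merging can panic or pair profiles with the wrong querier, presumably what the "Avoids panics when no blocks" and "Fixes mix-up with iterators" bullets in the commit message refer to. Below is a minimal, self-contained illustration of the index-alignment pattern with stand-in types, not the Pyroscope iterator package.

package main

import "fmt"

// profileIter is a stand-in for the profile iterators used in the real code.
type profileIter interface {
	Next() (int, bool)
}

// emptyIter mimics an empty iterator: it yields nothing but still fills its slot.
type emptyIter struct{}

func (emptyIter) Next() (int, bool) { return 0, false }

// sliceIter yields a fixed set of values.
type sliceIter struct {
	vals []int
	pos  int
}

func (s *sliceIter) Next() (int, bool) {
	if s.pos >= len(s.vals) {
		return 0, false
	}
	v := s.vals[s.pos]
	s.pos++
	return v, true
}

func main() {
	blocks := []string{"block-a", "block-b", "block-c"}
	skip := map[string]bool{"block-b": true} // pretend the plan excludes block-b

	// One iterator per block, index-aligned with blocks: skipped blocks get an
	// empty iterator instead of being left out, so iters[i] always matches blocks[i].
	iters := make([]profileIter, len(blocks))
	for i, b := range blocks {
		if skip[b] {
			iters[i] = emptyIter{}
			continue
		}
		iters[i] = &sliceIter{vals: []int{i*10 + 1, i*10 + 2}}
	}

	for i, it := range iters {
		for v, ok := it.Next(); ok; v, ok = it.Next() {
			fmt.Printf("%s -> profile %d\n", blocks[i], v)
		}
	}
}
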
25 changes: 20 additions & 5 deletions pkg/querier/ingester_querier.go
@@ -138,21 +138,36 @@ func (q *Querier) selectTreeFromIngesters(ctx context.Context, req *querierv1.Se
 	return selectMergeTree(gCtx, responses)
 }
 
-func (q *Querier) selectSeriesFromIngesters(ctx context.Context, req *ingesterv1.MergeProfilesLabelsRequest) ([]ResponseFromReplica[clientpool.BidiClientMergeProfilesLabels], error) {
+func (q *Querier) selectSeriesFromIngesters(ctx context.Context, req *ingesterv1.MergeProfilesLabelsRequest, plan map[string]*ingestv1.BlockHints) ([]ResponseFromReplica[clientpool.BidiClientMergeProfilesLabels], error) {
 	sp, ctx := opentracing.StartSpanFromContext(ctx, "SelectSeries Ingesters")
 	defer sp.Finish()
-	responses, err := forAllIngesters(ctx, q.ingesterQuerier, func(ctx context.Context, ic IngesterQueryClient) (clientpool.BidiClientMergeProfilesLabels, error) {
-		return ic.MergeProfilesLabels(ctx), nil
-	})
+	var responses []ResponseFromReplica[clientpool.BidiClientMergeProfilesLabels]
+	var err error
+
+	if plan != nil {
+		responses, err = forAllPlannedIngesters(ctx, q.ingesterQuerier, plan, func(ctx context.Context, q IngesterQueryClient, hint *ingestv1.Hints) (clientpool.BidiClientMergeProfilesLabels, error) {
+			return q.MergeProfilesLabels(ctx), nil
+		})
+	} else {
+		responses, err = forAllIngesters(ctx, q.ingesterQuerier, func(ctx context.Context, ic IngesterQueryClient) (clientpool.BidiClientMergeProfilesLabels, error) {
+			return ic.MergeProfilesLabels(ctx), nil
+		})
+	}
 	if err != nil {
 		return nil, connect.NewError(connect.CodeInternal, err)
 	}
 	// send the first initial request to all ingesters.
 	g, _ := errgroup.WithContext(ctx)
 	for _, r := range responses {
 		r := r
+		hints, ok := plan[r.addr]
+		if !ok && plan != nil {
+			return nil, connect.NewError(connect.CodeInternal, fmt.Errorf("no hints found for replica %s", r.addr))
+		}
 		g.Go(util.RecoverPanic(func() error {
-			return r.response.Send(req.CloneVT())
+			req := req.CloneVT()
+			req.Request.Hints = &ingestv1.Hints{Block: hints}
+			return r.response.Send(req)
 		}))
 	}
 	if err := g.Wait(); err != nil {
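
A note on the ingester change above: when a plan is present, each replica must receive its own copy of the request carrying only that replica's BlockHints, which is why the request is cloned (req.CloneVT()) inside the loop before the hints are set. The sketch below shows the same per-replica fan-out with simplified stand-in types; the clone helper merely imitates what the generated CloneVT deep copy provides.

package main

import "fmt"

// Simplified stand-ins for the generated request and hint types.
type BlockHints struct{ Ulids []string }
type Hints struct{ Block *BlockHints }
type Request struct {
	Query string
	Hints *Hints
}

// clone imitates req.CloneVT(): a copy each replica's goroutine can mutate
// without racing on the shared request.
func clone(r *Request) *Request {
	cp := *r
	cp.Hints = nil
	return &cp
}

func main() {
	base := &Request{Query: `{service_name="api"}`}
	plan := map[string]*BlockHints{
		"ingester-0:4100": {Ulids: []string{"block-a"}},
		"ingester-1:4100": {Ulids: []string{"block-b", "block-c"}},
	}

	for addr, hints := range plan {
		// Each replica gets a private copy of the request, carrying only the
		// blocks the plan assigned to it.
		req := clone(base)
		req.Hints = &Hints{Block: hints}
		fmt.Printf("send to %s: query=%s blocks=%v\n", addr, req.Query, req.Hints.Block.Ulids)
	}
}
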
25 changes: 18 additions & 7 deletions pkg/querier/querier.go
@@ -374,7 +374,6 @@ func (q *Querier) blockSelect(ctx context.Context, start, end model.Time) (block
 	}
 
 	return results.blockPlan(ctx), nil
-
 }
 
 func (q *Querier) Series(ctx context.Context, req *connect.Request[querierv1.SeriesRequest]) (*connect.Response[querierv1.SeriesResponse], error) {
@@ -820,7 +819,19 @@ func (q *Querier) SelectSeries(ctx context.Context, req *connect.Request[querier
 	ctx, cancel := context.WithCancel(ctx)
 	defer cancel()
 
-	responses, err := q.selectSeries(ctx, req)
+	// determine the block hints
+	plan, err := q.blockSelect(ctx, model.Time(req.Msg.Start), model.Time(req.Msg.End))
+	if isEndpointNotExistingErr(err) {
+		level.Warn(spanlogger.FromContext(ctx, q.logger)).Log(
+			"msg", "block select not supported on at least one component, fallback to use full dataset",
+			"err", err,
+		)
+		plan = nil
+	} else if err != nil {
+		return nil, fmt.Errorf("error during block select: %w", err)
+	}
+
+	responses, err := q.selectSeries(ctx, req, plan)
 	if err != nil {
 		return nil, err
 	}
@@ -840,7 +851,7 @@ func (q *Querier) SelectSeries(ctx context.Context, req *connect.Request[querier
 	}), nil
 }
 
-func (q *Querier) selectSeries(ctx context.Context, req *connect.Request[querierv1.SelectSeriesRequest]) ([]ResponseFromReplica[clientpool.BidiClientMergeProfilesLabels], error) {
+func (q *Querier) selectSeries(ctx context.Context, req *connect.Request[querierv1.SelectSeriesRequest], plan map[string]*ingestv1.BlockHints) ([]ResponseFromReplica[clientpool.BidiClientMergeProfilesLabels], error) {
 	stepMs := time.Duration(req.Msg.Step * float64(time.Second)).Milliseconds()
 	sort.Strings(req.Msg.GroupBy)
 
@@ -862,10 +873,10 @@ func (q *Querier) selectSeries(ctx context.Context, req *connect.Request[querier
 				Type: profileType,
 			},
 			By: req.Msg.GroupBy,
-		})
+		}, plan)
 	}
 
-	storeQueries := splitQueryToStores(model.Time(start), model.Time(req.Msg.End), model.Now(), q.cfg.QueryStoreAfter, nil)
+	storeQueries := splitQueryToStores(model.Time(start), model.Time(req.Msg.End), model.Now(), q.cfg.QueryStoreAfter, plan)
 
 	var responses []ResponseFromReplica[clientpool.BidiClientMergeProfilesLabels]
 
@@ -875,15 +886,15 @@ func (q *Querier) selectSeries(ctx context.Context, req *connect.Request[querier
 
 	// todo in parallel
 	if storeQueries.ingester.shouldQuery {
-		ir, err := q.selectSeriesFromIngesters(ctx, storeQueries.ingester.MergeSeriesRequest(req.Msg, profileType))
+		ir, err := q.selectSeriesFromIngesters(ctx, storeQueries.ingester.MergeSeriesRequest(req.Msg, profileType), plan)
 		if err != nil {
 			return nil, err
 		}
 		responses = append(responses, ir...)
 	}
 
 	if storeQueries.storeGateway.shouldQuery {
-		ir, err := q.selectSeriesFromStoreGateway(ctx, storeQueries.storeGateway.MergeSeriesRequest(req.Msg, profileType))
+		ir, err := q.selectSeriesFromStoreGateway(ctx, storeQueries.storeGateway.MergeSeriesRequest(req.Msg, profileType), plan)
 		if err != nil {
 			return nil, err
 		}
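
The SelectSeries change in querier.go follows a fallback pattern: try to compute a block plan first, and if any component in the read path does not expose the planning endpoint yet, log a warning and carry on with a nil plan, which the rest of the code treats as "query the full dataset and deduplicate as before". Below is a condensed sketch of that control flow; blockSelect, isEndpointNotExistingErr, and BlockHints are stand-ins here, not the real implementations.

package main

import (
	"errors"
	"fmt"
	"log"
)

type BlockHints struct{ Ulids []string }

var errEndpointNotExisting = errors.New("endpoint does not exist")

// isEndpointNotExistingErr stands in for the helper used by the querier.
func isEndpointNotExistingErr(err error) bool {
	return errors.Is(err, errEndpointNotExisting)
}

// blockSelect stands in for q.blockSelect and pretends one component cannot plan.
func blockSelect() (map[string]*BlockHints, error) {
	return nil, errEndpointNotExisting
}

func main() {
	plan, err := blockSelect()
	if isEndpointNotExistingErr(err) {
		// At least one component does not support block planning: fall back.
		log.Println("block select not supported, falling back to full dataset:", err)
		plan = nil
	} else if err != nil {
		log.Fatalf("error during block select: %v", err)
	}

	if plan == nil {
		fmt.Println("querying full dataset with deduplication")
		return
	}
	fmt.Println("querying planned blocks only:", plan)
}
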