Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support block planning for SelectSeries API. #2683

Merged
merged 9 commits into from
Nov 15, 2023
123 changes: 83 additions & 40 deletions pkg/phlaredb/block_querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -696,7 +696,6 @@ func (queriers Queriers) MergeSpanProfile(ctx context.Context, stream *connect.B
type BlockGetter func(ctx context.Context, start, end model.Time, hints *ingestv1.Hints) (Queriers, error)

func (queriers Queriers) forTimeRange(_ context.Context, start, end model.Time, hints *ingestv1.Hints) (Queriers, error) {

skipBlock := HintsToBlockSkipper(hints)

result := make(Queriers, 0, len(queriers))
Expand Down Expand Up @@ -739,6 +738,7 @@ func SelectMatchingProfiles(ctx context.Context, request *ingestv1.SelectProfile

for i, querier := range queriers {
if skipBlock(querier.BlockID()) {
iters[i] = iter.NewEmptyIterator[Profile]()
continue
}
i := i
Expand Down Expand Up @@ -814,7 +814,6 @@ func MergeProfilesStacktraces(ctx context.Context, stream *connect.BidiStream[in
for _, querier := range queriers {
querier := querier
g.Go(util.RecoverPanic(func() error {

iters, err := querier.SelectMatchingProfiles(ctx, request)
if err != nil {
return err
Expand Down Expand Up @@ -1049,54 +1048,98 @@ func MergeProfilesLabels(ctx context.Context, stream *connect.BidiStream[ingestv
otlog.String("by", strings.Join(by, ",")),
)

queriers, err := blockGetter(ctx, model.Time(request.Start), model.Time(request.End), nil)
queriers, err := blockGetter(ctx, model.Time(request.Start), model.Time(request.End), request.Hints)
if err != nil {
return err
}
result := make([][]*typesv1.Series, 0, len(queriers))
g, ctx := errgroup.WithContext(ctx)
sync := lo.Synchronize()

iters, err := SelectMatchingProfiles(ctx, request, queriers)
if err != nil {
return err
}
// send batches of profiles to client and filter via bidi stream.
selectedProfiles, err := filterProfiles[
BidiServerMerge[*ingestv1.MergeProfilesLabelsResponse, *ingestv1.MergeProfilesLabelsRequest],
*ingestv1.MergeProfilesLabelsResponse,
*ingestv1.MergeProfilesLabelsRequest](ctx, iters, defaultBatchSize, stream)
if err != nil {
return err
deduplicationNeeded := true
if request.Hints != nil && request.Hints.Block != nil {
deduplicationNeeded = request.Hints.Block.Deduplication
}

// Signals the end of the profile streaming by sending an empty request.
// This allows the client to not block other streaming ingesters.
if err := stream.Send(&ingestv1.MergeProfilesLabelsResponse{}); err != nil {
return err
}
if !deduplicationNeeded {
// signal the end of the profile streaming by sending an empty response.
sp.LogFields(otlog.String("msg", "no profile streaming as no deduplication needed"))
if err = stream.Send(&ingestv1.MergeProfilesLabelsResponse{}); err != nil {
return err
}
// in this path we can just merge the profiles from each block and send the result to the client.
for _, querier := range queriers {
querier := querier
g.Go(util.RecoverPanic(func() error {
iters, err := querier.SelectMatchingProfiles(ctx, request)
if err != nil {
return err
}
defer func() {
iters.Close()
}()

result := make([][]*typesv1.Series, 0, len(queriers))
g, ctx := errgroup.WithContext(ctx)
sync := lo.Synchronize()
for i, querier := range queriers {
i := i
querier := querier
if len(selectedProfiles[i]) == 0 {
continue
profiles, err := iter.Slice(iters)
if err != nil {
return err
}

if len(profiles) == 0 {
return nil
}

merge, err := querier.MergeByLabels(ctx, iter.NewSliceIterator(querier.Sort(profiles)), by...)
if err != nil {
return err
}

sync.Do(func() {
result = append(result, merge)
})
return nil
}))
}
// Sort profiles for better read locality.
// And merge async the result for each queriers.
g.Go(util.RecoverPanic(func() error {
merge, err := querier.MergeByLabels(ctx,
iter.NewSliceIterator(querier.Sort(selectedProfiles[i])),
by...)
if err != nil {
return err
} else {
iters, err := SelectMatchingProfiles(ctx, request, queriers)
if err != nil {
return err
}
// send batches of profiles to client and filter via bidi stream.
selectedProfiles, err := filterProfiles[
BidiServerMerge[*ingestv1.MergeProfilesLabelsResponse, *ingestv1.MergeProfilesLabelsRequest],
*ingestv1.MergeProfilesLabelsResponse,
*ingestv1.MergeProfilesLabelsRequest](ctx, iters, defaultBatchSize, stream)
if err != nil {
return err
}

// Signals the end of the profile streaming by sending an empty request.
// This allows the client to not block other streaming ingesters.
if err := stream.Send(&ingestv1.MergeProfilesLabelsResponse{}); err != nil {
return err
}
for i, querier := range queriers {
i := i
querier := querier
if len(selectedProfiles[i]) == 0 {
continue
}
sync.Do(func() {
result = append(result, merge)
})
// Sort profiles for better read locality.
// And merge async the result for each queriers.
g.Go(util.RecoverPanic(func() error {
merge, err := querier.MergeByLabels(ctx,
iter.NewSliceIterator(querier.Sort(selectedProfiles[i])),
by...)
if err != nil {
return err
}
sync.Do(func() {
result = append(result, merge)
})

return nil
}))
return nil
}))
}
}

if err := g.Wait(); err != nil {
Expand Down
25 changes: 20 additions & 5 deletions pkg/querier/ingester_querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,21 +138,36 @@ func (q *Querier) selectTreeFromIngesters(ctx context.Context, req *querierv1.Se
return selectMergeTree(gCtx, responses)
}

func (q *Querier) selectSeriesFromIngesters(ctx context.Context, req *ingesterv1.MergeProfilesLabelsRequest) ([]ResponseFromReplica[clientpool.BidiClientMergeProfilesLabels], error) {
func (q *Querier) selectSeriesFromIngesters(ctx context.Context, req *ingesterv1.MergeProfilesLabelsRequest, plan map[string]*ingestv1.BlockHints) ([]ResponseFromReplica[clientpool.BidiClientMergeProfilesLabels], error) {
sp, ctx := opentracing.StartSpanFromContext(ctx, "SelectSeries Ingesters")
defer sp.Finish()
responses, err := forAllIngesters(ctx, q.ingesterQuerier, func(ctx context.Context, ic IngesterQueryClient) (clientpool.BidiClientMergeProfilesLabels, error) {
return ic.MergeProfilesLabels(ctx), nil
})
var responses []ResponseFromReplica[clientpool.BidiClientMergeProfilesLabels]
var err error

if plan != nil {
responses, err = forAllPlannedIngesters(ctx, q.ingesterQuerier, plan, func(ctx context.Context, q IngesterQueryClient, hint *ingestv1.Hints) (clientpool.BidiClientMergeProfilesLabels, error) {
return q.MergeProfilesLabels(ctx), nil
})
} else {
responses, err = forAllIngesters(ctx, q.ingesterQuerier, func(ctx context.Context, ic IngesterQueryClient) (clientpool.BidiClientMergeProfilesLabels, error) {
return ic.MergeProfilesLabels(ctx), nil
})
}
if err != nil {
return nil, connect.NewError(connect.CodeInternal, err)
}
// send the first initial request to all ingesters.
g, _ := errgroup.WithContext(ctx)
for _, r := range responses {
r := r
hints, ok := plan[r.addr]
if !ok && plan != nil {
return nil, connect.NewError(connect.CodeInternal, fmt.Errorf("no hints found for replica %s", r.addr))
}
g.Go(util.RecoverPanic(func() error {
return r.response.Send(req.CloneVT())
req := req.CloneVT()
req.Request.Hints = &ingestv1.Hints{Block: hints}
return r.response.Send(req)
}))
}
if err := g.Wait(); err != nil {
Expand Down
25 changes: 18 additions & 7 deletions pkg/querier/querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -374,7 +374,6 @@ func (q *Querier) blockSelect(ctx context.Context, start, end model.Time) (block
}

return results.blockPlan(ctx), nil

}

func (q *Querier) Series(ctx context.Context, req *connect.Request[querierv1.SeriesRequest]) (*connect.Response[querierv1.SeriesResponse], error) {
Expand Down Expand Up @@ -820,7 +819,19 @@ func (q *Querier) SelectSeries(ctx context.Context, req *connect.Request[querier
ctx, cancel := context.WithCancel(ctx)
defer cancel()

responses, err := q.selectSeries(ctx, req)
// determine the block hints
plan, err := q.blockSelect(ctx, model.Time(req.Msg.Start), model.Time(req.Msg.End))
if isEndpointNotExistingErr(err) {
level.Warn(spanlogger.FromContext(ctx, q.logger)).Log(
"msg", "block select not supported on at least one component, fallback to use full dataset",
"err", err,
)
plan = nil
} else if err != nil {
return nil, fmt.Errorf("error during block select: %w", err)
}

responses, err := q.selectSeries(ctx, req, plan)
if err != nil {
return nil, err
}
Expand All @@ -840,7 +851,7 @@ func (q *Querier) SelectSeries(ctx context.Context, req *connect.Request[querier
}), nil
}

func (q *Querier) selectSeries(ctx context.Context, req *connect.Request[querierv1.SelectSeriesRequest]) ([]ResponseFromReplica[clientpool.BidiClientMergeProfilesLabels], error) {
func (q *Querier) selectSeries(ctx context.Context, req *connect.Request[querierv1.SelectSeriesRequest], plan map[string]*ingestv1.BlockHints) ([]ResponseFromReplica[clientpool.BidiClientMergeProfilesLabels], error) {
stepMs := time.Duration(req.Msg.Step * float64(time.Second)).Milliseconds()
sort.Strings(req.Msg.GroupBy)

Expand All @@ -862,10 +873,10 @@ func (q *Querier) selectSeries(ctx context.Context, req *connect.Request[querier
Type: profileType,
},
By: req.Msg.GroupBy,
})
}, plan)
}

storeQueries := splitQueryToStores(model.Time(start), model.Time(req.Msg.End), model.Now(), q.cfg.QueryStoreAfter, nil)
storeQueries := splitQueryToStores(model.Time(start), model.Time(req.Msg.End), model.Now(), q.cfg.QueryStoreAfter, plan)

var responses []ResponseFromReplica[clientpool.BidiClientMergeProfilesLabels]

Expand All @@ -875,15 +886,15 @@ func (q *Querier) selectSeries(ctx context.Context, req *connect.Request[querier

// todo in parallel
if storeQueries.ingester.shouldQuery {
ir, err := q.selectSeriesFromIngesters(ctx, storeQueries.ingester.MergeSeriesRequest(req.Msg, profileType))
ir, err := q.selectSeriesFromIngesters(ctx, storeQueries.ingester.MergeSeriesRequest(req.Msg, profileType), plan)
if err != nil {
return nil, err
}
responses = append(responses, ir...)
}

if storeQueries.storeGateway.shouldQuery {
ir, err := q.selectSeriesFromStoreGateway(ctx, storeQueries.storeGateway.MergeSeriesRequest(req.Msg, profileType))
ir, err := q.selectSeriesFromStoreGateway(ctx, storeQueries.storeGateway.MergeSeriesRequest(req.Msg, profileType), plan)
if err != nil {
return nil, err
}
Expand Down
Loading
Loading