querier: Execute Selects concurrently per query #2657

Merged: 5 commits, Jun 17, 2020
3 changes: 2 additions & 1 deletion CHANGELOG.md
@@ -34,6 +34,7 @@ We use *breaking* word for marking changes that are not backward compatible (rel
### Added

- [#2671](https://github.com/thanos-io/thanos/pull/2671) Tools: bucket replicate now allows passing repeated `--compaction` and `--resolution` flags.
- [#2657](https://github.com/thanos-io/thanos/pull/2657) Querier: Now has the ability to perform concurrent select requests per query.

## [v0.13.0](https://github.com/thanos-io/thanos/releases) - IN PROGRESS

@@ -819,4 +820,4 @@ Initial version to have a stable reference before [gossip protocol removal](/doc
- Compact / Downsample offline commands.
- Bucket commands.
- Downsampling support for UI.
- Grafana dashboards for Thanos components.
- Grafana dashboards for Thanos components.
7 changes: 6 additions & 1 deletion cmd/thanos/query.go
@@ -67,6 +67,9 @@ func registerQuery(m map[string]setupFunc, app *kingpin.Application) {
maxConcurrentQueries := cmd.Flag("query.max-concurrent", "Maximum number of queries processed concurrently by query node.").
Default("20").Int()

	maxConcurrentSelects := cmd.Flag("query.max-concurrent-select", "Maximum number of select requests made concurrently per query.").
Member: default 4 feels a bit arbitrary? maybe default to 1? As the current default of maxConcurrentQueries is 20, this might significantly increase resource usage.

Maybe we can read CPU limits etc. and autoconfigure those values based on available CPU, or the CPU assigned via cgroups?

Member: I think for now 1, and experimenting on our prods to figure out a better default, is the way to go.

Member (Author): Yes, it's an arbitrary number. I actually left a comment on the PR about this, but it probably got removed between force pushes :D That's exactly the point I want to discuss.

Autoconfigure sounds good; we can try that.

However, I also suspect this will be I/O bound, so having 20x4 in the worst case shouldn't hurt that much. That said, maybe we can try to craft a benchmark to find the magic number.

Default("4").Int()

queryReplicaLabels := cmd.Flag("query.replica-label", "Labels to treat as a replica indicator along which data is deduplicated. Still you will be able to query without deduplication using 'dedup=false' parameter. Data includes time series, recording rules, and alerting rules.").
Strings()

@@ -159,6 +162,7 @@ func registerQuery(m map[string]setupFunc, app *kingpin.Application) {
*webExternalPrefix,
*webPrefixHeaderName,
*maxConcurrentQueries,
*maxConcurrentSelects,
time.Duration(*queryTimeout),
time.Duration(*storeResponseTimeout),
*queryReplicaLabels,
@@ -202,6 +206,7 @@ func runQuery(
webExternalPrefix string,
webPrefixHeaderName string,
maxConcurrentQueries int,
maxConcurrentSelects int,
queryTimeout time.Duration,
storeResponseTimeout time.Duration,
queryReplicaLabels []string,
@@ -280,7 +285,7 @@
)
proxy = store.NewProxyStore(logger, reg, stores.Get, component.Query, selectorLset, storeResponseTimeout)
rulesProxy = rules.NewProxy(logger, stores.GetRulesClients)
queryableCreator = query.NewQueryableCreator(logger, proxy)
queryableCreator = query.NewQueryableCreator(logger, reg, proxy, maxConcurrentSelects, queryTimeout)
engine = promql.NewEngine(
promql.EngineOpts{
Logger: logger,
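Following up on the review thread above, a rough sketch of the suggested CPU-based autoconfiguration could look like the following. This is hypothetical and not part of the PR: `pickDefaultSelects` is an invented helper, and actually honoring cgroup CPU quotas would need extra tooling (for example go.uber.org/automaxprocs).

```go
package main

import (
	"fmt"
	"runtime"
)

// pickDefaultSelects is an invented helper sketching the reviewer's idea:
// derive the per-query select concurrency from the CPUs visible to the Go
// runtime instead of a hard-coded constant. Note that runtime.NumCPU reports
// host CPUs; respecting cgroup CPU limits would need something like
// go.uber.org/automaxprocs (an assumption, not something this PR does).
func pickDefaultSelects() int {
	n := runtime.NumCPU() / 4 // leave headroom for query evaluation itself
	if n < 1 {
		n = 1
	}
	return n
}

func main() {
	fmt.Println("suggested --query.max-concurrent-select:", pickDefaultSelects())
}
```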
9 changes: 9 additions & 0 deletions docs/components/query.md
@@ -216,6 +216,12 @@ type queryData struct {
An additional field is `Warnings`, which contains every error that occurred but is assumed non-critical. The `partial_response`
option controls whether storeAPI unavailability is considered critical.

### Concurrent Selects

Thanos Querier can perform the select requests of a single query concurrently. It dissects the given PromQL statement and executes its selectors concurrently against the discovered StoreAPIs.
The maximum number of concurrent select requests made per query is controlled by the `query.max-concurrent-select` flag.
Keep in mind that the maximum number of concurrent queries handled by the querier is controlled by `query.max-concurrent`; consider the implications of the combined values when tuning the querier. For example, with the defaults (`--query.max-concurrent=20`, `--query.max-concurrent-select=4`) up to 20 x 4 = 80 select requests can be in flight in the worst case, as the sketch below illustrates.
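The following is a minimal, self-contained Go sketch (illustrative only, not Thanos code) that emulates the per-query select gate with a buffered channel: a query with 6 selectors but a gate of size 4 never has more than 4 selects running at once.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

func main() {
	const maxConcurrentSelect = 4 // mirrors --query.max-concurrent-select

	// A buffered channel acts as a counting semaphore, standing in for the
	// gate the querier uses internally.
	gate := make(chan struct{}, maxConcurrentSelect)

	var wg sync.WaitGroup
	for i := 0; i < 6; i++ { // pretend the PromQL expression has 6 selectors
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			gate <- struct{}{}        // wait for a free slot
			defer func() { <-gate }() // release the slot when done
			fmt.Printf("select %d started at %s\n", i, time.Now().Format("15:04:05.000"))
			time.Sleep(200 * time.Millisecond) // simulated StoreAPI call
		}(i)
	}
	wg.Wait()
}
```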

## Expose UI on a sub-path

It is possible to expose thanos-query UI and optionally API on a sub-path.
@@ -333,6 +339,9 @@ Flags:
--query.timeout=2m Maximum time to process query by query node.
--query.max-concurrent=20 Maximum number of queries processed
concurrently by query node.
--query.max-concurrent-select=4
Maximum number of select requests made
concurrently per query.
--query.replica-label=QUERY.REPLICA-LABEL ...
Labels to treat as a replica indicator along
which data is deduplicated. Still you will be
5 changes: 3 additions & 2 deletions pkg/query/api/v1_test.go
@@ -108,13 +108,14 @@ func TestEndpoints(t *testing.T) {
testutil.Ok(t, app.Commit())

now := time.Now()
timeout := 100 * time.Second
api := &API{
queryableCreate: query.NewQueryableCreator(nil, store.NewTSDBStore(nil, nil, db, component.Query, nil)),
queryableCreate: query.NewQueryableCreator(nil, nil, store.NewTSDBStore(nil, nil, db, component.Query, nil), 2, timeout),
queryEngine: promql.NewEngine(promql.EngineOpts{
Logger: nil,
Reg: nil,
MaxSamples: 10000,
Timeout: 100 * time.Second,
Timeout: timeout,
}),
now: func() time.Time { return now },
gate: gate.NewKeeper(nil).NewGate(4),
37 changes: 37 additions & 0 deletions pkg/query/iter.go
@@ -670,3 +670,40 @@ func (it *dedupSeriesIterator) Err() error {
}
return it.b.Err()
}

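// lazySeriesSet defers the work of producing the wrapped set to the first
// Next() call: create() runs once, and its boolean result doubles as the
// answer to that first Next().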
type lazySeriesSet struct {
create func() (s storage.SeriesSet, ok bool)

set storage.SeriesSet
}

func (c *lazySeriesSet) Next() bool {
if c.set != nil {
return c.set.Next()
}

var ok bool
c.set, ok = c.create()
return ok
}

func (c *lazySeriesSet) Err() error {
if c.set != nil {
return c.set.Err()
}
return nil
}

func (c *lazySeriesSet) At() storage.Series {
if c.set != nil {
return c.set.At()
}
return nil
}

func (c *lazySeriesSet) Warnings() storage.Warnings {
if c.set != nil {
return c.set.Warnings()
}
return nil
}
120 changes: 93 additions & 27 deletions pkg/query/querier.go
@@ -7,13 +7,16 @@ import (
"context"
"sort"
"strings"
"time"

"github.com/go-kit/kit/log"
"github.com/opentracing/opentracing-go"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/storage"

"github.com/thanos-io/thanos/pkg/gate"
"github.com/thanos-io/thanos/pkg/store/storepb"
"github.com/thanos-io/thanos/pkg/tracing"
)
@@ -27,38 +30,49 @@ import (
type QueryableCreator func(deduplicate bool, replicaLabels []string, maxResolutionMillis int64, partialResponse, skipChunks bool) storage.Queryable

// NewQueryableCreator creates QueryableCreator.
func NewQueryableCreator(logger log.Logger, proxy storepb.StoreServer) QueryableCreator {
func NewQueryableCreator(logger log.Logger, reg prometheus.Registerer, proxy storepb.StoreServer, maxConcurrentSelects int, selectTimeout time.Duration) QueryableCreator {
keeper := gate.NewKeeper(reg)

return func(deduplicate bool, replicaLabels []string, maxResolutionMillis int64, partialResponse, skipChunks bool) storage.Queryable {
return &queryable{
logger: logger,
replicaLabels: replicaLabels,
proxy: proxy,
deduplicate: deduplicate,
maxResolutionMillis: maxResolutionMillis,
partialResponse: partialResponse,
skipChunks: skipChunks,
logger: logger,
reg: reg,
replicaLabels: replicaLabels,
proxy: proxy,
deduplicate: deduplicate,
maxResolutionMillis: maxResolutionMillis,
partialResponse: partialResponse,
skipChunks: skipChunks,
gateKeeper: keeper,
maxConcurrentSelects: maxConcurrentSelects,
selectTimeout: selectTimeout,
}
}
}

type queryable struct {
logger log.Logger
replicaLabels []string
proxy storepb.StoreServer
deduplicate bool
maxResolutionMillis int64
partialResponse bool
skipChunks bool
logger log.Logger
reg prometheus.Registerer
replicaLabels []string
proxy storepb.StoreServer
deduplicate bool
maxResolutionMillis int64
partialResponse bool
skipChunks bool
gateKeeper *gate.Keeper
maxConcurrentSelects int
selectTimeout time.Duration
}

// Querier returns a new storage querier against the underlying proxy store API.
func (q *queryable) Querier(ctx context.Context, mint, maxt int64) (storage.Querier, error) {
return newQuerier(ctx, q.logger, mint, maxt, q.replicaLabels, q.proxy, q.deduplicate, q.maxResolutionMillis, q.partialResponse, q.skipChunks), nil
return newQuerier(ctx, q.logger, q.reg, mint, maxt, q.replicaLabels, q.proxy, q.deduplicate, q.maxResolutionMillis, q.partialResponse, q.skipChunks, q.gateKeeper.NewGate(q.maxConcurrentSelects), q.selectTimeout), nil
}

type querier struct {
ctx context.Context
logger log.Logger
reg prometheus.Registerer
cancel func()
mint, maxt int64
replicaLabels map[string]struct{}
@@ -67,20 +81,24 @@ type querier struct {
maxResolutionMillis int64
partialResponse bool
skipChunks bool
selectGate gate.Gate
selectTimeout time.Duration
}

// newQuerier creates implementation of storage.Querier that fetches data from the proxy
// store API endpoints.
func newQuerier(
ctx context.Context,
logger log.Logger,
reg prometheus.Registerer,
mint, maxt int64,
replicaLabels []string,
proxy storepb.StoreServer,
deduplicate bool,
maxResolutionMillis int64,
partialResponse bool,
skipChunks bool,
partialResponse, skipChunks bool,
selectGate gate.Gate,
selectTimeout time.Duration,
) *querier {
if logger == nil {
logger = log.NewNopLogger()
@@ -92,9 +110,13 @@ func newQuerier(
rl[replicaLabel] = struct{}{}
}
return &querier{
ctx: ctx,
logger: logger,
cancel: cancel,
ctx: ctx,
logger: logger,
reg: reg,
cancel: cancel,
selectGate: selectGate,
selectTimeout: selectTimeout,

mint: mint,
maxt: maxt,
replicaLabels: rl,
@@ -173,16 +195,60 @@ func (q *querier) Select(_ bool, hints *storage.SelectHints, ms ...*labels.Match
for i, m := range ms {
matchers[i] = m.String()
}
span, ctx := tracing.StartSpan(q.ctx, "querier_select", opentracing.Tags{

	// The querier has a context, but the engine cancels it as soon as query evaluation completes.
	// We want to prevent that from happening to the async store API calls we make, while still preserving the tracing context.
ctx := tracing.CopyTraceContext(context.Background(), q.ctx)
ctx, cancel := context.WithTimeout(ctx, q.selectTimeout)
span, ctx := tracing.StartSpan(ctx, "querier_select", opentracing.Tags{
"minTime": hints.Start,
"maxTime": hints.End,
"matchers": "{" + strings.Join(matchers, ",") + "}",
})
defer span.Finish()

promise := make(chan storage.SeriesSet, 1)
go func() {
defer close(promise)

var err error
tracing.DoInSpan(ctx, "querier_select_gate_ismyturn", func(ctx context.Context) {
err = q.selectGate.Start(ctx)
})
if err != nil {
promise <- storage.ErrSeriesSet(errors.Wrap(err, "failed to wait for turn"))
return
}
defer q.selectGate.Done()

span, ctx := tracing.StartSpan(ctx, "querier_select_select_fn")
defer span.Finish()

set, err := q.selectFn(ctx, hints, ms...)
if err != nil {
promise <- storage.ErrSeriesSet(err)
return
}

promise <- set
}()

return &lazySeriesSet{create: func() (storage.SeriesSet, bool) {
defer cancel()
defer span.Finish()

// Only gets called once, for the first Next() call of the series set.
set, ok := <-promise
if !ok {
return storage.ErrSeriesSet(errors.New("channel closed before a value received")), false
}
return set, set.Next()
}}
}

func (q *querier) selectFn(ctx context.Context, hints *storage.SelectHints, ms ...*labels.Matcher) (storage.SeriesSet, error) {
sms, err := storepb.TranslatePromMatchers(ms...)
if err != nil {
return storage.ErrSeriesSet(errors.Wrap(err, "convert matchers"))
return nil, errors.Wrap(err, "convert matchers")
}

aggrs := aggrsFromFunc(hints.Func)
@@ -197,7 +263,7 @@ func (q *querier) Select(_ bool, hints *storage.SelectHints, ms ...*labels.Match
PartialResponseDisabled: !q.partialResponse,
SkipChunks: q.skipChunks,
}, resp); err != nil {
return storage.ErrSeriesSet(errors.Wrap(err, "proxy Series()"))
return nil, errors.Wrap(err, "proxy Series()")
}

var warns storage.Warnings
@@ -213,7 +279,7 @@ func (q *querier) Select(_ bool, hints *storage.SelectHints, ms ...*labels.Match
set: newStoreSeriesSet(resp.seriesSet),
aggrs: aggrs,
warns: warns,
}
}, nil
}

// TODO(fabxc): this could potentially pushed further down into the store API to make true streaming possible.
@@ -228,7 +294,7 @@ func (q *querier) Select(_ bool, hints *storage.SelectHints, ms ...*labels.Match

// The merged series set assembles all potentially-overlapping time ranges of the same series into a single one.
// TODO(bwplotka): We could potentially dedup on chunk level, use chunk iterator for that when available.
return newDedupSeriesSet(set, q.replicaLabels, len(aggrs) == 1 && aggrs[0] == storepb.Aggr_COUNTER)
return newDedupSeriesSet(set, q.replicaLabels, len(aggrs) == 1 && aggrs[0] == storepb.Aggr_COUNTER), nil
}

// sortDedupLabels re-sorts the set so that the same series with different replica
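In summary, the new Select() path reduces to a gate-bounded goroutine delivering its result through a one-element channel that the lazySeriesSet drains on its first Next(). A condensed sketch of that pattern, with `asyncSelect` as an invented helper name (the PR inlines this logic in Select() itself):

```go
package query

import (
	"context"

	"github.com/prometheus/prometheus/storage"

	"github.com/thanos-io/thanos/pkg/gate"
)

// asyncSelect is an invented helper condensing the pattern used by the new
// Select(): acquire a slot from the per-query gate, run the actual select on
// a goroutine, and deliver the result through a one-element channel that a
// lazySeriesSet can drain on its first Next() call.
func asyncSelect(ctx context.Context, g gate.Gate, sel func(context.Context) (storage.SeriesSet, error)) <-chan storage.SeriesSet {
	promise := make(chan storage.SeriesSet, 1)
	go func() {
		defer close(promise)

		if err := g.Start(ctx); err != nil { // blocks until a slot frees up or ctx ends
			promise <- storage.ErrSeriesSet(err)
			return
		}
		defer g.Done()

		set, err := sel(ctx)
		if err != nil {
			promise <- storage.ErrSeriesSet(err)
			return
		}
		promise <- set
	}()
	return promise
}
```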