Restrict the time a query can hold an index worker (#3269)
A new MaxWorkerTime option is introduced to cap how long a query can hold an 
index worker at once. If the max is exceeded, the
query must yield the index worker back and acquire it again
(potentially waiting) to continue processing the results.

This cap ensures large queries don't dominate the finite number of index
workers allowed to run concurrently and lock out smaller queries. The
idea is that users set the max large enough that the vast majority of
typical queries can finish with a single worker acquisition.
ryanhall07 authored Mar 1, 2021
1 parent 73bac90 commit 973caaf
Showing 31 changed files with 1,379 additions and 814 deletions.
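
The yield-and-reacquire behavior the commit message describes can be pictured with the short Go sketch below; the permit pool, names, and batching are illustrative assumptions, not the actual M3DB permits API.

package sketch

import (
	"context"
	"time"
)

// workerPermits is a toy fixed-size permit pool standing in for the index
// worker pool whose size is capped by MaxQueryIDsConcurrency.
type workerPermits chan struct{}

func (p workerPermits) acquire(ctx context.Context) error {
	select {
	case p <- struct{}{}:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

func (p workerPermits) release() { <-p }

// processQuery drains result batches but never holds a permit longer than
// maxWorkerTime; once the cap is hit it releases the permit and must wait
// to acquire another one before continuing, so other queries can run.
func processQuery(ctx context.Context, permits workerPermits, maxWorkerTime time.Duration, batches []func()) error {
	next := 0
	for next < len(batches) {
		if err := permits.acquire(ctx); err != nil {
			return err
		}
		deadline := time.Now().Add(maxWorkerTime)
		for next < len(batches) && time.Now().Before(deadline) {
			batches[next]() // process one batch of results while holding the worker
			next++
		}
		permits.release() // yield the index worker back to the pool
	}
	return nil
}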
7 changes: 7 additions & 0 deletions src/cmd/services/m3dbnode/config/config.go
@@ -387,6 +387,13 @@ type IndexConfiguration struct {
// as they are very CPU-intensive (regex and FST matching).
MaxQueryIDsConcurrency int `yaml:"maxQueryIDsConcurrency" validate:"min=0"`

+// MaxWorkerTime is the maximum time a query can hold an index worker at once. If a query does not finish in this
+// time it yields the worker and must wait again for another worker to resume. The number of workers available to
+// all queries is defined by MaxQueryIDsConcurrency.
+// Capping the maximum time per worker ensures a few large queries don't hold all the concurrent workers and lock
+// out many small queries from running.
+MaxWorkerTime time.Duration `yaml:"maxWorkerTime"`

// RegexpDFALimit is the limit on the max number of states used by a
// regexp deterministic finite automaton. Default is 10,000 states.
RegexpDFALimit *int `yaml:"regexpDFALimit"`
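For illustration only, the two knobs added and documented above could be set together roughly as below; the values are hypothetical, and the import path is written the way repository paths appear elsewhere on this page.

package sketch

import (
	"time"

	"github.com/m3db/m3/src/cmd/services/m3dbnode/config"
)

// Hypothetical settings: 8 concurrent index workers, each held for at most
// 5s per acquisition before a query must yield and re-acquire.
var indexCfg = config.IndexConfiguration{
	MaxQueryIDsConcurrency: 8,
	MaxWorkerTime:          5 * time.Second,
}

With these values, a query needing roughly 12s of index work would go through at least three worker acquisitions, while typical sub-5s queries still finish with one.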
1 change: 1 addition & 0 deletions src/cmd/services/m3dbnode/config/config_test.go
@@ -339,6 +339,7 @@ func TestConfiguration(t *testing.T) {
expected := `db:
index:
maxQueryIDsConcurrency: 0
+maxWorkerTime: 0s
regexpDFALimit: null
regexpFSALimit: null
forwardIndexProbability: 0
@@ -201,7 +201,7 @@ func (p *fakePermits) TryAcquire(_ context.Context) (bool, error) {
return true, nil
}

-func (p *fakePermits) Release() {
+func (p *fakePermits) Release(_ int64) {
p.released++
p.available++
}
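
The signature change in this test fake suggests the permits abstraction now reports, on release, how many units are being handed back. A guessed, minimal shape of that interface, based only on the calls visible in this diff:

package sketch

import "context"

// permits is a guessed, simplified view of the interface the fake above
// satisfies: acquisition may be refused, and Release carries how much of
// the acquired quota is being returned.
type permits interface {
	TryAcquire(ctx context.Context) (bool, error)
	Release(quota int64)
}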
5 changes: 3 additions & 2 deletions src/dbnode/network/server/tchannelthrift/node/service.go
@@ -1092,9 +1092,10 @@ func (i *fetchTaggedResultsIter) Close(err error) {

i.seriesBlocks.RecordValue(float64(i.totalSeriesBlocks))

-for n := 0; n < i.batchesAcquired; n++ {
-i.blockPermits.Release()
+for n := 0; n < i.batchesAcquired-1; n++ {
+i.blockPermits.Release(int64(i.blocksPerBatch))
}
+i.blockPermits.Release(int64(i.blocksPerBatch - i.blocksAvailable))
}

// IDResult is the FetchTagged result for a series ID.
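To make the release arithmetic in Close easier to follow, here is a trace with made-up numbers; the field meanings are inferred from the diff and should be treated as assumptions.

package sketch

import "fmt"

// traceReleases traces the Release calls made by Close above, using
// hypothetical numbers. The meaning of blocksAvailable (unused permits
// remaining from the most recently acquired batch) is an assumption.
func traceReleases() {
	blocksPerBatch := int64(128) // hypothetical batch size
	batchesAcquired := 3         // hypothetical number of acquisitions
	blocksAvailable := int64(50) // hypothetical remainder from the last batch

	// The first batchesAcquired-1 calls each pass a full batch: Release(128), twice.
	for n := 0; n < batchesAcquired-1; n++ {
		fmt.Println("Release", blocksPerBatch)
	}
	// The final call passes blocksPerBatch-blocksAvailable: Release(78) here.
	fmt.Println("Release", blocksPerBatch-blocksAvailable)
}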
26 changes: 14 additions & 12 deletions src/dbnode/server/server.go
@@ -90,13 +90,12 @@ import (
xos "github.com/m3db/m3/src/x/os"
"github.com/m3db/m3/src/x/pool"
"github.com/m3db/m3/src/x/serialize"
xsync "github.com/m3db/m3/src/x/sync"

apachethrift "github.com/apache/thrift/lib/go/thrift"
"github.com/m3dbx/vellum/levenshtein"
"github.com/m3dbx/vellum/levenshtein2"
"github.com/m3dbx/vellum/regexp"
opentracing "github.com/opentracing/opentracing-go"
"github.com/opentracing/opentracing-go"
"github.com/uber-go/tally"
"github.com/uber/tchannel-go"
"go.etcd.io/etcd/embed"
@@ -371,14 +370,6 @@ func Run(runOpts RunOptions) {

opentracing.SetGlobalTracer(tracer)

-if cfg.Index.MaxQueryIDsConcurrency != 0 {
-queryIDsWorkerPool := xsync.NewWorkerPool(cfg.Index.MaxQueryIDsConcurrency)
-queryIDsWorkerPool.Init()
-opts = opts.SetQueryIDsWorkerPool(queryIDsWorkerPool)
-} else {
-logger.Warn("max index query IDs concurrency was not set, falling back to default value")
-}

// Set global index options.
if n := cfg.Index.RegexpDFALimitOrDefault(); n > 0 {
regexp.SetStateLimit(n)
@@ -481,8 +472,14 @@ func Run(runOpts RunOptions) {
seriesReadPermits.Start()
defer seriesReadPermits.Stop()

-opts = opts.SetPermitsOptions(opts.PermitsOptions().
-SetSeriesReadPermitsManager(seriesReadPermits))
+permitOptions := opts.PermitsOptions().SetSeriesReadPermitsManager(seriesReadPermits)
+if cfg.Index.MaxQueryIDsConcurrency != 0 {
+permitOptions = permitOptions.SetIndexQueryPermitsManager(
+permits.NewFixedPermitsManager(cfg.Index.MaxQueryIDsConcurrency))
+} else {
+logger.Warn("max index query IDs concurrency was not set, falling back to default value")
+}
+opts = opts.SetPermitsOptions(permitOptions)

// Setup postings list cache.
var (
@@ -524,6 +521,11 @@ }).
}).
SetMmapReporter(mmapReporter).
SetQueryLimits(queryLimits)

+if cfg.Index.MaxWorkerTime > 0 {
+indexOpts = indexOpts.SetMaxWorkerTime(cfg.Index.MaxWorkerTime)
+}

opts = opts.SetIndexOptions(indexOpts)

if tick := cfg.Tick; tick != nil {
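The server wiring above replaces the old xsync.NewWorkerPool setup with permits.NewFixedPermitsManager driven by the same MaxQueryIDsConcurrency value. As a rough illustration only (the real manager in the M3 codebase is more involved), a fixed-size permit pool can be pictured as a buffered channel:

package sketch

import "context"

// fixedPermits caps concurrent holders at cap(slots), mirroring the idea of
// a fixed permits manager sized by MaxQueryIDsConcurrency. Toy code, not the
// real implementation.
type fixedPermits struct {
	slots chan struct{}
}

func newFixedPermits(size int) *fixedPermits {
	return &fixedPermits{slots: make(chan struct{}, size)}
}

// TryAcquire takes a permit only if one is immediately free.
func (p *fixedPermits) TryAcquire(_ context.Context) (bool, error) {
	select {
	case p.slots <- struct{}{}:
		return true, nil
	default:
		return false, nil
	}
}

// Release returns a permit; the quota argument mirrors the Release(int64)
// signature introduced by this commit and is ignored by this toy version.
func (p *fixedPermits) Release(_ int64) { <-p.slots }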