Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Restrict the number of results processed per index worker #3269

Merged
merged 26 commits into from
Mar 1, 2021
Merged
Changes from 1 commit
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
9abfd17
Restrict the number of results processed per index worker
ryanhall07 Feb 23, 2021
b4b543e
Merge branch 'master' into rhall-worker-pool-iter
ryanhall07 Feb 23, 2021
7c11e7d
mock gen
ryanhall07 Feb 23, 2021
c8e2e40
use channels for fixed permits
ryanhall07 Feb 23, 2021
2fa777b
use TryAcquire to limit # of go routines
ryanhall07 Feb 24, 2021
ae102f5
Merge branch 'master' into rhall-worker-pool-iter
ryanhall07 Feb 24, 2021
4581f11
Just use Acquire to get permits
ryanhall07 Feb 24, 2021
2107c97
review comments
ryanhall07 Feb 25, 2021
a22b186
Merge remote-tracking branch 'origin/master' into rhall-worker-pool-iter
ryanhall07 Feb 25, 2021
13cb785
fix tests
ryanhall07 Feb 25, 2021
d5b6680
Merge remote-tracking branch 'origin/master' into rhall-worker-pool-iter
ryanhall07 Feb 25, 2021
2d24354
fix config test
ryanhall07 Feb 25, 2021
ae5ac98
Merge remote-tracking branch 'origin/master' into rhall-worker-pool-iter
ryanhall07 Feb 25, 2021
ae900ca
Limit block iters by time
ryanhall07 Feb 25, 2021
1f9ebbd
fix tests
ryanhall07 Feb 25, 2021
b9cf04c
Merge branch 'master' into rhall-worker-pool-iter
ryanhall07 Feb 25, 2021
92c93e3
remove ctx from Release
ryanhall07 Feb 26, 2021
ad52f6f
Merge branch 'rhall-worker-pool-iter' of github.com:m3db/m3 into rhal…
ryanhall07 Feb 26, 2021
a3c33af
permit quota is int64
ryanhall07 Feb 26, 2021
53c5a48
Merge branch 'master' into rhall-worker-pool-iter
ryanhall07 Feb 26, 2021
dd0678a
Merge branch 'master' into rhall-worker-pool-iter
ryanhall07 Feb 27, 2021
1a563c3
default max worker time to 1s
ryanhall07 Feb 27, 2021
c08b57a
Merge branch 'master' into rhall-worker-pool-iter
ryanhall07 Feb 28, 2021
b7ab449
Merge branch 'master' into rhall-worker-pool-iter
ryanhall07 Mar 1, 2021
5b9b982
Remove unused MaxResultsPerWorkerConfiguration
ryanhall07 Mar 1, 2021
1337a6d
Merge branch 'rhall-worker-pool-iter' of github.com:m3db/m3 into rhal…
ryanhall07 Mar 1, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 30 additions & 39 deletions src/dbnode/storage/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -1630,9 +1630,28 @@ func (i *nsIndex) queryWithSpan(
}
}()

// used by each blockIter to break the iteration loop if the query can't continue processing.
queryCanceled := func() bool {
return opts.LimitsExceeded(results.Size(), results.TotalDocsCount()) || state.hasErr()
// waitForPermit waits for a permit. returns true if the permit was acquired and the wait time.
waitForPermit := func() (bool, time.Duration) {
queryCanceled := func() bool {
return opts.LimitsExceeded(results.Size(), results.TotalDocsCount()) || state.hasErr()
}
ryanhall07 marked this conversation as resolved.
Show resolved Hide resolved
// make sure the query hasn't been canceled before waiting for a permit.
if queryCanceled() {
return false, 0
}
startWait := time.Now()
err := permits.Acquire(ctx)
waitTime := time.Since(startWait)
if err != nil {
state.addErr(err)
return false, waitTime
}
// make sure the query hasn't been canceled while waiting for a permit.
if queryCanceled() {
permits.Release()
return false, waitTime
}
return true, waitTime
}

// We're looping through all the blocks that we need to query and kicking
Expand All @@ -1644,32 +1663,14 @@ func (i *nsIndex) queryWithSpan(
// Capture for async query execution below.
blockIter := blockIter

// make sure the query hasn't been canceled
if queryCanceled() {
// acquire a permit before kicking off the goroutine to process the iterator. this limits the number of
// concurrent goroutines to # of permits + large queries that needed multiple iterations to finish.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💯

acq, waitTime := waitForPermit()
blockIter.waitTime += waitTime
if !acq {
break
}

// first use TryAcquire to kick off as many parallel block queries as possible, in case no other queries are
// actively running.
acq, err := permits.TryAcquire(ctx)
if err != nil {
state.addErr(err)
break
}
// block waiting for a permit if none are available. this limits the number of concurrent go routines running.
if !acq {
startWait := time.Now()
err := permits.Acquire(ctx)
blockIter.waitTime += time.Since(startWait)
if err != nil {
state.addErr(err)
break
}
// make sure the query hasn't been canceled while waiting for a permit.
if queryCanceled() {
break
}
}
wg.Add(1)
// kick off a go routine to process the entire iterator.
go func() {
ryanhall07 marked this conversation as resolved.
Show resolved Hide resolved
Expand All @@ -1678,19 +1679,9 @@ func (i *nsIndex) queryWithSpan(
// if this is not the first iteration of the iterator, there were more results than the
// MaxResultsPerPermit, so need to acquire another permit.
if !first {
// don't wait for a permit if the query has been canceled.
if queryCanceled() {
break
}
startWait := time.Now()
err := permits.Acquire(ctx)
blockIter.waitTime += time.Since(startWait)
if err != nil {
state.addErr(err)
break
}
// check the query hasn't been canceled while waiting.
if queryCanceled() {
acq, waitTime := waitForPermit()
blockIter.waitTime += waitTime
if !acq {
break
}
}
Expand Down