Restrict the number of results processed per index worker #3269
Conversation
Force-pushed from a05f145 to 706d4c6.
Force-pushed from 706d4c6 to 9abfd17.
Codecov Report
```
@@            Coverage Diff             @@
##           master    #3269      +/-   ##
==========================================
- Coverage    72.5%    72.4%     -0.1%
==========================================
  Files        1099     1101        +2
  Lines      101504   101562       +58
==========================================
- Hits        73616    73607        -9
- Misses      22830    22866       +36
- Partials     5058     5089       +31
```
```go
	}
	permits.Release()
}
blockIter.searchTime += blockIter.iter.SearchDuration()
```
a nice plus of this change is we no longer have to acquire locks to update the timing info on the shared results. each goroutine has its own timing information and the sub results are added together when all queries are done.
+1
src/dbnode/storage/index/options.go (outdated)

```go
@@ -64,6 +64,9 @@ const (
	aggregateResultsEntryArrayPoolSize        = 256
	aggregateResultsEntryArrayPoolCapacity    = 256
	aggregateResultsEntryArrayPoolMaxCapacity = 256 // Do not allow grows, since we know the size

	// defaultResultsPerPermit sets the default index results that can be processed per permit acquired.
	defaultResultsPerPermit = 10000
```
not sure what a good default is. I'm hesitant to make the default really large or off, since we changed the access pattern for acquiring workers. now all blocks eagerly attempt to acquire workers in parallel, so without some kind of max, larger queries will dominate even more.
```go
select {
case f.permits <- struct{}{}:
default:
	panic("more permits released than acquired")
```
not sure how we feel about panics in the code base. fwiw the std go semaphore panics for the same reason. this can only happen due to a logical bug, which should be caught with a test
Yeah seems sane. Although yes usually try to avoid panics. Alternatively we could make it return an error and queries start failing.
awesome, LGTM!
```go
QueryOptions{SeriesLimit: 3},
results,
10,
```
how are we deciding on the limit here?
pretty arbitrary. high enough for the tests to pass with a single iteration.
```go
@@ -775,15 +814,6 @@ func TestLimits(t *testing.T) {
		requireExhaustive: false,
		expectedErr:       "",
	},
	{
		name: "no limits",
```
why is this no longer relevant?
it doesn't make sense that `exhaustive` would be false if there is no limit. it just seems made up for the test.
```go
// make sure the query hasn't been canceled
if queryCanceled() {
// acquire a permit before kicking off the goroutine to process the iterator. this limits the number of
// concurrent goroutines to # of permits + large queries that needed multiple iterations to finish.
```
💯
```go
@@ -387,6 +387,14 @@ type IndexConfiguration struct {
	// as they are very CPU-intensive (regex and FST matching).
	MaxQueryIDsConcurrency int `yaml:"maxQueryIDsConcurrency" validate:"min=0"`

	// MaxResultsPerPermit is the maximum index results a query can process after obtaining a permit. If a query needs
```
Should we split this up into different values for 1) regular queries and 2) aggregate queries?
Aggregate queries, when scoped with `match[]=...`, are relatively more expensive per iteration, since they have to do a postings list intersection for each aggregate term they progress to, as per:
m3/src/dbnode/storage/index/fields_terms_iterator.go
Lines 229 to 248 in 812d585
```go
fti.current.term, fti.current.postings = fti.termIter.Current()
if fti.restrictByPostings == nil {
	// No restrictions.
	return true, nil
}
bitmap, ok := roaring.BitmapFromPostingsList(fti.current.postings)
if !ok {
	return false, errUnpackBitmapFromPostingsList
}
// Check term is part of at least some of the documents we're
// restricted to providing results for based on intersection
// count.
// Note: IntersectionCount is significantly faster than intersecting and
// counting results and also does not allocate.
if n := fti.restrictByPostings.IntersectionCount(bitmap); n > 0 {
	// Matches, this is next result.
	return true, nil
}
```
and
m3/src/dbnode/storage/index/fields_terms_iterator.go
Lines 149 to 177 in 812d585
```go
field, pl := fieldIter.Current()
if !fti.opts.allow(field) {
	continue
}
if fti.restrictByPostings == nil {
	// No restrictions.
	fti.current.field = field
	return true
}
bitmap, ok := roaring.BitmapFromPostingsList(pl)
if !ok {
	fti.err = errUnpackBitmapFromPostingsList
	return false
}
// Check field is part of at least some of the documents we're
// restricted to providing results for based on intersection
// count.
// Note: IntersectionCount is significantly faster than intersecting and
// counting results and also does not allocate.
if n := fti.restrictByPostings.IntersectionCount(bitmap); n < 1 {
	// No match, not next result.
	continue
}
// Matches, this is next result.
fti.current.field = field
return true
```
I suppose we could do this in a followup? Might be too much to add to scope of this PR.
that's pretty easy to do now, just 2 different config values.
```go
docs, series := iter.Counts()
b.metrics.queryDocsMatched.RecordValue(float64(docs))
b.metrics.querySeriesMatched.RecordValue(float64(series))
```
Looks good 👍
```go
// MaxResultsPerWorkerConfiguration configures the max results per index worker.
type MaxResultsPerWorkerConfiguration struct {
	// Fetch is the max for fetch queries.
	Fetch int `yaml:"fetch"`
	// Aggregate is the max for aggregate queries.
	Aggregate int `yaml:"aggregate"`
}
```
Is this used anymore? Seems like it can be removed.
LGTM other than removing MaxResultsPerWorkerConfiguration
```go
@@ -561,6 +528,9 @@ func (b *block) QueryWithIter(
	}
}

iter.AddSeries(size)
```
@ryanhall07 @robskillington Just verifying with you, as I read this code multiple times and I think I found a bug: `size` is the number of elements in `results`, as effectively returned originally here:

```go
func (r *results) AddDocuments(batch []doc.Document) (int, int, error) {
	r.Lock()
	err := r.addDocumentsBatchWithLock(batch)
	size := r.resultsMap.Len()
	docsCount := r.totalDocsCount + len(batch)
	r.totalDocsCount = docsCount
	r.Unlock()
	return size, docsCount, err
}
```

Since `results` only keeps growing (I haven't seen any reset of this object), calling iter.Counts() to get seriesMatched yields a wrong number: AddSeries(size) keeps adding the running total instead of the delta. Maybe the correct way is to measure `results` before and after adding, and use the delta?
yea this looks like a bug.
What this PR does / why we need it:
A new MaxResultsPerPermit option is introduced to cap how many index
results an index worker can process at a time. If the max is exceeded, the
index worker must yield the permit back and acquire it again
(potentially waiting) to continue processing the results.
This cap ensures large queries don't dominate the finite number of index
workers allowed to run concurrently and lock out smaller queries. The
idea is users would want to set the max large enough so the vast
majority of typical queries can finish with only a single permit
acquisition.
Special notes for your reviewer:
Does this PR introduce a user-facing and/or backwards incompatible change?:
Does this PR require updating code package or user-facing documentation?: