[dbnode] Fix AggregateQuery limits #3112
Conversation
type AggregateResultsEntries []AggregateResultsEntry

// Size is the element size of the aggregated result entries.
func (e AggregateResultsEntries) Size() int {
	// NB: add 1 to the entries length for each entry's field.
	length := len(e)
	for _, entry := range e {
		length += len(entry.Terms)
	}

	return length
}
This'll be reworked so it doesn't live inside a generated file (the refactor will also help avoid doing the full length calculation for each incoming field)
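For illustration, a small sketch of the counting (the field/term values are made up; this assumes the existing AggregateResultsEntry{Field, Terms} shape and the ident.StringID helper):

```go
entries := AggregateResultsEntries{
	{Field: ident.StringID("city"), Terms: []ident.ID{ident.StringID("nyc"), ident.StringID("sfo")}},
	{Field: ident.StringID("region"), Terms: nil},
}
// len(entries) counts one slot per field (2), then each entry adds its
// terms (2 + 0), so Size() == 4.
fmt.Println(entries.Size())
```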
src/dbnode/storage/index/block.go
Outdated
batch      = AggregateResultsEntries(b.opts.AggregateResultsEntryArrayPool().Get())
maxBatch   = cap(batch)
iterClosed = false // tracking whether we need to free the iterator at the end.
exhaustive = false
may not need this
yeah seems like opts.exhaustive(size, resultCount) accomplishes the same goal
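A hedged sketch of what dropping the local flag might look like, assuming the opts.exhaustive helper mentioned above takes the current result size and docs count:

```go
// Instead of threading a local `exhaustive` bool through the iteration,
// derive it from the limits once the batches have been added (hypothetical shape):
size, docsCount := results.Size(), results.TotalDocsCount()
return opts.exhaustive(size, docsCount), nil
```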
src/dbnode/storage/index/block.go
Outdated
 // update recently queried docs to monitor memory.
 if results.EnforceLimits() {
-	if err := b.docsLimit.Inc(len(batch), source); err != nil {
+	if err := b.docsLimit.Inc(batch.Size(), source); err != nil {
Will have to make a decision on how docs limit is affected in aggregate results; this is not correct
Ah nice, this will make results fairly deterministic I assume? Can we add integration tests for this? I wonder if we also need a limit client side for aggregating results across the nodes in
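For context, the difference between the old and new increment (the numbers below are made up):

```go
// Hypothetical batch of 10 entries with 3 terms each:
//   len(batch)   == 10 -> the docs limit would only see the fields
//   batch.Size() == 40 -> the docs limit sees fields plus terms (10 + 10*3)
if err := b.docsLimit.Inc(batch.Size(), source); err != nil {
	return err
}
```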
@@ -123,6 +123,7 @@ func (r *aggregatedResults) AggregateResultsOptions() AggregateResultsOptions {

func (r *aggregatedResults) AddFields(batch []AggregateResultsEntry) (int, int) {
	r.Lock()
	maxInsertions := r.aggregateOpts.SizeLimit - r.totalDocsCount
should we be subtracting totalDocsCount or r.resultsMap.size()?
Yeah going to do some refactoring/renaming around this to make it clearer what each limit is; totalDocsCount is not quite correctly calculated at the moment, so will need to make a few touchups around it
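Roughly the distinction behind the question (the names and accessors here are placeholders, not the final API):

```go
// Caps how many distinct fields/terms end up stored in the results map:
remainingSize := r.aggregateOpts.SizeLimit - r.resultsMap.Len()

// Caps how many fields/terms are processed overall, which is what
// totalDocsCount is tracking (DocsLimit is a hypothetical name here):
remainingDocs := r.aggregateOpts.DocsLimit - r.totalDocsCount
```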
src/dbnode/storage/index/block.go
Outdated
 		break
 	}

 	field, term := iter.Current()
 	batch = b.appendFieldAndTermToBatch(batch, field, term, iterateTerms)
-	if len(batch) < batchSize {
+	if batch.Size() < maxBatch {
can we add a comment on this check?
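One possible wording for that comment (a suggestion only, not the final text):

```go
// NB: keep accumulating fields/terms until the batch reaches the capacity
// of the pooled slice (maxBatch); only a full batch is flushed into the
// results, which amortizes the per-AddFields locking.
if batch.Size() < maxBatch {
	continue
}
```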
@@ -714,14 +724,14 @@ func (b *block) aggregateWithSpan(
}

// Add last batch to results if remaining.
comment could be updated
	})
	valueInsertions++
} else {
	// this value exceeds the limit, so should be released to the underling
underlying
src/dbnode/storage/index.go
Outdated
docResults, ok := results.(index.DocumentResults)
if !ok { // should never happen
	state.Lock()
	err := fmt.Errorf("unknown results type [%T] received during wide query", results)
what makes this invocation a "wide query"?
👍 should be regular query
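i.e. presumably something along these lines (exact wording is a guess):

```go
err := fmt.Errorf("unknown results type [%T] received during query", results)
```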
@@ -143,159 +140,32 @@ func (r *aggregatedResults) AddFields(batch []AggregateResultsEntry) (int, int)
valuesMap := aggValues.Map()
for _, t := range entry.Terms {
	if !valuesMap.Contains(t) {
		fmt.Println(maxInsertions, valueInsertions, t)
rm
LG with some questions!
valueInsertions := 0
defer r.Unlock()

maxDocs := int(math.MaxInt64)
is this more like the remaining docs we can load? remainingDocs?
// NB: already hit doc limit.
if maxDocs <= 0 {
	for idx := 0; idx < len(batch); idx++ {
so since we hit the limit, we essentially have to clean up this entire batch?
Yeah, exactly
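A sketch of that cleanup path, assuming the entries hold pooled ident.IDs that need Finalize to be returned to their pools (the return values and map accessor shown are illustrative):

```go
// NB: already hit doc limit, so nothing from this batch can be inserted;
// walk the whole batch purely to release the pooled field/term IDs.
if maxDocs <= 0 {
	for idx := 0; idx < len(batch); idx++ {
		batch[idx].Field.Finalize()
		for _, term := range batch[idx].Terms {
			term.Finalize()
		}
	}
	return r.resultsMap.Len(), r.totalDocsCount
}
```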
	limitTripped = true
}

if limitTripped {
can we in-line the check above and remove the limitTripped var?
Good call
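A sketch of the inlining (the condition shown is illustrative; the real check is whatever currently sets limitTripped):

```go
// Before: set a flag inside the loop, consult it afterwards.
//   limitTripped = true
//   ...
//   if limitTripped { ... }
//
// After: evaluate the same condition directly where it is needed.
if valueInsertions >= maxInsertions {
	// handle the over-limit path here
}
```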
return err
valuesMap := aggValues.Map()
for _, t := range entry.Terms {
	if maxDocs > docs {
if the max is already hit should we break from the for? Or is it a trivial amount of iteration?
We still have to iterate through the remainder here to finalize them
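Roughly why (a sketch, with the insert path elided): even once maxDocs is exhausted, the leftover pooled term IDs still have to be released rather than skipped, so the loop cannot simply break:

```go
for _, t := range entry.Terms {
	if maxDocs > docs {
		// under the limit: the term gets inserted into valuesMap as before
	} else {
		// over the limit: breaking here would leak the remaining pooled IDs,
		// so each leftover term is still finalized
		t.Finalize()
	}
	docs++
}
```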