-
Notifications
You must be signed in to change notification settings - Fork 455
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[dbnode] Aggregate() using only FSTs where possible #1545
Conversation
prateek
commented
Apr 11, 2019
•
edited
Loading
edited
- When we're not filtering on a query, we can compose the results of Aggregation from the FSTs directly. This avoids the code path from postings lists -> documents, thereby saving a lot of CPU.
fc5db8f
to
578baac
Compare
field, term []byte, | ||
includeTerms bool, | ||
) []AggregateResultsEntry { | ||
// NB(prateek): we make a copy of the (field, term) entries returned |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
highlight this bit as a potential contentious choice. could simplify to living with higher contention if people feel strongly.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I like this approach fwiw :)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Aye, me too.
aggValues = r.valuesPool.Get() | ||
// we can avoid the copy because we assume ownership of the passed ident.ID, | ||
// but still need to finalize it. | ||
r.resultsMap.set(f, aggValues, _AggregateResultsMapKeyOptions{ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
using an internal method of codegen'd type feels sketch but we don't have the equivalent exported so no way around
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can't this use
SetUnsafe(f, aggValues, AggregateResultsMapSetUnsafeOptions{
NoCopyKey: true,
NoFinalizeKey: false,
})
to avoid the internal method?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good call, will do.
Codecov Report
@@ Coverage Diff @@
## master #1545 +/- ##
========================================
- Coverage 71.7% 71.6% -0.2%
========================================
Files 947 948 +1
Lines 77981 78230 +249
========================================
+ Hits 55960 56026 +66
- Misses 18381 18548 +167
- Partials 3640 3656 +16
Continue to review full report at Codecov.
|
b143a0b
to
501c20d
Compare
src/dbnode/storage/index/types.go
Outdated
@@ -181,6 +181,16 @@ type AggregateResults interface { | |||
aggregateQueryOpts AggregateResultsOptions, | |||
) | |||
|
|||
// AggregateResultsOptions returns the set AggregateResultsOptions. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
returns the options for this AggregateResult
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sure
src/dbnode/storage/index/types.go
Outdated
opts QueryOptions, | ||
results AggregateResults, | ||
) (exhaustive bool, err error) | ||
|
||
// AddResults adds bootstrap results to the block, if c. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I know it's not part of this pr but can you fix this comment?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sure.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
src/dbnode/storage/index/types.go
Outdated
@@ -273,6 +290,16 @@ type Block interface { | |||
results BaseResults, | |||
) (exhaustive bool, err error) | |||
|
|||
// Aggregate aggregates known tag names/values. | |||
// NB(prateek): this is different from Query, as we can |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit; different from AggregateQuery
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nah, methods on this interface are Query()
and Aggregate()
. Unless you mean to say rename?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
different from aggregating by means of Query
maybe? Otherwise it kinda reads like this aggregates tag names/values which you don't really expect Query to do
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
lol sure
@@ -304,6 +322,16 @@ func (o *opts) DocumentArrayPool() doc.DocumentArrayPool { | |||
return o.docArrayPool | |||
} | |||
|
|||
func (o *opts) SetAggregateResultsEntryArrayPool(value AggregateResultsEntryArrayPool) Options { | |||
opts := *o | |||
opts.aggResultsEntryArrayPool = value |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should this pool be added to the validate function?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ah nvm looks like that's only for pools which need to be added rather than the ones created by NewOptions
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
meh, i'll add for sanity.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
aggValues = r.valuesPool.Get() | ||
// we can avoid the copy because we assume ownership of the passed ident.ID, | ||
// but still need to finalize it. | ||
r.resultsMap.set(f, aggValues, _AggregateResultsMapKeyOptions{ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can't this use
SetUnsafe(f, aggValues, AggregateResultsMapSetUnsafeOptions{
NoCopyKey: true,
NoFinalizeKey: false,
})
to avoid the internal method?
return false, err | ||
} | ||
|
||
if err := iter.Close(); err != nil { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: I don't think this is necessary; you reset every loop and close the iterator explicitly below already
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hm it's necessary cause we want to ensure we're releasing any internal state (via Close()). Reset() just clears that state, doesn't release it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fair; in that case it's a little weird that we double close below
For this particular iterator it's not really a problem, but it's plausible for an iter implementatiopn to panic if it's double closed here
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Definitely a fair point. This implementation doesn't have that issue intentionally for this reason - so as to ensure we free resources. I'll make a note about this property in a comment.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Still feels a little sketch to me because I think some of our Close()
methods return objects to pools
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
that's a fair point. I'll rework to not rely on this behaviour.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
|
||
// Add last batch to results if remaining. | ||
if len(batch) > 0 { | ||
batch, size, err = b.addAggregateResults(cancellable, results, batch) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does this need to append aggregate results too?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nah, cause we're already done that for the elements in the batch in a previous loop iteration.
field, term []byte, | ||
includeTerms bool, | ||
) []AggregateResultsEntry { | ||
// NB(prateek): we make a copy of the (field, term) entries returned |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I like this approach fwiw :)
var ( | ||
entry AggregateResultsEntry | ||
lastField []byte | ||
lastFieldIsValid bool |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: I don't think this is necessary, bytes.Equal
should handle the lastField == nil
case
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hm not sure we can. Consider the case when we're trying to distinguish the first element of a batch (i.e. no prior elements in the batch), from a batch with the last element having a nil field. I don't think it'll happen in practice but this way makes fewer assumptions about the data so tend to prefer it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think the nil field will break this in general even if it's somewhere in the middle (since you'd be trying to call .Bytes()
on it); otherwise is there any real difference between?
Could do something like
if len(batch) > 0 {
last := batch[len(batch)-1]
if last != nil {
lastField = last.Bytes()
}
}
if bytes.Equal(lastField, field) {
entry = batch[len(batch)-1]
} else {
entry.Field = b.pooledID(field)
}
...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we're guaranteed it'd only be the first element (cause its the first lexicographic string). It wouldn't break anything cause we'd still allocate an ident.ID backed by an empty slice for it.
src/dbnode/storage/index/block.go
Outdated
return exhaustive, nil | ||
} | ||
|
||
func (b *block) appendAggregateResults( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: the name was really confusing (as was addAggregateResults
followed by addAggregateResults
), maybe rename this to appendFieldAndTermToBatch
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sure thing
|
||
func (fti *fieldsAndTermsIter) setNext() bool { | ||
// if only need to iterate fields | ||
if !fti.opts.iterateTerms { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: I think this might flow a little better if the branch is moved to the Next()
function instead of calling setNext
then instantly calling setNextField
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fair, done.
return false | ||
} | ||
fti.termIter = termsIter | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmm this may need to check if !fti.opts.allow(field)
and move onto the next fieldIter if so
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nope, cause setNextField()
does that check already.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍 misread that this was calling Next
instead
slice := toSlice(t, iter) | ||
requireSlicesEqual(t, []pair{}, slice) | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you add a few tests with allowFn
set, maybe one which goes through a couple of segments
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
re: allowFn
tests - does the existing TestFieldsTermsIteratorSimpleSkip
not capture stuff you're worried about?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
re: multiple segments, do you mean Reset()? if so, done
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
also, re allowFn
- the prop tests using fieldsTermsIteratorPropInput
are generating test situations with those cases too.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
My bad yeah, was looking at a partial diff so the tests seemed lacking
313d21f
to
39a94c1
Compare
src/dbnode/storage/index.go
Outdated
exhaustive, err := i.query(ctx, query, results, opts.QueryOptions) | ||
// use appropriate fn to query underlying blocks. | ||
fn := i.execBlockQueryFn | ||
if query.Equal(idx.NewAllQuery()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we just compare against a global one? Its miniscule but technically this allocates because it creates an interface internally
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
lol it's not going to have an impact on perf but sure.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
src/dbnode/storage/index.go
Outdated
return | ||
} | ||
|
||
if blockExhaustive { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Might be simpler to just do:
state.exhaustive = blockExhaustive
return
Would make adding more logic later easier, although technically it might be slower cause you're forcing a memory write back 🤷♂️
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sure
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
setting to state.exhaustive = state.exhaustive && blockExhaustive
always works so doing that instead
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
src/dbnode/storage/index.go
Outdated
return | ||
} | ||
|
||
if blockExhaustive { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
same comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
same as above.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
f := entry.Field | ||
aggValues, ok := r.resultsMap.Get(f) | ||
if !ok { | ||
aggValues = r.valuesPool.Get() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How do these get reset? I don't see it after any Get() calls nor in the generated code on put. Are we just trusting that they've been properly reset here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The way it's supposed to work: we register a Finalizer on the incoming context to ensure the Finalize() method on the object is called (which in turn calls Reset(nil, ...)
which does the actual releasing).
That said, the current code doesn't look to be registering a Finalizer on the context in either Query/Aggregate. Will put up another PR for this.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
#1567 as the follow up.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is it cheap to do a reset after pulling out the pool? I generally prefer that pattern over trust that every Put does a proper reset but I trust you understand this lifecycle better than I do
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yeah Reset is O(items in the map); so should be free considering the Put is doing the right thing. Don't need to do it tho, we ensure to cleanup before the Put
I prefer this approach (assume object returned from pool is valid) cause it puts the cleanup penalty on the last callsite to use the object (as opposed to the new callsite to receive the object). Seems like a "fairer" way to tax users
return r.aggregateOpts | ||
} | ||
|
||
func (r *aggregatedResults) AddFields(batch []AggregateResultsEntry) (int, error) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is the error just future proofing the API? Doesnt look like its currently possible to return one
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fair, will simplify
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
data := b.bytesPool.Get(len(id)) | ||
data.IncRef() | ||
data.AppendAll(id) | ||
data.DecRef() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
hmm, seems like this ID will have no refs for awhile until we add it to the results :/ Nothing we can do about that I assume? adding more accounting probably not worth it since these are really expensive
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The ref is only the bytes backing this ident, which the ident will take a ref to right after this line when we transfer the bytes to it (https://github.com/m3db/m3/blob/master/src/x/ident/identifier_pool.go#L105)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
oh duh I misunderstood this. 👍
src/dbnode/storage/index/block.go
Outdated
results AggregateResults, | ||
batch []AggregateResultsEntry, | ||
) ([]AggregateResultsEntry, int, error) { | ||
// Checkout the lifetime of the query before adding results |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: period at the end of all these comments
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sure
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
parameters.Rng = rand.New(rand.NewSource(seed)) | ||
properties := gopter.NewProperties(parameters) | ||
|
||
properties.Property("Fields Terms Iteration doesn't blow up", prop.ForAll( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you add a comment about the motivation for why we need a separate test just to make sure there are no panics when we already have a correctness test (that presumably might also catch panics)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
tl;dr - correctness prop test ensures we behave correctly on the happy path. this prop tests ensures we don't panic unless the underlying itself iterator panics.
will add a comment.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
itself
-> iterator
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sure thing
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
fieldsAndTermsIterZeroed fieldsAndTermsIter | ||
) | ||
|
||
var _ fieldsAndTermsIterator = &fieldsAndTermsIter{} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this only happens at package initialization anyways right? who cares haha
// Err returns any errors encountered during iteration. | ||
Err() error | ||
|
||
// Close releases any resources held by the iterator. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe add a comment explicitly saying this will not return the iter to the pool and that anything implementing this interface should explicitly support double closes
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
as mentioned earlier, reworked to avoid this dependency.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
:stamp: to unblock
b8fb0fc
to
2ce7b21
Compare
22dd0c9
to
c4fb8f4
Compare
f := entry.Field | ||
aggValues, ok := r.resultsMap.Get(f) | ||
if !ok { | ||
aggValues = r.valuesPool.Get() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is it cheap to do a reset after pulling out the pool? I generally prefer that pattern over trust that every Put does a proper reset but I trust you understand this lifecycle better than I do
@@ -823,30 +852,235 @@ func (b *block) addQueryResults( | |||
results BaseResults, | |||
batch []doc.Document, | |||
) ([]doc.Document, int, error) { | |||
// Checkout the lifetime of the query before adding results | |||
// checkout the lifetime of the query before adding results. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why are you lower casing these? I've been commenting on all P.Rs to ensure they're capital!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
oh really? I've been trying to stay all lower case for a while. Don't feel strongly about which one but we should pick a convention and stick to it.
size = results.Size() | ||
batch = b.opts.AggregateResultsEntryArrayPool().Get() | ||
batchSize = cap(batch) | ||
iterClosed = false // tracking whether we need to free the iterator at the end. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this just a guard against future maintainers? Cause it seems like you always close it now. If so maybe just mention in the comment its an extra precaution so people dont spend time chasing why its needed (unless I missed something)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nah so it's like the standard cleanup pattern we use in a few places
object := ...
cleanedUp := false
defer func() {
if !cleanedUp {
object.Cleanup()
}
}()
// do thing 1 for object
// if it fails, just early `return`
// so on
// finally,
cleanedUp = true
if err :=object.Cleanup(); err != nil {
return err
}
this way we can be sure all exit paths from the function cleanup the object; either because we manually do it; or the defer takes care of it.
without the defer (and the var). i'd have to interlace the cleanup calls at every exit point from the function. which seems brittle at best.
data := b.bytesPool.Get(len(id)) | ||
data.IncRef() | ||
data.AppendAll(id) | ||
data.DecRef() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
oh duh I misunderstood this. 👍
parameters.Rng = rand.New(rand.NewSource(seed)) | ||
properties := gopter.NewProperties(parameters) | ||
|
||
properties.Property("Fields Terms Iteration doesn't blow up", prop.ForAll( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
itself
-> iterator
e13bfc0
to
40070bc
Compare
40070bc
to
a42a09c
Compare