-
Notifications
You must be signed in to change notification settings - Fork 455
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
dbnode Aggregate RPC implementation #1483
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I like the interfaces in general, but still a bit confused about how the filter function will work. Didn't really look into the remainder of the code very much, since it's still WIP; if some of it is good to go gimme a message 👍
) | ||
|
||
var ( | ||
aggregateAttemptArgsZeroed aggregateAttemptArgs |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: Is there any risk of issues from this being mutable?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nah, this is a common pattern we use all over this package. we only use it to set values to zero without an extra alloc on the stack.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice, I like this pattern
Only asking coz got cautioned against doing this in a previous PR I did :p
src/dbnode/client/types.go
Outdated
@@ -78,6 +78,9 @@ type Session interface { | |||
// FetchTaggedIDs resolves the provided query to known IDs. | |||
FetchTaggedIDs(namespace ident.ID, q index.Query, opts index.QueryOptions) (iter TaggedIDsIterator, exhaustive bool, err error) | |||
|
|||
// Aggregate aggregates values from the database for the given set of contraints. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: constraints
At what level in the stack do we start referring to this as a TagCompletion
query or something similar v.s. an Aggregate?
src/dbnode/storage/index/types.go
Outdated
} | ||
|
||
// TagNameFilterFn provides a predicate using which aggregation results can be filtered. | ||
type TagNameFilterFn func(tagName []byte) bool |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just wondering how would we go about serializing this function to send to the index? Unless you're thinking we'd do the filtering in the client side rather than the index side?
src/dbnode/client/types.go
Outdated
@@ -90,6 +93,22 @@ type Session interface { | |||
Close() error | |||
} | |||
|
|||
// AggregatedTagsIterator iterates over a collection of tag names and associated tag values. | |||
type AggregatedTagsIterator interface { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Return type looks nice, I like it 👍
src/dbnode/client/types.go
Outdated
|
||
// Err returns any error encountered. | ||
Err() error | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Would be nice to add a Remaining() int
method
6e42d05
to
0448b1c
Compare
Codecov Report
@@ Coverage Diff @@
## master #1483 +/- ##
========================================
- Coverage 71.3% 70.9% -0.4%
========================================
Files 852 852
Lines 74159 72799 -1360
========================================
- Hits 52908 51687 -1221
+ Misses 17867 17731 -136
+ Partials 3384 3381 -3
Continue to review full report at Codecov.
|
8993097
to
0b20cdc
Compare
4f0cca2
to
8a163c4
Compare
|
||
func (f *aggregateAttempt) reset() { | ||
f.args = aggregateAttemptArgsZeroed | ||
f.resultIter = nil |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does this need to finalize?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nah, the caller takes ownership of the result iter after the attempt is done.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nah these *Attempt objects have much simpler lifecycles so the pooling is done manually in session.go
. e..g
func (s *session) Aggregate(
ns ident.ID, q index.Query, opts index.AggregationOptions,
) (AggregatedTagsIterator, bool, error) {
f := s.pools.aggregateAttempt.Get()
f.args.ns = ns
f.args.query = q
f.args.opts = opts
err := s.fetchRetrier.Attempt(f.attemptFn)
iter, exhaustive := f.resultIter, f.resultExhaustive
s.pools.aggregateAttempt.Put(f)
return iter, exhaustive, err
}
return err | ||
} | ||
|
||
type aggregateAttemptPool interface { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does this need to be an interface? Looking at some of the other _attempt
files, they mostly just use the struct
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This matches the other attempt pools:
type fetchTaggedAttemptPool interface { ... }
So for consistency at least for the attempt pools let's just keep these as interfaces for now.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
aye. it's an interface so we can replace it with leakcheck versions of the pool during tests (the fetch-tagged-attempt pool does the same thing). these are both newer than the other codepaths, at somepoint we'd add this type of tests to the other *attempts too
) | ||
|
||
var ( | ||
aggregateAttemptArgsZeroed aggregateAttemptArgs |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice, I like this pattern
Only asking coz got cautioned against doing this in a previous PR I did :p
src/dbnode/client/fetch_state.go
Outdated
done = true | ||
err = fmt.Errorf( | ||
"[invariant violated] expected result to be one of %v, received: %v", | ||
[]string{"fetchTaggedResultAccumulatorOpts"}, result) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Might be a c/p error: "fetchTaggedResultAccumulatorOpts"
-> "fetchTaggedFetchState", "aggregateFetchState"
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah yes, good catch thanks.
return f.tagResultAccumulator.AsEncodingSeriesIterators(limit, pools) | ||
} | ||
|
||
func (f *fetchState) asAggregatedTagsIterator(pools fetchTaggedPools) (AggregatedTagsIterator, bool, error) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: Since we added a result type, maybe this and asTaggedIDsIterator
should check that?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure thing, added.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
hm how bout testing f.stateType
in all three as...()
methods?
@@ -26,6 +26,8 @@ import ( | |||
"sort" | |||
"time" | |||
|
|||
"github.com/m3db/m3x/ident" | |||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Extra newline, linter may complain here
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yup good call, thanks.
type tagValueSeen bool | ||
|
||
type verifyQueryAggregateMetadataResultsOptions struct { | ||
exhausitive bool |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: exhausitive
-> exhaustive
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good catch.
func verifyQueryAggregateMetadataResults( | ||
t *testing.T, | ||
iter client.AggregatedTagsIterator, | ||
exhausitive bool, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: exhausitive
-> exhaustive
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good catch.
for name, values := range opts.expected { | ||
expected[name] = map[tagValue]tagValueSeen{} | ||
for value := range values { | ||
expected[name][value] = tagValueSeen(false) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice, I like this idea a lot (was using dumb stuff with delete(..)
when doing similar tests)
} | ||
} | ||
|
||
assert.Equal(t, len(expected), compared, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this might be better as assert.Equal(t, 0, len(notMatched), ...)
and then we can drop the compared
var
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure thing.
"seattle": struct{}{}, | ||
}, | ||
}, | ||
}) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we add this test for filtering:
// Match all new_*r*, filtering on `foo`
regexpQuery, err = idx.NewRegexpQuery([]byte("city"), []byte("new_.*r.*"))
require.NoError(t, err)
queryOpts.TermFilter = index.AggregateTermFilter([][]byte{[]byte("foo")})
iter, exhausitive, err = session.Aggregate(ns1.ID(),
index.Query{regexpQuery}, queryOpts)
require.NoError(t, err)
require.True(t, exhausitive)
defer iter.Finalize()
verifyQueryAggregateMetadataResults(t, iter, exhausitive,
verifyQueryAggregateMetadataResultsOptions{
exhausitive: true,
expected: map[tagName]aggregateTagValues{
"foo": aggregateTagValues{
"foo": struct{}{},
},
},
})
// Match all new_*r*, names only, filtering on `city`
regexpQuery, err = idx.NewRegexpQuery([]byte("city"), []byte("new_.*r.*"))
require.NoError(t, err)
queryOpts.TermFilter = index.AggregateTermFilter([][]byte{[]byte("city")})
queryOpts.Type = index.AggregateTagNames
iter, exhausitive, err = session.Aggregate(ns1.ID(),
index.Query{regexpQuery}, queryOpts)
require.NoError(t, err)
require.True(t, exhausitive)
defer iter.Finalize()
verifyQueryAggregateMetadataResults(t, iter, exhausitive,
verifyQueryAggregateMetadataResultsOptions{
exhausitive: true,
expected: map[tagName]aggregateTagValues{
"city": nil,
},
})
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
TY for writing this, good call.
|
||
// sort to order results to make test deterministic. | ||
sort.Slice(r.Results, func(i, j int) bool { | ||
return bytes.Compare(r.Results[i].TagName, r.Results[j].TagName) < 0 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice, should start using this (mostly just make a throwaway sort.Sort
type haha)
@@ -359,6 +500,10 @@ func (w *writeTaggedIter) Duplicate() ident.TagIterator { | |||
|
|||
// FromRPCQuery will create a m3ninx index query from an RPC query | |||
func FromRPCQuery(query *rpc.Query) (idx.Query, error) { | |||
if query == nil { | |||
return idx.NewAllQuery(), nil |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nice 👍
Might be worth calling out in the comment though?
// FromRPCQuery will create a m3ninx index query from an RPC query.
//
// NB: a nil query is considered equivalent to an `All` query
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure thing.
// TODO(prateek): pool the []byte underlying opts.TermFilter | ||
filters := make([][]byte, 0, len(opts.TermFilter)) | ||
for _, f := range opts.TermFilter { | ||
copied := append([]byte(nil), f...) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
More for my own benefit, but how does this compare to:
copied := make([]byte, len(f))
copy(copied, f)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's optimized to be just as good by the compiler these days thankfully, and it's a one liner =]
} | ||
tagValues := entry.Value() | ||
tagValuesMap := tagValues.Map() | ||
responseElem.TagValues = make([]*rpc.AggregateQueryResultTagValueElement, 0, tagValuesMap.Len()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
meganit: may be nicer to use indexed insertion here rather than appending?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Appending is always safer, heh.
@@ -359,6 +359,24 @@ func (ts testSerieses) assertMatchesEncodingIters(t *testing.T, iters encoding.S | |||
} | |||
} | |||
|
|||
func (ts testSerieses) assertMatchesAggregatedTagsIter(t *testing.T, iters AggregatedTagsIterator) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
// of the hostQueues responding is less than the lifecycle of the current method. | ||
nsClone := s.pools.id.Clone(ns) | ||
|
||
// FOLLOWUP(prateek): currently both `index.Query` and the returned request depend on |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: can we add these callouts to that issue so they don't fall through the cracks?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure thing.
src/dbnode/client/session.go
Outdated
case fetchTaggedFetchState: | ||
fetchOp := s.pools.fetchTaggedOp.Get() | ||
fetchOp.incRef() // indicate current go-routine has a reference to the op | ||
closer = func() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: closer = fetchOp.decRef
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure thing.
src/dbnode/client/session.go
Outdated
case aggregateFetchState: | ||
aggOp := s.pools.aggregateOp.Get() | ||
aggOp.incRef() // indicate current go-routine has a reference to the op | ||
closer = func() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: closer = aggOp.decRef()
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure thing.
src/dbnode/client/session.go
Outdated
return nil, wrappedErr | ||
} | ||
} | ||
|
||
op.decRef() // release the ref for the current go-routine | ||
closer() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: re-add the // release the ref for the current go-routine
comment here and above
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah agreed.
return false | ||
} | ||
i.currentIdx++ | ||
if i.currentIdx >= len(i.backing) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should we release/reset i.current
here too
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure thing.
return tag | ||
} | ||
|
||
func (i *aggregateTagsIterator) Finalize() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should we finalize i.current
if it exists here
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure thing.
if i.currentIdx < 0 { | ||
return len(i.backing) | ||
} | ||
return len(i.backing) - i.currentIdx |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
hm this has an off by one. could replace entire function with return len(i.backing) - (i.currentIdx+1)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure thing.
err error | ||
) | ||
switch r := result.(type) { | ||
case fetchTaggedResultAccumulatorOpts: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should we also test that f.stateType == fetchTaggedFetchState
here (just paranoia)
switch r := result.(type) { | ||
case fetchTaggedResultAccumulatorOpts: | ||
done, err = f.tagResultAccumulator.AddFetchTaggedResponse(r, resultErr) | ||
case aggregateResultAccumulatorOpts: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
here too
responses fetchTaggedIDResults | ||
exhaustive bool | ||
errors xerrors.Errors | ||
responses fetchTaggedIDResults |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
super nit: mind renaming var to fetchResponses
or something
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure thing.
|
||
accum.aggResponses = aggregateResults(results) | ||
accum.aggResponses.forEachTag(func(elems aggregateResults, hasMore bool) bool { | ||
tagResult := iter.addTag(elems[0].TagName) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
just checking my understanding: we're guaranteed elems[0]
exists because we'd only call the lambda from the forEachTag implementation if the group was non-empty yea?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Correct yeah.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'll add a note.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
No description provided.