Refactor FetchTagged to return an Iterator of results #3141
Conversation
a4a111f to 3175ecd
This is the first step in several refactorings to limit how many series blocks can be loaded at once. This will prevent large queries from overwhelming the system and give fairer access to all queries in the system. This first refactoring creates the interface for callers to use to iterate through series blocks one at a time. The series blocks are still all loaded at once and this will be fixed in a future PR with another Iterator.
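To make the new contract concrete, here is a minimal, self-contained sketch of the iteration pattern described above; the type and method names are illustrative and do not reproduce the PR's actual dbnode API.

package sketch

import "context"

// SeriesResult stands in for one series' ID, tags and block data.
type SeriesResult struct{}

// ResultsIter sketches the caller-facing contract: pull one series result at
// a time rather than receiving every series block up front.
type ResultsIter interface {
    // Next advances the iterator, returning false when there are no more
    // results or an error occurred.
    Next(ctx context.Context) bool
    // Current returns the result at the iterator's current position; it is
    // only valid after a call to Next that returned true.
    Current() SeriesResult
    // Err returns any error hit while iterating.
    Err() error
}

// consume shows the intended caller-side loop.
func consume(ctx context.Context, iter ResultsIter) error {
    for iter.Next(ctx) {
        _ = iter.Current() // encode one series into the RPC response here
    }
    return iter.Err()
}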
be04580 to 2633aec
Elements: make([]*rpc.FetchTaggedIDResult_, 0, results.Size()),
}
nsIDBytes := ns.Bytes()
tagEncoder := s.pools.tagEncoder.Get()
@robskillington does this make sense for getting a tag encoder?
Yeah, this is the best way to do it and return it at the end of the request.
}
tagBytes := make([]byte, len(encodedTags.Bytes()))
copy(tagBytes, encodedTags.Bytes())
@robskillington had to copy the bytes because the same tag encoder is used for the entire request.
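As an aside for readers, a generic sketch of why that copy is needed (Encoder below is a stand-in, not m3's tag encoder): a reused encoder's Bytes() result aliases its internal buffer, so the slice is only valid until the next encode.

package sketch

// Encoder is a stand-in for a pooled tag encoder that reuses one buffer.
type Encoder struct{ buf []byte }

func (e *Encoder) Encode(tags string) { e.buf = append(e.buf[:0], tags...) }
func (e *Encoder) Bytes() []byte      { return e.buf }

func example() {
    enc := &Encoder{}
    enc.Encode("series-1-tags")
    aliased := enc.Bytes() // points into enc.buf
    copied := append(make([]byte, 0, len(aliased)), aliased...)
    enc.Encode("series-2-tags") // overwrites the bytes aliased still points at
    _, _ = aliased, copied      // only copied is safe to keep past this point
}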
if err != nil {
    return nil, err
}
ctx.RegisterCloser(xresource.SimpleCloserFn(func() {
@robskillington does this do what I think it does, i.e. call the complete function when the RPC is closed?
Yeah this should work just fine 👍
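A generic sketch of the request-scoped cleanup pattern confirmed in the two threads above: a pooled tag encoder is checked out at the start of the request, and a RegisterCloser-style hook returns it (or runs a completion function) when the RPC's context is closed. The pool and context types below are simplified stand-ins, not m3's actual context/xresource API.

package sketch

// encoder and encoderPool are stand-ins for the pooled tag encoder above.
type encoder interface{ Reset() }

type encoderPool interface {
    Get() encoder
    Put(encoder)
}

// requestCtx mimics the RegisterCloser semantics used in the quoted diff:
// registered functions run once when the RPC's context is closed.
type requestCtx struct{ closers []func() }

func (c *requestCtx) RegisterCloser(fn func()) { c.closers = append(c.closers, fn) }
func (c *requestCtx) Close() {
    for _, fn := range c.closers {
        fn()
    }
}

func acquireEncoder(ctx *requestCtx, pool encoderPool) encoder {
    enc := pool.Get()
    // The same encoder is reused for every series in the request, so it is
    // only returned to the pool when the whole request finishes.
    ctx.RegisterCloser(func() {
        enc.Reset()
        pool.Put(enc)
    })
    return enc
}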
var encodedDataResults [][][]xio.BlockReader
if fetchData {
encodedDataResults = make([][][]xio.BlockReader, results.Size())
return newFetchTaggedResultsIter(&fetchTaggedResultsIterOpts{
Any reason to pass fetchTaggedResultsIterOpts as a pointer value?
I wouldn't worry about using these values to pass along on the stack considering they already are on the stack in the current function.
Also, if you passed each of these one by one in the method call, gocritic wouldn't complain... either way I wouldn't worry about growing the stack here, it's much cheaper in general to grow the stack than to heap allocate as a general rule of thumb.
I was just avoiding the gocritic linter. So just add nolint:gocritic?
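A sketch of the outcome being suggested here (the struct layouts are invented for illustration): pass the options struct by value and silence gocritic's hugeParam check with a nolint directive.

package sketch

// fetchTaggedResultsIterOpts mirrors the name in the diff; these fields are
// placeholders.
type fetchTaggedResultsIterOpts struct {
    fetchData bool
}

type fetchTaggedResultsIter struct {
    opts fetchTaggedResultsIterOpts
}

//nolint:gocritic // hugeParam: growing the stack is cheaper than a heap allocation here.
func newFetchTaggedResultsIter(opts fetchTaggedResultsIterOpts) *fetchTaggedResultsIter {
    return &fetchTaggedResultsIter{opts: opts}
}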
// HasNextIDIter returns true if there is another series ID to process.
HasNextIDIter() bool

// NextIDIter returns an iterator to process the results for a series ID.
// HasNextIDIter must be called before each call of this method.
NextIDIter(ctx context.Context) (IDIter, error)
nit: If you check everywhere else in the codebase, our iterators usually use the naming of:
Next() bool // or NextIDIter() for this current example
Current() MyNextResult // or CurrentIDIter() for this current example
Perhaps for consistency it would be better to use Next..() and Current..() instead of HasNext..() and Next..()?
you can leave java, but java can't leave you
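For reference, a minimal sketch of the Next/Current convention being suggested; MyNextResult is the reviewer's placeholder name and the interface shape is illustrative.

package sketch

// MyNextResult is a placeholder, matching the reviewer's example above.
type MyNextResult struct{}

// IDResultsIter sketches the naming convention used by other iterators in the
// codebase, as opposed to HasNextIDIter/NextIDIter.
type IDResultsIter interface {
    // Next advances to the next series ID, returning false when done.
    Next() bool
    // Current returns the value at the current position; only valid after a
    // Next call that returned true.
    Current() MyNextResult
    // Err returns any iteration error once Next has returned false.
    Err() error
}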
// HasNextSegments returns true if there are more Segments to process.
HasNextSegments() bool

// NextSegments returns the next Segments.
// HasNextSegments must be called before each call of this method.
NextSegments(ctx context.Context) (*rpc.Segments, error)
nit: Same comment about HasNext/Next vs. the rest of the codebase's Next/Current.
tagBytes := make([]byte, len(encodedTags.Bytes()))
copy(tagBytes, encodedTags.Bytes())
This is equivalent in speed btw, and with compiler optimizations it most often comes out better (and @arnikola will attest with the many discussions and benchmarks we've walked through) with the cleaner version:
tagBytes := append(make([]byte, 0, len(encodedTags.Bytes())), encodedTags.Bytes()...)
// TODO(rhall): don't request all series blocks at once.
if i.idx == 0 && i.fetchData {
    for _, idResult := range i.idResults {
        id := ident.BytesID(idResult.queryResult.Key())
Can you copy the old comment to here? Starts with:
// NB(r): Use a bytes ID here so that this ID doesn't need to be...
LGTM other than minor nits
iter.idResults = append(iter.idResults, &idResult{
    queryResult: &entry,
})
Can we make both idResult and index.ResultsMapEntry not use pointers here?
Both will cause these structs to be individually heap allocated (due to escape analysis being unsure of the lifetimes of these two structs).
Using by-value, non-pointer types will ensure that (1) idResult in the idResults slice can be just part of the allocation of the slice and (2) index.ResultsMapEntry won't need to be heap allocated and a memcpy can move the struct from the results map to this value.
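A short sketch of the by-value version being asked for, with simplified stand-in types rather than the real idResult and index.ResultsMapEntry definitions:

package sketch

// resultsMapEntry stands in for index.ResultsMapEntry.
type resultsMapEntry struct{ key []byte }

// idResult is held by value so each element is part of the slice's single
// backing allocation rather than an individually heap-allocated struct.
type idResult struct {
    queryResult resultsMapEntry
}

type resultsIter struct {
    idResults []idResult
}

func (it *resultsIter) add(entry resultsMapEntry) {
    // entry is copied into the slice element; no per-entry pointer escapes
    // to the heap.
    it.idResults = append(it.idResults, idResult{queryResult: entry})
}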
…o rhall-fetch-results-iterator
@robskillington addressed comments in 40e2322
Codecov Report
@@ Coverage Diff @@
## master #3141 +/- ##
=======================================
Coverage 72.2% 72.2%
=======================================
Files 1084 1084
Lines 100236 100279 +43
=======================================
+ Hits 72428 72497 +69
+ Misses 22755 22739 -16
+ Partials 5053 5043 -10
Flags with carried forward coverage won't be shown. Continue to review the full report at Codecov.
if err != nil {
    s.metrics.fetchTagged.ReportError(s.nowFn().Sub(callStart))
} else {
    s.metrics.fetchTagged.ReportSuccess(s.nowFn().Sub(callStart))
}
Can use .ReportSuccessOrError(err, s.nowFn().Sub(callStart)) here and it will do the nil check of the error for you.
Nice. Just had the code from before.
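For clarity, the suggestion amounts to collapsing the quoted if/else into a single call (assuming ReportSuccessOrError takes the error followed by the duration, as the comment implies):

// Before:
if err != nil {
    s.metrics.fetchTagged.ReportError(s.nowFn().Sub(callStart))
} else {
    s.metrics.fetchTagged.ReportSuccess(s.nowFn().Sub(callStart))
}

// After:
s.metrics.fetchTagged.ReportSuccessOrError(err, s.nowFn().Sub(callStart))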
for iter.Next(ctx) {
    if iter.Err() != nil {
Hm, usually we have Next(...) return false if there's an iterator error, then we just check if err := iter.Err(); err != nil after the iterator has finished.
i.e.
for iter.Next() { // return false if no more or an error
}
if err := iter.Err(); err != nil {
    return nil, err
}
tagBytes := make([]byte, 0)
tagBytes, err = cur.WriteTags(tagBytes)
Typically if you're not reusing a byte slice being passed into a method taking a dst []byte to write to, we just pass nil. That way it's not allocated before calling the function (i.e. reduces two allocs to just one alloc inside of the WriteTags(...)).
e.g.
tagBytes, err := cur.WriteTags(nil)
for segIter.Next(ctx) {
    if segIter.Err() != nil {
Same here, would opt to more consistently check for the err after and have the .Next(...) call return false if there is an error to make it break.
for segIter.Next(ctx) {
    // inner
}
if err := segIter.Err(); err != nil {
    return nil, err
}
id := ident.BytesID(result.queryResult.Key())
result.blockReaders, i.err = i.db.ReadEncoded(ctx, i.nsID, id, i.startInclusive, i.endExclusive)
if i.err != nil {
    return true
Yeah, to be consistent with our other iterators I would make an error return false to break the for loop, then allow the caller to check the error after the for loop has broken.
This is consistent with our other iterators (the easiest way to find them is to search for for iter.Next() { or for it.Next() {, which should surface quite a few results).
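A self-contained sketch of that convention on the producer side, with the real per-series work (e.g. ReadEncoded in the quoted diff) replaced by a placeholder: Next records the error and returns false so the caller's loop breaks, and the error surfaces once through Err().

package sketch

import "errors"

type blockIter struct {
    idx, total int
    err        error
}

func (it *blockIter) Next() bool {
    if it.err != nil || it.idx >= it.total {
        return false
    }
    if err := it.readCurrent(); err != nil {
        // Store the error and break the caller's for loop; the caller checks
        // Err() after the loop instead of inside it.
        it.err = err
        return false
    }
    it.idx++
    return true
}

func (it *blockIter) Err() error { return it.err }

// readCurrent is a placeholder for the per-series work that may fail.
func (it *blockIter) readCurrent() error {
    if it.total < 0 {
        return errors.New("invalid iterator state")
    }
    return nil
}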
if err != nil { // This is an invariant, should never happen
    return nil, tterrors.NewInternalError(err)
}
result = append(result, encodedTags.Bytes()...)
nit: A slightly more defensive programming approach here is to slice the result into result[:0], so that if the caller accidentally passed a full buffer for reuse that hadn't been resized, it will overwrite it with the length reset but still be able to use the allocated capacity. Also very nitty, we usually use the name dst for a byte slice that is to be written into.
e.g.
func (i *IDResult) WriteTags(dst []byte) ([]byte, error) {
    // .... other
    dst = append(dst[:0], encodedTags.Bytes()...)
}

type fetchTaggedResultsIter struct {
    queryResults map[index.ResultsMapHash]index.ResultsMapEntry
Q: Why not take the *index.ResultsMap itself here? Not that this is wrong, it just breaks the abstraction a little bit of our map type wrapping the underlying map itself (in case we ever wanted to change the way the .Iter() method worked for the map type).
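A generic illustration of the abstraction point; ResultsMap below is a stand-in for index.ResultsMap, not its real definition. The iterator holds the wrapper and goes through its Iter() accessor, so the wrapper is free to change how it stores entries.

package sketch

type entry struct{ key string }

// ResultsMap wraps the underlying map and owns how iteration works.
type ResultsMap struct {
    entries map[string]entry
}

// Iter is the wrapper's accessor; callers never see the map type itself.
func (m *ResultsMap) Iter() []entry {
    out := make([]entry, 0, len(m.entries))
    for _, e := range m.entries {
        out = append(out, e)
    }
    return out
}

// resultsIter depends only on the wrapper, not map[...]entry, so a change to
// the wrapper's internals doesn't leak into this struct.
type resultsIter struct {
    queryResults *ResultsMap
}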
* master: Refactor FetchTagged to return an Iterator of results (#3141)
* master:
  [dtest] endpoint to fetch tagged (#3138)
  Refactor FetchTagged to return an Iterator of results (#3141)
  [dbnode] Add aggregate term limit regression test (#3135)
  [DOCS] Adding Prometheus steps to quickstart (#3043)
  [dbnode] Revert AggregateQuery changes (#3133)
  Fix TestSessionFetchIDs flaky test (#3132)
  [dbnode] Alter multi-segments builder to order by size before processing (#3128)
  [dbnode] Emit aggregate usage metrics (#3123)
  [dbnode] Add Shard.OpenStreamingReader method (#3119)
This is the first step in several refactorings to limit how many series blocks can be loaded at once. This will prevent
large queries from overwhelming the system and give fair access to all queries in the system.
This first refactoring creates the interface for callers to use to iterate through series blocks one at a
time. The series blocks are still all loaded at once and this will be fixed in a future PR with another Iterator.
What this PR does / why we need it:
Fixes #
Special notes for your reviewer:
Does this PR introduce a user-facing and/or backwards incompatible change?:
Does this PR require updating code package or user-facing documentation?: