-
Notifications
You must be signed in to change notification settings - Fork 455
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[query] Increase perf for temporal functions #2049
Conversation
FWIW, benchmark results for just the encoded series iterator improvements (top is existing, bottom is new):
|
@@ -174,6 +175,14 @@ func (s *mockStorage) Fetch( | |||
return s.fetchResult.results[idx], s.fetchResult.err | |||
} | |||
|
|||
func (s *mockStorage) FetchProm( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This file should be replaced by mocks when possible
Codecov Report
@@ Coverage Diff @@
## master #2049 +/- ##
========================================
+ Coverage 62.6% 66.4% +3.8%
========================================
Files 996 1009 +13
Lines 86851 86931 +80
========================================
+ Hits 54386 57742 +3356
+ Misses 28192 25326 -2866
+ Partials 4273 3863 -410
Continue to review full report at Codecov.
|
src/query/functions/temporal/base.go
Outdated
batch.Iter, | ||
m, | ||
c.processor, | ||
seriesMeta, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This needs to be resultSeriesMeta
src/query/block/column.go
Outdated
// PopulateColumns sets all columns to the given row size. | ||
func (cb ColumnBlockBuilder) PopulateColumns(size int) { | ||
for i := range cb.block.columns { | ||
cb.block.columns[i] = column{Values: make([]float64, size)} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Worth working out the full size and doing a block alloc upfront?
i.e. all := make([]float64, size*len(cb.block.columns))
and then subdividing for each one? That would reduce num of individual allocations, we've seen good speed ups from bulk allocs.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Neat trick!
src/query/api/v1/httpd/handler.go
Outdated
@@ -181,7 +182,9 @@ func (h *Handler) RegisterRoutes() error { | |||
// Wrap requests with response time logging as well as panic recovery. | |||
var ( | |||
wrapped = func(n http.Handler) http.Handler { | |||
return logging.WithResponseTimeAndPanicErrorLogging(n, h.instrumentOpts) | |||
return httputil.CompressionHandler{ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: Want to add a comment that this does compression under the hood? for all wrapped routes?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actually also.. do we want want to do this in applyMiddleware(...)
? That means that every single route will get it rather than only those using wrapped(...)
here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Will do. It actually doesn't force compression on for all requests; rather it only returns compressed responses provided the request has an Accept-Encoding:gzip
(or Accept-Encoding:deflate
) header
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Left a few comments but otherwise LGTM. Although I don't have a ton of context here haha.
len(values), len(cols)) | ||
} | ||
|
||
rows := len(cols[0].Values) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Q: Are the number of rows always the same for every column?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah this thing is a bit awkward (been meaning to update it), but basically it has a row per series; since the series are expected to be in steps by this point, it's safe to say they're the same size here.
Been meaning to refactor this file (and a bunch of other workflows)
src/query/block/container_test.go
Outdated
@@ -357,13 +357,13 @@ func buildUnconsolidatedSeriesBlock(ctrl *gomock.Controller, | |||
it.EXPECT().Err().Return(nil).AnyTimes() | |||
it.EXPECT().Next().Return(true) | |||
it.EXPECT().Next().Return(false) | |||
vals := make([]ts.Datapoints, numSteps) | |||
vals := make(ts.Datapoints, numSteps) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is ts.Datapoints
a slice? Do we need to initialize as vals := make(ts.Datapoints, 0, numSteps)
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, ts.Datapoints
is just an alias for []ts.Datapoint
; updated to use append properly
) (storage.PromResult, error) { | ||
stores := filterStores(s.stores, s.fetchFilter, query) | ||
// Optimization for the single store case | ||
if len(stores) == 1 { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Q: Does this save alot of perf when we don't enter the loop below?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Honestly, probably not a huge amount; was more following existing functions
wg.Add(len(stores)) | ||
resultMeta := block.NewResultMetadata() | ||
for _, store := range stores { | ||
store := store |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Q: Do we normally use this pattern or pass the variable into the fn as a param?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we generally use this approach, haven't seen many places where we've passed it by param
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah pass by param we usually avoid, event though may seem "safer" we use worker pools in a lot of places where you can't pass the variable as a param to the fn (as it takes a func() {}
only to pass to worker pool) so it's better to just get used to doing it one way all the times and not forgetting to take ref for the inner lambda we've found.
return nil, err | ||
} | ||
|
||
samples := make([]prompb.Sample, 0, initRawFetchAllocSize) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Q: Would pooling stuff like samples and labels make a noticeable diff to perf?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Because we can't really predict the sizes of the slice we need here, we can run into problems where a certain query needs to create huge slices, which get released to the pool but never shrunk back. We could potentially use a pool that has a chance to release and re-alloc smaller slices periodically, but not super sure how big an impact that would have
if it.datapoints == nil { | ||
it.datapoints = make(ts.Datapoints, 0, initBlockReplicaLength) | ||
} else { | ||
it.datapoints = it.datapoints[:0] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: Should this be done in some sort of iterator Reset()
function?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think Reset()
is usually used when we want to change the underlying data in an Iterator; here, we're filling up only the data that's returned in Current
and the underlying data stays the same
// metas := make([]SeriesMeta, count) | ||
// for i := range metas { | ||
// metas[i] = SeriesMeta{Name: []byte(fmt.Sprint(i))} | ||
// } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: Remove these commented out?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice, may do it in followup
continue | ||
} | ||
|
||
filtered = append(filtered, l) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: We could make this a little more performant by:
- start for loop compare each until we hit a label we need to skip
- if we find label we need to skip, copy all into "filtered" up to this element, then keep copying as you go
- if get to end and none were found that need to be skipped, then just take a reference to the original slice
This would avoid memcpying the results back into the original for any results that don't have the label.
But maybe an overoptimization... so only do it if you feel like it's worthwhile. I also realize this only happens if the filter is specified at all.. so maybe not worth doing for now.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'll keep that pattern in mind, but this path is likely cold enough for this to not really matter that much for the extra complexity? If we see this showing up in traces, happy to revisit though
batch, err := bl.MultiSeriesIter(concurrency) | ||
if err != nil { | ||
// NB: do not have to set the iterator error here, since not all | ||
// contained blocks necessarily allow mutli series iteration. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: Typo "mutli" -> "multi"
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice catch, may handle that in the cleanup PR if thats ok
vals := make([]ts.Datapoints, numSteps) | ||
for i := range vals { | ||
vals := make(ts.Datapoints, 0, numSteps) | ||
// for i := range vals { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: Remove?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍 removed in cleanup
func (b *ucEmptyBlock) MultiSeriesIter( | ||
concurrency int, | ||
) ([]UnconsolidatedSeriesIterBatch, error) { | ||
batch := make([]UnconsolidatedSeriesIterBatch, concurrency) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: Would it make sense to use append for consistency?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For cases like this where we're initializing the entire slice to a default value I think this reads a little clearer than doing:
batch := make([]UnconsolidatedSeriesIterBatch, 0, concurrency)
for i := 0; i < concurrency; i++ {
What do you think?
for i, vals := range s.datapoints { | ||
values[i] = consolidationFunc(vals) | ||
for i, v := range s.datapoints { | ||
values[i] = v.Value |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hm, are we not performing the consolidation function anymore?
Why is consolidationFunc
passed in here but not used anywhere?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah yeah, so this particular file is not long for this repo... was intending to delete this (and have in the cleanup) and it's (now) on a dead codepath
go func() { | ||
err := buildBlockBatch(loopIndex, batch.Iter, builder, m, p, &mu) | ||
mu.Lock() | ||
// NB: this no-ops if the error is nil. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We could skip the mu.Lock()
however if nil. Is that worth it?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sounds good, may handle that in the cleanup PR if thats ok
mu.Lock() | ||
// NB: this sets the values internally, so no need to worry about keeping | ||
// a reference to underlying `values`. | ||
err := builder.SetRow(idx, values, blockMeta.seriesMeta[idx]) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is the lock required if SetRow(...) only touches that row? (which looking at SetRow it looks like it does?)
I know could be potentially dangerous, although could avoid a lot of locking across batches when processing series after series.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe just best to keep the locks..
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah think it's safer to keep the locks here; applying the actual query is a much heavier process than writing the rows, so any increase is likely negligible
@@ -341,7 +363,8 @@ func getIndices( | |||
l = i | |||
} | |||
|
|||
if !ts.After(rBound) { | |||
if ts <= rBound { | |||
// if !ts.After(rBound) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: Remove commented out?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Removed in cleanup 👍
|
||
for i, v := range sink.Values { | ||
fmt.Println(i, v) | ||
fmt.Println(" ", tt.expected[i]) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Still need these printlns?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Removed in cleanup 👍
// linearRegression performs a least-square linear regression analysis on the | ||
// provided datapoints. It returns the slope, and the intercept value at the | ||
// provided time. | ||
// Uses this algorithm: https://en.wikipedia.org/wiki/Simple_linear_regression. | ||
func linearRegression( | ||
dps ts.Datapoints, | ||
interceptTime time.Time, | ||
interceptTime int64, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: Can we use xtime.UnixNano
instead of just pure int64
here and other places? Feel like it will be safer/easier to grok/read too. There's a lot of To/From helpers from that type too.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good call, changed all of these in the upcoming PR
@@ -139,20 +143,18 @@ func standardRateFunc( | |||
counterCorrection float64 | |||
firstVal, lastValue float64 | |||
firstIdx, lastIdx int | |||
firstTS, lastTS time.Time | |||
firstTS, lastTS int64 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we use xtime.UnixNano
here too? https://github.com/m3db/m3/blob/master/src/x/time/unix_nano.go#L27
lBound time.Time, | ||
rBound time.Time, | ||
lBound int64, | ||
rBound int64, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we use xtime.UnixNano
here too? https://github.com/m3db/m3/blob/master/src/x/time/unix_nano.go#L27
@@ -221,7 +223,8 @@ func irateFunc( | |||
datapoints ts.Datapoints, | |||
isRate bool, | |||
_ bool, | |||
timeSpec transform.TimeSpec, | |||
_ int64, | |||
_ int64, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we use xtime.UnixNano
here too? https://github.com/m3db/m3/blob/master/src/x/time/unix_nano.go#L27
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good call, changed all of these in the upcoming PR
} | ||
|
||
result, _, err := accumulator.FinalResultWithAttrs() | ||
defer accumulator.Close() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why not defer accumulator.Close()
before the call to FinalResultWithAttrs
since it will run anyway (seems more normal to do a defer foo.Close()
just after you have access to something rather than a few more lines down).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good call, changed in cleanup 👍
identTag := identTags.Current() | ||
labels = append(labels, prompb.Label{ | ||
Name: identTag.Name.Bytes(), | ||
Value: identTag.Value.Bytes(), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Are these identTags
guaranteed to not be closed until we've returned/used the labels? Are they not pooled at all? If they were pooled I could see just taking raw refs to the bytes causing problems (since could be returned while you hang onto the bytes by the prompb.Labels).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good catch, will update
if err != nil { | ||
// Return the first error that is encountered. | ||
select { | ||
case errorCh <- err: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hm, seems racey here that errorCh might be already closed?
This is since stopped() is not read or any lock is held to guarantee the errorCh is still open before enqueuing.
Unfortunately even in a switch like this writing to a closed channel will panic:
https://play.golang.org/p/XmRlQbixatB
Can use a cancellable lifetime here if you like:
https://github.com/m3db/m3/blob/master/src/x/resource/lifetime.go#L28:6
As used here:
https://github.com/m3db/m3/blob/master/src/dbnode/storage/index/block.go#L832-L840
@@ -85,8 +92,10 @@ func (it *encodedSeriesIterUnconsolidated) Next() bool { | |||
return false | |||
} | |||
|
|||
alignedValues := values.AlignToBoundsNoWriteForward(it.meta.Bounds, it.lookbackDuration) | |||
it.series = block.NewUnconsolidatedSeries(alignedValues, it.seriesMeta[it.idx]) | |||
it.series = block.NewUnconsolidatedSeries( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we not need to AlignToBoundsNoWriteForward
anymore?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No; in fact I've managed to remove it in the cleanup
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For more context, this was only being used in temporal functions, which immediately unrolled it back into a flat list :P
I've since split consolidated/unconsolidated distinction to be StepIterator = consolidated, SeriesIterator = unconsolidated, as unconsolidated steps or consolidated series don't make a lot of sense
) | ||
|
||
type encodedSeriesIterUnconsolidated struct { | ||
idx int | ||
lookbackDuration time.Duration | ||
err error | ||
meta block.Metadata | ||
datapoints ts.Datapoints | ||
alignedValues []ts.Datapoints |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is alignedValues
being used anywhere?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Removed in cleanup 👍
This PR contains improvements across the board for queries, largely focussed on temporal queries:
gzip
ed responses for smaller payloads.UnconsolidatedSeries
now doesn't do consolidation. Shocker! (Before it was aligning points to steps, then instantly unrolling them within temporal functions, allocing 2 unused data structures).int64
s vsTime.Time
s