Add cluster namespace fanout heuristics supporting queries greater than retention #908
Conversation
src/query/storage/local/storage.go
			existing.attrs.Resolution <= attrs.Resolution
		existsBetter = longerRetention || sameRetentionAndSameOrMoreGranularResolution
	default:
		panic(fmt.Sprintf("unknown query fanout type: %d", r.queryFanoutType))
hm could you error instead of panicking here?
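For illustration, a minimal standalone sketch of returning an error from the default case instead of panicking; the enclosing helper and its signature are assumptions for this sketch, not the PR's actual code:

package main

import "fmt"

type queryFanoutType int

const (
	namespacesCoverAllRetention queryFanoutType = iota
	namespacesCoverPartialRetention
)

// existsBetterFor is a hypothetical helper standing in for the switch in the
// diff above; the default case surfaces an error rather than panicking.
func existsBetterFor(fanout queryFanoutType) (bool, error) {
	switch fanout {
	case namespacesCoverAllRetention:
		return true, nil
	case namespacesCoverPartialRetention:
		return false, nil
	default:
		return false, fmt.Errorf("unknown query fanout type: %d", fanout)
	}
}

func main() {
	if _, err := existsBetterFor(queryFanoutType(42)); err != nil {
		fmt.Println(err) // unknown query fanout type: 42
	}
}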
src/query/storage/local/storage.go
@@ -72,26 +227,19 @@ func (s *localStorage) Fetch(ctx context.Context, query *storage.FetchQuery, opt
	// cluster that can completely fulfill this range and then prefer the
	// highest resolution (most fine grained) results.
	// This needs to be optimized, however this is a start.
	fanout, namespaces, err := s.resolveClusterNamespacesForQuery(query.Start, query.End)
couple thoughts:
- maybe it'd be good to add a way for users to see what data source was hit for each series
- do you want to add tests for the various combinations - just ensuring we override to the right storage based on the heuristics in code
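On the second point, a standalone sketch of what such a table-driven test could look like. The resolve function here is a heavily simplified stand-in for resolveClusterNamespacesForQuery, and all names and retention values are assumptions for illustration only:

package fanout

import (
	"testing"
	"time"
)

type namespace struct {
	name      string
	retention time.Duration
}

// resolve is a simplified stand-in: if the unaggregated namespace's retention
// reaches back to the query start, use it alone; otherwise fan out to the
// unaggregated namespace plus all aggregated namespaces.
func resolve(now, start time.Time, unaggregated namespace, aggregated []namespace) []string {
	if !now.Add(-unaggregated.retention).After(start) {
		return []string{unaggregated.name}
	}
	out := []string{unaggregated.name}
	for _, ns := range aggregated {
		out = append(out, ns.name)
	}
	return out
}

func TestResolveFanoutCombinations(t *testing.T) {
	now := time.Now()
	unagg := namespace{name: "unaggregated", retention: 48 * time.Hour}
	agg := []namespace{{name: "agg_30d", retention: 30 * 24 * time.Hour}}

	tests := []struct {
		name           string
		start          time.Time
		wantNamespaces int
	}{
		{"query within unaggregated retention", now.Add(-time.Hour), 1},
		{"query beyond unaggregated retention", now.Add(-7 * 24 * time.Hour), 2},
	}
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			got := resolve(now, tt.start, unagg, agg)
			if len(got) != tt.wantNamespaces {
				t.Fatalf("got %d namespaces, want %d", len(got), tt.wantNamespaces)
			}
		})
	}
}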
Codecov Report
@@ Coverage Diff @@
## master #908 +/- ##
==========================================
+ Coverage 76.72% 76.75% +0.02%
==========================================
Files 436 436
Lines 37002 37086 +84
==========================================
+ Hits 28391 28464 +73
- Misses 6585 6590 +5
- Partials 2026 2032 +6
Continue to review full report at Codecov.
@@ -51,8 +59,16 @@ type m3storage struct {

// NewStorage creates a new local m3storage instance.
// TODO: Consider combining readWorkerPool and writeWorkerPool
func NewStorage(clusters Clusters, workerPool pool.ObjectPool, writeWorkerPool xsync.PooledWorkerPool) Storage {
	return &m3storage{clusters: clusters, readWorkerPool: workerPool, writeWorkerPool: writeWorkerPool}
func NewStorage(
I thought Artem had removed the TODO above in another PR. Mind rebasing on master once?
Sure thing.
I've rebased, it's still there.
Sigh, it must have been skipped in the last review. Mind deleting it, please?
	if len(r.seenIters) == 0 {
		// store the first attributes seen
		r.seenFirstAttrs = attrs
	}
	r.seenIters = append(r.seenIters, iterators)
	if !r.err.Empty() {
should this be moved up to the first thing after deferring the unlock?
No, this still needs to be here; otherwise we won't free/close the iters for successful requests that come back.
I'm adding a note about it so that it's obvious that's why this is below.
		iter: iter,
	})
	if len(r.seenIters) == 2 {
		// need to build backfill the dedupe map from the first result first
maybe remove the word build
Sure thing.
		i++
	}

	return iter, nil
	return r.finalResult, nil
}

func (r *multiResult) Add(
super nit: maybe call the iterators arg newIterators, I got a little lost between seenIters and iterators the first time
Sure thing.
		serieses[idx] = multiResultSeries{}
		var existsBetter bool
		switch r.fanout {
		case namespacesCoverAllRetention:
is this more like namespacesCoverQueryDuration?
Yeah I can rename this. Probably to namespaceCoversQueryRange.
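For reference, the constants after the rename might look like the following; namespaceCoversPartialQueryRange appears in later snippets in this review, while the name of the "all" variant is an assumption:

package m3

// queryFanoutType describes how the resolved namespaces cover the query range.
type queryFanoutType int

const (
	// A single namespace covers the entire query range, so no cross-namespace
	// deduplication is required.
	namespaceCoversAllQueryRange queryFanoutType = iota
	// Namespaces each cover only part of the query range, so results must be
	// merged using the retention/resolution heuristics discussed below.
	namespaceCoversPartialQueryRange
)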
		case namespacesCoverPartialRetention:
			// Already exists and either has longer retention, or the same retention
			// and result we are adding is not as precise
			longerRetention := existing.attrs.Retention > attrs.Retention
nit: existsLongerRetention
Sure thing.
	for idx := range serieses {
		serieses[idx] = multiResultSeries{}
		var existsBetter bool
		switch r.fanout {
nit: I feel like this logic would be easier to follow if it was framed from the perspective of determining whether the incoming attributes were better as opposed to vice versa, but that might just be personal preference
Hm, I kind of prefer to keep the existing one if it's there, so that's why I found it better to frame it that way.
One thing I'm realizing now is that there is no concept of how old a namespace is. Say you start with a single namespace, then you decide you want higher resolution so you start dual writing to a second one. Your queries will immediately begin going to namespace 2 even though it doesn't really have any data. Fixing that is probably out of scope for this PR, but maybe we can track it in an issue or something, although I'm not sure what the solution is other than configuring "cutoffs" or something.
@richardartoul yeah, I'm explicitly not tackling that. We can add a readable/writeable flag later to the namespaces - and then manually flip it on once the namespace is filled up. Then eventually we'll get to the all singing, all dancing, dynamic and automated version of this all.
	r.seenIters = nil

	if r.finalResult != nil {
		// NB(r): Since all the series iterators in the final result are held onto
Mind adding a unit test w/ mocks to ensure we free the underlying iterators exactly once, assuming the API is used correctly. I think it's 100% right the way it is, but the test will be good for when someone comes and attempts to refactor this.
Sure thing.
As discussed, let's revisit this in a followup change.
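For reference, a standalone sketch of the kind of test being requested; both the fake iterators and the result wrapper below are simplified stand-ins rather than M3's actual types, just enough to assert exactly-once close semantics:

package m3

import (
	"sync"
	"testing"
)

// fakeIterators counts Close calls so the test can assert exactly-once freeing.
type fakeIterators struct{ closes int }

func (f *fakeIterators) Close() { f.closes++ }

// fakeMultiResult mimics the ownership rules under discussion: every added
// iterator is recorded so a single Close frees each one exactly once.
type fakeMultiResult struct {
	sync.Mutex
	seen []*fakeIterators
}

func (r *fakeMultiResult) Add(iters *fakeIterators, err error) {
	r.Lock()
	defer r.Unlock()
	// Record before any error handling so successful results are still freed.
	r.seen = append(r.seen, iters)
}

func (r *fakeMultiResult) Close() {
	r.Lock()
	defer r.Unlock()
	for _, it := range r.seen {
		it.Close()
	}
	r.seen = nil
}

func TestIteratorsClosedExactlyOnce(t *testing.T) {
	r := &fakeMultiResult{}
	iters := []*fakeIterators{{}, {}, {}}
	for _, it := range iters {
		r.Add(it, nil)
	}
	r.Close()
	r.Close() // closing twice must not double-free
	for i, it := range iters {
		if it.closes != 1 {
			t.Fatalf("iterator %d closed %d times, want exactly 1", i, it.closes)
		}
	}
}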
		// need to backfill the dedupe map from the first result first
		existing := r.seenIters[0]
		r.dedupeMap = make(map[string]multiResultSeries, existing.Len())
		for _, iter := range existing.Iters() {
nit: could you refactor this loop into func (r *multiResult) addOrUpdateMap(attrs, iters) - would be re-usable here and in dedupe()
Sure thing.
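A rough sketch of the helper being suggested; the types and the comparison here are simplified assumptions (only the partial-coverage heuristic is shown), not the actual implementation:

package m3

import "time"

// Simplified stand-ins for illustration.
type Attributes struct {
	Retention  time.Duration
	Resolution time.Duration
}

type SeriesIterator interface {
	ID() string
}

type multiResultSeries struct {
	attrs Attributes
	iter  SeriesIterator
}

type multiResult struct {
	dedupeMap map[string]multiResultSeries
}

// addOrUpdateMap inserts each iterator keyed by series ID, keeping the
// existing entry when it has longer retention, or the same retention with an
// equal or finer resolution (mirroring the existsBetter checks above). It
// could be called both when backfilling from the first result and from
// dedupe(), which is the reuse the reviewer is after.
func (r *multiResult) addOrUpdateMap(attrs Attributes, iters []SeriesIterator) {
	if r.dedupeMap == nil {
		r.dedupeMap = make(map[string]multiResultSeries, len(iters))
	}
	for _, iter := range iters {
		id := iter.ID()
		existing, exists := r.dedupeMap[id]
		if exists {
			existsLongerRetention := existing.attrs.Retention > attrs.Retention
			existsSameRetentionAtLeastAsFine :=
				existing.attrs.Retention == attrs.Retention &&
					existing.attrs.Resolution <= attrs.Resolution
			if existsLongerRetention || existsSameRetentionAtLeastAsFine {
				// The entry we already have is at least as good; keep it.
				continue
			}
		}
		r.dedupeMap[id] = multiResultSeries{attrs: attrs, iter: iter}
	}
}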
src/query/storage/m3/storage.go
	start time.Time,
	end time.Time,
) (queryFanoutType, ClusterNamespaces, error) {
	now := time.Now()
maybe just set a nowFn on *m3Storage and initialize it to time.Now for now, so at least this is slightly easier to work with if someone needs to control it in a test later
Sure thing.
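A minimal sketch of the suggestion, using a simplified storage struct; the field and method names here are illustrative assumptions:

package main

import (
	"fmt"
	"time"
)

// Keep a nowFn on the storage type so tests can inject a fixed clock instead
// of calling time.Now directly inside the resolution logic.
type m3storage struct {
	nowFn func() time.Time
}

func newStorage() *m3storage {
	return &m3storage{nowFn: time.Now}
}

// retentionStart returns the earliest time still covered by a namespace with
// the given retention, relative to the injected clock.
func (s *m3storage) retentionStart(retention time.Duration) time.Time {
	return s.nowFn().Add(-retention)
}

func main() {
	s := newStorage()
	// In a test, override the clock deterministically:
	s.nowFn = func() time.Time {
		return time.Date(2018, time.August, 1, 0, 0, 0, 0, time.UTC)
	}
	fmt.Println(s.retentionStart(48 * time.Hour)) // 2018-07-30 00:00:00 +0000 UTC
}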
	unaggregated := s.clusters.UnaggregatedClusterNamespace()
	unaggregatedRetention := unaggregated.Options().Attributes().Retention
	unaggregatedStart := now.Add(-1 * unaggregatedRetention)
Do you not need to truncate to a blockSize here? I guess that's just kind of an implementation detail of M3DB and not super relevant here...
Not here thankfully.
src/query/storage/m3/storage.go
	unaggregated := s.clusters.UnaggregatedClusterNamespace()
	unaggregatedRetention := unaggregated.Options().Attributes().Retention
	unaggregatedStart := now.Add(-1 * unaggregatedRetention)
	if !unaggregatedStart.After(start) {
Is this the same as unaggregatedStart.Before(start)? If so, that's easier to reason about to me.
Oh I see, you would have to add || .Equal() as well, but it still seems nicer to me.
Hm, yeah I did it so you don't have to call twice. It's not a hot path though so I'll refactor to this.
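For reference, the equivalence being discussed, in a runnable form (retention and query times are illustrative values):

package main

import (
	"fmt"
	"time"
)

func main() {
	now := time.Now()
	unaggregatedRetention := 48 * time.Hour
	unaggregatedStart := now.Add(-unaggregatedRetention)
	start := now.Add(-24 * time.Hour)

	// Both conditions read "unaggregatedStart is at or before start", i.e. the
	// unaggregated namespace's retention reaches back far enough to cover the
	// query's start time.
	original := !unaggregatedStart.After(start)
	refactored := unaggregatedStart.Before(start) || unaggregatedStart.Equal(start)
	fmt.Println(original, refactored) // true true
}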
src/query/storage/m3/storage.go
		return namespacesCoverAllRetention, ClusterNamespaces{unaggregated}, nil
	}

	// First determine if any aggregated clusters that can span the whole query
"First determine if any aggregated clusters span the whole query range, if so..."
Good catch, thanks.
		case namespaceCoversPartialQueryRange:
			// Already exists and either has longer retention, or the same retention
			// and result we are adding is not as precise
			existsLongerRetention := existing.attrs.Retention > attrs.Retention
All this is to avoid consolidation... :sigh:
Yup =]
	downsampleOpts, err := opts.DownsampleOptions()
	if err != nil || !downsampleOpts.All {
		// Cluster does not contain all data, include as part of fan out
Confused about the downsampling thing, how is this different than saying something is unaggregated?
Basically it's aggregated, but data will only go to this cluster if rules are set up to do so.
LGTM with the nits
I need to add tests; however, I want to get buy-in that this meets our current needs before moving on to writing extensive tests.
This fixes #866.