-
Notifications
You must be signed in to change notification settings - Fork 455
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add downsampling capabilities for coordinator #796
Add downsampling capabilities for coordinator #796
Conversation
Codecov Report
@@ Coverage Diff @@
## master #796 +/- ##
==========================================
- Coverage 77.91% 77.63% -0.28%
==========================================
Files 368 374 +6
Lines 31799 32418 +619
==========================================
+ Hits 24775 25167 +392
- Misses 5342 5537 +195
- Partials 1682 1714 +32
Continue to review full report at Codecov.
|
|
||
numRollups := matchResult.NumRollups() | ||
for i := 0; i < numRollups; i++ { | ||
rollup, ok := matchResult.RollupsAt(i, now.UnixNano()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: Better to flip this and add samplesAppender if ok
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure thing.
id := a.encodedTagsIteratorPool.Get() | ||
id.Reset(unownedID) | ||
now := time.Now() | ||
fromNanos, toNanos := now.Add(-1*a.clockOpts.MaxNegativeSkew()).UnixNano(), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: might be easier to read as��
fromNanos := now.Sub(a.clockOpts.MaxNegativeSkew()).UnixNano()
toNanos := now.Add(1*a.clockOpts.MaxPositiveSkew()).UnixNano()�
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure thing.
|
||
func (a *metricsAppender) SamplesAppender() (SamplesAppender, error) { | ||
// Sort tags | ||
sort.Sort(a.tags) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Would it be better to insert the tags in the sorted location?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That requires an alloc, heh - I'm going out of my way to avoid the alloc for each time this needs to happen.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Aren't you appending new tags anyway? Would putting it in the right place require an additional alloc instead?
func (w *downsamplerFlushHandlerWriter) Write( | ||
mp aggregated.ChunkedMetricWithStoragePolicy, | ||
) error { | ||
w.wg.Add(1) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It seems a little odd to me to have this function touching the internal wait group; what's the difference between doing it this way and calling Flush at the end v.s. the calling function handling the parallelism?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So accruing the outputs with Write then actually asking the storage to write the samples out when Flush is called would add a large amount of latency to the whole process, you ideally want to start writing as soon as possible.
The whole Write and Flush interleaving is much better for pipe type of communication, like TCP, which the aggregator interfaces seemed to have been optimized for unfortunately.
So this is why I am complying with the interfaces but ensuring that we can reduce latency and pending requests by writing immediately, and just using Flush to synchronize with the caller.
}, nil | ||
} | ||
|
||
func (h *downsamplerFlushHandler) Close() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: can we get a comment here? Either // noop
or // TODO
depending on what's required
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure thing, I'll add no-op.
|
||
type encodedTagsIteratorPool struct { | ||
tagDecoderPool serialize.TagDecoderPool | ||
pool pool.ObjectPool |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Worth adding a xpool.CheckedBytesWrapperPool
too?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hm, doesn't seem like it's needed anywhere currently?
|
||
// TagValue returns the value for a tag value. | ||
func (it *encodedTagsIterator) TagValue(tagName []byte) ([]byte, bool) { | ||
it.tagDecoder.Reset(it.bytes) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should this clone the tagIterator first, then reset that? If I'm reading this right if you ever call TagValue, it'll mess with the current iterator position, and if you were trying to get value for a missing name, you'd use up the iterator?
Would it be worth it to decode the tagDecoder to a list of tags on Reset for this?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fair, I'll change it to clone.
) *encodedTagsIterator { | ||
return &encodedTagsIterator{ | ||
tagDecoder: tagDecoder, | ||
bytes: checked.NewBytes(nil, nil), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Worth getting this from a pool?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since encodedTagsIterator
itself is already pooled this is only paid when a single encodedTagsIterator
is created in the pool so I think it would be the same cost to also pool the checked.NewBytes(...)
because the entire encodedTagsIterator
will be reused anyhow, hence we don't allocate checked.NewBytes(...)
over and over.
} | ||
|
||
// Validate will validate the dynamic downsampling options. | ||
func (o DownsamplerOptions) Validate() error { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: Should Validate() have the responsibility of checking that o.StorageFlushConcurrency > 0
, and setting it to default if not?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does Validate()
needs to be public? seems it's only called internally?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure thing, I'll remove visibility.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also: validate steps should never mutate something, hence I'm pretty against mutating during validate.
namespaces = clusters.ClusterNamespaces() | ||
downsampler downsample.Downsampler | ||
) | ||
if n := namespaces.NumAggregatedClusterNamespaces(); n > 0 { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: Pull this out into a function?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hm, this is a function NumAggregatedClusterNamespaces()
? Or do you mean like AnyAggregatedClusterNamespaces()
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry, meant the whole block that builds the downsampler not specifically this line haha
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure thing, done.
initAllocTagsSliceCapacity = 32 | ||
shardSetID = uint32(0) | ||
instanceID = "downsampler_local" | ||
placementKVKey = "/placement" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: Is this the convention used in this repo? Usually we don't have the /
prefix in keys?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is directly from the aggregator.
} | ||
|
||
// MetricsAppender is a metrics appender that can | ||
// build a samples appender, only valid to use |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should only valid to use with a single caller at a time.
be a comment on the implementation instead? Will there ever be a thread safe implementation for this?
} | ||
|
||
// Validate will validate the dynamic downsampling options. | ||
func (o DownsamplerOptions) Validate() error { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does Validate()
needs to be public? seems it's only called internally?
|
||
type newAggregatorResult struct { | ||
aggregator aggregator.Aggregator | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: remove empty lines in the struct?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure thing.
shardSet[i] = shard.NewShard(uint32(i)). | ||
SetState(shard.Initializing). | ||
SetCutoverNanos(0). | ||
SetCutoffNanos(math.MaxInt64) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do you need to set Cutover
and Cutoff
nanos? Default value should be good?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure thing, can leave as default.
return newAggregatorResult{}, err | ||
} | ||
|
||
placementStore := mem.NewStore() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: might be cleaner to create a placement.Service with the mem.Store
to init the placement with this instance and the shardIDs with one replica rather than creating the placement and save proto etc?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I tried this, it was too difficult to create a placement service from mem.Store
. Also you can't create a services.LeaderService
without a concrete etcd client which means can't use that approach.
return | ||
} | ||
|
||
err = w.handler.storage.Write(w.ctx, &storage.WriteQuery{ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do you want any retry here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I believe for this we'll allow the M3DB session client to retry the write (it has a configurable write retrier), just so there is one way to do it. We do it at a higher level in the ingester but only because we want to know when we're performing retries, etc with metrics.
return nil | ||
} | ||
|
||
type newAggregatorResult struct { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The name seems a bit odd, should this be aggregator
instead?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It needs to be something other than aggregator
otherwise it'll shadow the import. I can use just agg
perhaps.
) | ||
|
||
// Ensure encodedTagsIterator implements id.SortedTagIterator | ||
var _ id.SortedTagIterator = &encodedTagsIterator{} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is encodedTagsIterator
implementing more than just the id.SortedTagIterator
interface? If so can you add some comments to the struct? Otherwise just return id.SortedTagIterator
from the constructor?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'll create an interface that implements both interfaces id.SortedTagIterator
and id.ID
.
} | ||
} | ||
if p.rollupTagIndex == -1 { | ||
p.rollupTagIndex = len(p.tagPairs) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do we need to treat the rollup tag differently?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Because its not part of the tag pairs, the rollup ID provider actually wraps tag pairs and when enumerated gives you back the rollup tag at the right spot. It's to save re-allocing tag pairs each time when you need to insert the tag, you can just get a pooled rollup ID provider, set the tag pairs and then give it to another component that can iterate them and get back the rollup tag at the right position when enumerating the tags.
} | ||
|
||
func (t *tags) Current() ident.Tag { | ||
t.nameBuf = append(t.nameBuf[:0], t.names[t.idx]...) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is copying the bytes? Do you need the copy?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's actually turning a string t.names[...]
into bytes, doing so by using a re-useable buffer so its cheap.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Might need to add some tests for at least the basic use cases.
|
||
flushWorkers := xsync.NewWorkerPool(storageFlushConcurrency) | ||
flushWorkers.Init() | ||
handler := newDownsamplerFlushHandler(o.Storage, sortedTagIteratorPool, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yeah I agree, we're initializing pools, rollups, setting up placements, etc. Maybe split them into smaller functions ?
err := iter.Err() | ||
iter.Close() | ||
if err != nil { | ||
logger.Debugf("downsampler flush error preparing write: %v", err) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
since we don't return back the error, maybe make these logger.Error ? Otherwise, we would probably lose these
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure thing.
}, | ||
}) | ||
if err != nil { | ||
logger.Debugf("downsampler flush error failed write: %v", err) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
same comment as above, consider logger.Error ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure thing.
} | ||
|
||
func (t *tags) Duplicate() ident.TagIterator { | ||
return &tags{idx: -1, names: t.names, values: t.values} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is duplicate meant to clone ? If yes, you will probably have to copy the slices ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's not meant to clone, it's just meant to be a duplicate iterator with the position reset.
downsampler downsample.Downsampler, | ||
scope tally.Scope, | ||
) (http.Handler, error) { | ||
if store == nil && downsampler == nil { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
curious about this condition. Traditionally, we don't check if arguments passed to a struct are null except if they are coming from a config.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I prefer to be defensive.
"github.com/m3db/m3x/ident" | ||
) | ||
|
||
type tags struct { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
good idea to unit test the Current() & Next() usage ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yup, adding coverage.
} | ||
|
||
func (t *tags) Next() bool { | ||
hasNext := t.idx+1 < len(t.names) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
seems like other functions in this package are appending to t.names. In that case, Next() can return false first but then return true which makes it a bit confusing.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You're meant to construct it first, then use it as an iterator. I don't want to create intermediate structures or else they can't be pooled easily.
writeUnaggregatedErr error | ||
writeAggregatedErr error | ||
) | ||
if h.downsampler != nil { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this block can be another function ? Similarly, h.store != nil block can be another function
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yup, good call. Done.
request := newLocalWriteRequest(write, h.store) | ||
requests = append(requests, request) | ||
} | ||
writeUnaggregatedErr = execution.ExecuteParallel(ctx, requests) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
seems like we are using two very different ways of writing aggregated vs unaggregated (multiErr vs failing on first error). If you think multiErr is better, then let's switch to that ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
just to clarify: my worry here is that for downsampled metrics we would end up writing all endpoints, ignoring any intermediate errors but for unaggregated metrics, we will fail on first error. I think that behavior should be consistent ? Maybe consider hiding the actual downsampled and unaggregated writes behind another object instead of putting this in remote/write.go ? We also have native/write.go and you don't want to replicate the logic in two places.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sounds good, I've refactored this to always try and write every incoming value regardless of how many errors we encounter.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Were you planning on adding some coverage ?
|
||
flushWorkers := xsync.NewWorkerPool(storageFlushConcurrency) | ||
flushWorkers.Init() | ||
handler := newDownsamplerFlushHandler(o.Storage, sortedTagIteratorPool, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
thoughts about splitting it ?
|
||
wg.Done() | ||
}() | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
wg.Wait() ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good call, done.
I'll split it up, sounds good. |
This is all refactored, and good to review. |
if len(mp.ChunkedID.Suffix) != 0 { | ||
expected++ | ||
} | ||
tags := make(models.Tags, expected) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
consider always going make(model.Tags, iter.NumTags()+1) and have a comment that sometimes we can have a suffix. Might be easier to read.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure thing.
"github.com/m3db/m3cluster/services/leader/campaign" | ||
) | ||
|
||
type localLeaderService struct { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
comment on what this does ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Will do.
valueBuf []byte | ||
} | ||
|
||
var _ ident.TagIterator = &tags{} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
var _ ident.TagIterator = (*tags)(nil)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure thing.
Going to merge, there's some issue with the Prometheus integration test running out of disk space... need to rename the repository to make progress today, so going to merge this first. |
This change builds on the multi-cluster support for the coordinator that adds embedded downsampling to the coordinator.
This is ready to review, the API is stable, only thing remaining is test coverage that is being worked on now.