From 3040f12a3a7df767e419edd0df263f109a0d5a03 Mon Sep 17 00:00:00 2001 From: Rob Skillington Date: Tue, 5 Jun 2018 01:21:29 +1000 Subject: [PATCH 1/3] Add rudimentary downsampling for m3coordinator --- glide.lock | 63 +- glide.yaml | 24 +- .../services/m3coordinator/config/config.go | 6 +- .../m3coordinator/downsample/downsample_id.go | 330 ++++++++ .../m3coordinator/downsample/downsampler.go | 756 ++++++++++++++++++ .../m3coordinator/downsample/policy.go | 39 + .../handler/prometheus/remote/write.go | 79 +- .../services/m3coordinator/httpd/handler.go | 34 +- src/coordinator/models/tag.go | 25 +- .../services/m3coordinator/server/server.go | 19 +- src/coordinator/storage/config.go | 50 ++ src/coordinator/storage/interface.go | 36 + src/coordinator/storage/local/cluster.go | 54 ++ src/coordinator/storage/local/config.go | 45 ++ src/coordinator/storage/local/storage.go | 294 ++++++- 15 files changed, 1791 insertions(+), 63 deletions(-) create mode 100644 src/cmd/services/m3coordinator/downsample/downsample_id.go create mode 100644 src/cmd/services/m3coordinator/downsample/downsampler.go create mode 100644 src/cmd/services/m3coordinator/downsample/policy.go create mode 100644 src/coordinator/storage/config.go create mode 100644 src/coordinator/storage/local/cluster.go create mode 100644 src/coordinator/storage/local/config.go diff --git a/glide.lock b/glide.lock index 9c55869de0..e73b661801 100644 --- a/glide.lock +++ b/glide.lock @@ -1,5 +1,5 @@ -hash: 2a2255466ac5d28a2fb2b58fea870901c9293f65187b752573811abde1c83824 -updated: 2018-05-25T15:28:47.008691-04:00 +hash: a871444f595c59b6a23c3ab042e3e012ca7313fe40b30f8609dfa6112ef003c2 +updated: 2018-06-03T19:43:08.601749+10:00 imports: - name: github.com/apache/thrift version: c2fb1c4e8c931d22617bebb0bf388cb4d5e6fcff @@ -49,7 +49,6 @@ imports: - etcdserver/etcdserverpb/gw - etcdserver/membership - etcdserver/stats - - integration - lease - lease/leasehttp - lease/leasepb @@ -89,15 +88,13 @@ imports: - wal - wal/walpb - name: github.com/coreos/go-semver - version: 8ab6407b697782a06568d4b7f1db25550ec2e4c6 + version: 568e959cd89871e61434c1143528d9162da89ef2 subpackages: - semver - name: github.com/coreos/go-systemd version: 48702e0da86bd25e76cfef347e2adeb434a0d0a6 subpackages: - - daemon - journal - - util - name: github.com/coreos/pkg version: 97fdf19511ea361ae1c100dd393cc47f8dcfa1e1 subpackages: @@ -114,12 +111,10 @@ imports: - spew - name: github.com/dgrijalva/jwt-go version: d2709f9f1f31ebcda9651b03077758c1f3a0018c -- name: github.com/dgryski/go-bits - version: 2ad8d707cc05b1815ce6ff2543bb5e8d8f9298ef - name: github.com/edsrzf/mmap-go version: 0bce6a6887123b67a60366d2c9fe2dfb74289d2e - name: github.com/ghodss/yaml - version: 0ca9ea5df5451ffdf184b4428c902747c2c11cd7 + version: 73d445a93680fa1a78ae23a5839bad48f32ba1ee - name: github.com/glycerine/go-unsnap-stream version: 62a9a9eb44fd8932157b1a8ace2149eff5971af6 - name: github.com/go-kit/kit @@ -142,7 +137,7 @@ imports: subpackages: - gomock - name: github.com/golang/protobuf - version: 5a0f697c9ed9d68fef0116532c6e05cfeae00e55 + version: 3a3da3a4e26776cc22a79ef46d5d58477532dede subpackages: - jsonpb - proto @@ -185,6 +180,20 @@ imports: version: 07973db6b78acb62ac207d0538055e874b49d90d - name: github.com/m3db/bloom version: 47fe1193cdb900de7193d1f3d26ea9b2cbf6fb31 +- name: github.com/m3db/m3aggregator + version: fd38c07d1a94b8598b6839a9c471e0a6feacfafb + subpackages: + - aggregation + - 
aggregation/quantile/cm + - aggregator + - aggregator/handler + - aggregator/handler/common + - aggregator/handler/writer + - generated/proto/flush + - hash + - rate + - runtime + - sharding - name: github.com/m3db/m3cluster version: 2096f1ed16154b3ce05c9b596c5fd1a8fa485c1b subpackages: @@ -224,11 +233,24 @@ imports: - os/fs - x/grpc - name: github.com/m3db/m3metrics - version: 17e4ddf89f2b0c8fbb16b6d5b71a871b793d2a00 + version: f22d8684fa8b42ff30f1d68f6f2be5e465db9a9d subpackages: + - aggregation + - errors + - filters - generated/proto/schema + - matcher + - matcher/cache - metric + - metric/aggregated + - metric/id + - metric/id/m3 + - metric/unaggregated - policy + - protocol/msgpack + - rules + - rules/models + - rules/models/changes - name: github.com/m3db/m3ninx version: 7556fa8339674f1d9f559486d1feca18c17d1190 subpackages: @@ -363,13 +385,15 @@ imports: - util/strutil - util/testutil - name: github.com/prometheus/tsdb - version: 706602daed1487f7849990678b4ece4599745905 + version: 16b2bf1b45ce3e3536c78ebec5116ea09a69786e subpackages: + - chunkenc - chunks - fileutil + - index - labels - name: github.com/RoaringBitmap/roaring - version: 1a28a7fa985680f9f4e1644c0a857ec359a444b0 + version: 361768d03f0924093d3eed7623f3cf58392620f4 - name: github.com/satori/go.uuid version: f58768cc1a7a7e77a3bd49e98cdd21419399b6a3 - name: github.com/sergi/go-diff @@ -437,8 +461,6 @@ imports: - zapcore - name: golang.org/x/crypto version: 1351f936d976c60a0a48d728281922cf63eafb8d - repo: https://github.com/golang/crypto - vcs: git subpackages: - bcrypt - blowfish @@ -465,8 +487,6 @@ imports: - errgroup - name: golang.org/x/sys version: d4feaf1a7e61e1d9e79e6c4e76c6349e9cab0a03 - repo: https://github.com/golang/sys - vcs: git subpackages: - unix - name: golang.org/x/text @@ -493,12 +513,17 @@ imports: - googleapis/api/annotations - googleapis/rpc/status - name: google.golang.org/grpc - version: 401e0e00e4bb830a10496d64cd95e068c5bf50de + version: 41344da2231b913fa3d983840a57a6b1b7b631a1 subpackages: - balancer + - balancer/base + - balancer/roundrobin + - channelz - codes - connectivity - credentials + - encoding + - encoding/proto - grpclb/grpc_lb_v1/messages - grpclog - health/grpc_health_v1 @@ -508,6 +533,8 @@ imports: - naming - peer - resolver + - resolver/dns + - resolver/passthrough - stats - status - tap diff --git a/glide.yaml b/glide.yaml index b241ebbcb7..105c488c36 100644 --- a/glide.yaml +++ b/glide.yaml @@ -25,6 +25,9 @@ import: - package: github.com/m3db/m3em version: ed532baee45a440f0b08b6893c816634c6978d4d +- package: github.com/m3db/m3aggregator + version: fd38c07d1a94b8598b6839a9c471e0a6feacfafb + - package: github.com/m3db/m3ninx version: 7556fa8339674f1d9f559486d1feca18c17d1190 @@ -56,7 +59,7 @@ import: - gomock - package: github.com/golang/protobuf - version: 5a0f697c9ed9d68fef0116532c6e05cfeae00e55 + version: 3a3da3a4e26776cc22a79ef46d5d58477532dede subpackages: - proto @@ -122,7 +125,7 @@ import: version: ^2.2.6 - package: github.com/m3db/m3metrics - version: 17e4ddf89f2b0c8fbb16b6d5b71a871b793d2a00 + version: f22d8684fa8b42ff30f1d68f6f2be5e465db9a9d subpackages: - policy @@ -138,14 +141,31 @@ import: subpackages: - cmp +# START_PROMETHEUS_DEPS - package: github.com/prometheus/prometheus version: 998dfcbac689ae832ea64ca134fcb096f61a7f62 +# To avoid prometheus/prometheus dependencies from breaking, +# pin the transitive dependencies +- package: 
github.com/prometheus/common + version: 9e0844febd9e2856f839c9cb974fbd676d1755a8 + +- package: github.com/prometheus/procfs + version: a1dba9ce8baed984a2495b658c82687f8157b98f + +- package: github.com/prometheus/tsdb + version: 16b2bf1b45ce3e3536c78ebec5116ea09a69786e +# END_PROMETHEUS_DEPS + - package: github.com/coreos/pkg version: 4 subpackages: - capnslog +# To avoid conflicting packages not resolving the latest GRPC +- package: google.golang.org/grpc + version: 41344da2231b913fa3d983840a57a6b1b7b631a1 + testImport: - package: github.com/fortytw2/leaktest version: 3677f62bb30dbf3b042c4c211245d072aa9ee075 diff --git a/src/cmd/services/m3coordinator/config/config.go b/src/cmd/services/m3coordinator/config/config.go index ec9b32525c..6610dfbac8 100644 --- a/src/cmd/services/m3coordinator/config/config.go +++ b/src/cmd/services/m3coordinator/config/config.go @@ -27,15 +27,15 @@ import ( // Configuration is the configuration for the coordinator. type Configuration struct { - // DBClient is the DB client configuration. - DBClient *client.Configuration `yaml:"dbClient"` - // Metrics configuration. Metrics instrument.MetricsConfiguration `yaml:"metrics"` // ListenAddress is the server listen address. ListenAddress string `yaml:"listenAddress" validate:"nonzero"` + // DBClient is the DB client configuration. + DBClient *client.Configuration `yaml:"dbClient"` + // RPC is the RPC configuration. RPC *RPCConfiguration `yaml:"rpc"` } diff --git a/src/cmd/services/m3coordinator/downsample/downsample_id.go b/src/cmd/services/m3coordinator/downsample/downsample_id.go new file mode 100644 index 0000000000..df334a39f0 --- /dev/null +++ b/src/cmd/services/m3coordinator/downsample/downsample_id.go @@ -0,0 +1,330 @@ +// Copyright (c) 2018 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. 
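+
+// This file implements ID handling for downsampled metrics. A metric ID
+// here is simply the encoded, sorted tag pairs of a series. Rollup IDs
+// additionally carry a marker tag pair (m3_rollup=true), spliced into the
+// sorted tag pairs at the correct position, so that rolled up series can
+// be told apart from the series they were derived from. As an illustrative
+// sketch (real IDs are the binary tag encoding below, not literal text),
+// a rollup of
+//
+//	{__name__: http_requests, endpoint: /foo}
+//
+// is identified by the encoded form of
+//
+//	{__name__: http_requests, endpoint: /foo, m3_rollup: true}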
+
+package downsample
+
+import (
+	"bytes"
+	"errors"
+	"fmt"
+
+	"github.com/m3db/m3db/src/dbnode/serialize"
+	"github.com/m3db/m3metrics/metric/id"
+	"github.com/m3db/m3x/checked"
+	"github.com/m3db/m3x/ident"
+	"github.com/m3db/m3x/pool"
+
+	"github.com/prometheus/common/model"
+)
+
+var (
+	metricNameTagName = []byte(model.MetricNameLabel)
+	rollupTagName     = []byte("m3_rollup")
+	rollupTagValue    = []byte("true")
+
+	errNoMetricNameTag = errors.New("no metric name tag found")
+)
+
+// Ensure encodedTagsIterator implements id.SortedTagIterator
+var _ id.SortedTagIterator = &encodedTagsIterator{}
+
+// Ensure encodedTagsIterator implements id.ID
+var _ id.ID = &encodedTagsIterator{}
+
+type encodedTagsIterator struct {
+	tagDecoder serialize.TagDecoder
+	bytes      checked.Bytes
+	pool       *encodedTagsIteratorPool
+}
+
+func newEncodedTagsIterator(
+	tagDecoder serialize.TagDecoder,
+	pool *encodedTagsIteratorPool,
+) *encodedTagsIterator {
+	return &encodedTagsIterator{
+		tagDecoder: tagDecoder,
+		bytes:      checked.NewBytes(nil, nil),
+		pool:       pool,
+	}
+}
+
+// Reset resets the iterator.
+func (it *encodedTagsIterator) Reset(sortedTagPairs []byte) {
+	it.bytes.Reset(sortedTagPairs)
+	it.tagDecoder.Reset(it.bytes)
+}
+
+// Bytes returns the underlying bytes.
+func (it *encodedTagsIterator) Bytes() []byte {
+	return it.bytes.Bytes()
+}
+
+// TagValue returns the value for a tag name.
+func (it *encodedTagsIterator) TagValue(tagName []byte) ([]byte, bool) {
+	it.tagDecoder.Reset(it.bytes)
+	for it.Next() {
+		name, value := it.Current()
+		if bytes.Equal(tagName, name) {
+			return value, true
+		}
+	}
+	return nil, false
+}
+
+// Next returns true if there are more tag names and values.
+func (it *encodedTagsIterator) Next() bool {
+	return it.tagDecoder.Next()
+}
+
+// Current returns the current tag name and value.
+func (it *encodedTagsIterator) Current() ([]byte, []byte) {
+	tag := it.tagDecoder.Current()
+	return tag.Name.Bytes(), tag.Value.Bytes()
+}
+
+// Err returns any errors encountered.
+func (it *encodedTagsIterator) Err() error {
+	return it.tagDecoder.Err()
+}
+
+// Close closes the iterator.
+func (it *encodedTagsIterator) Close() {
+	it.bytes.Reset(nil)
+	it.tagDecoder.Reset(it.bytes)
+
+	if it.pool != nil {
+		it.pool.Put(it)
+	}
+}
+
+type encodedTagsIteratorPool struct {
+	tagDecoderPool serialize.TagDecoderPool
+	pool           pool.ObjectPool
+}
+
+func newEncodedTagsIteratorPool(
+	tagDecoderPool serialize.TagDecoderPool,
+	opts pool.ObjectPoolOptions,
+) *encodedTagsIteratorPool {
+	return &encodedTagsIteratorPool{
+		tagDecoderPool: tagDecoderPool,
+		pool:           pool.NewObjectPool(opts),
+	}
+}
+
+func (p *encodedTagsIteratorPool) Init() {
+	p.tagDecoderPool.Init()
+	p.pool.Init(func() interface{} {
+		return newEncodedTagsIterator(p.tagDecoderPool.Get(), p)
+	})
+}
+
+func (p *encodedTagsIteratorPool) Get() *encodedTagsIterator {
+	return p.pool.Get().(*encodedTagsIterator)
+}
+
+func (p *encodedTagsIteratorPool) Put(v *encodedTagsIterator) {
+	p.pool.Put(v)
+}
+
+func isRollupID(
+	tags []byte,
+	iteratorPool *encodedTagsIteratorPool,
+) bool {
+	iter := iteratorPool.Get()
+	iter.Reset(tags)
+	defer iter.Close() // Return iterator to pool even on early return
+
+	for iter.Next() {
+		name, value := iter.Current()
+		if bytes.Equal(name, rollupTagName) && bytes.Equal(value, rollupTagValue) {
+			return true
+		}
+	}
+
+	return false
+}
+
+type rollupIDProvider struct {
+	index          int
+	tagPairs       []id.TagPair
+	rollupTagIndex int
+
+	tagEncoder serialize.TagEncoder
+	pool       *rollupIDProviderPool
+}
+
+func newRollupIDProvider(
+	tagEncoder serialize.TagEncoder,
+	pool *rollupIDProviderPool,
+) *rollupIDProvider {
+	return &rollupIDProvider{
+		tagEncoder: tagEncoder,
+		pool:       pool,
+	}
+}
+
+func (p *rollupIDProvider) provide(
+	tagPairs []id.TagPair,
+) ([]byte, error) {
+	p.reset(tagPairs)
+	p.tagEncoder.Reset()
+	if err := p.tagEncoder.Encode(p); err != nil {
+		return nil, err
+	}
+	data, ok := p.tagEncoder.Data()
+	if !ok {
+		return nil, fmt.Errorf("unable to access encoded tags: ok=%v", ok)
+	}
+	// Need to return a copy
+	return append([]byte(nil), data.Bytes()...), nil
+}
+
+func (p *rollupIDProvider) reset(
+	tagPairs []id.TagPair,
+) {
+	p.index = -1
+	p.tagPairs = tagPairs
+	p.rollupTagIndex = -1
+	for idx, pair := range tagPairs {
+		if bytes.Compare(rollupTagName, pair.Name) < 0 {
+			p.rollupTagIndex = idx
+			break
+		}
+	}
+	if p.rollupTagIndex == -1 {
+		p.rollupTagIndex = len(p.tagPairs)
+	}
+}
+
+func (p *rollupIDProvider) length() int {
+	return len(p.tagPairs) + 1
+}
+
+func (p *rollupIDProvider) finalize() {
+	if p.pool != nil {
+		p.pool.Put(p)
+	}
+}
+
+func (p *rollupIDProvider) Next() bool {
+	p.index++
+	return p.index < p.length()
+}
+
+func (p *rollupIDProvider) Current() ident.Tag {
+	idx := p.index
+	if idx == p.rollupTagIndex {
+		return ident.Tag{
+			Name:  ident.BytesID(rollupTagName),
+			Value: ident.BytesID(rollupTagValue),
+		}
+	}
+
+	if idx > p.rollupTagIndex {
+		// Effective index is subtracted by 1
+		idx--
+	}
+
+	return ident.Tag{
+		Name:  ident.BytesID(p.tagPairs[idx].Name),
+		Value: ident.BytesID(p.tagPairs[idx].Value),
+	}
+}
+
+func (p *rollupIDProvider) Err() error {
+	return nil
+}
+
+func (p *rollupIDProvider) Close() {
+	// No-op
+}
+
+func (p *rollupIDProvider) Remaining() int {
+	return p.length() - p.index - 1
+}
+
+func (p *rollupIDProvider) Duplicate() ident.TagIterator {
+	duplicate := new(rollupIDProvider)
+	*duplicate = *p
+	duplicate.reset(p.tagPairs)
+	return duplicate
+}
+
+type rollupIDProviderPool struct {
+	tagEncoderPool serialize.TagEncoderPool
+	pool           pool.ObjectPool
+}
+
+func newRollupIDProviderPool(
+	tagEncoderPool serialize.TagEncoderPool,
+	opts pool.ObjectPoolOptions,
+) *rollupIDProviderPool {
+ return &rollupIDProviderPool{ + tagEncoderPool: tagEncoderPool, + pool: pool.NewObjectPool(opts), + } +} + +func (p *rollupIDProviderPool) Init() { + p.pool.Init(func() interface{} { + return newRollupIDProvider(p.tagEncoderPool.Get(), p) + }) +} + +func (p *rollupIDProviderPool) Get() *rollupIDProvider { + return p.pool.Get().(*rollupIDProvider) +} + +func (p *rollupIDProviderPool) Put(v *rollupIDProvider) { + p.pool.Put(v) +} + +func encodedTagsNameAndTags( + id []byte, + iterPool *encodedTagsIteratorPool, +) ([]byte, []byte, error) { + // ID is always the encoded tags for downsampling IDs + metricTags := id + + iter := iterPool.Get() + iter.Reset(metricTags) + defer iter.Close() + + var metricName []byte + for iter.Next() { + name, value := iter.Current() + if bytes.Equal(metricNameTagName, name) { + metricName = value + break + } + } + + if err := iter.Err(); err != nil { + return nil, nil, err + } + + if metricName == nil { + // No name was found in encoded tags + return nil, nil, errNoMetricNameTag + } + + return metricName, metricTags, nil +} diff --git a/src/cmd/services/m3coordinator/downsample/downsampler.go b/src/cmd/services/m3coordinator/downsample/downsampler.go new file mode 100644 index 0000000000..a4b38a431e --- /dev/null +++ b/src/cmd/services/m3coordinator/downsample/downsampler.go @@ -0,0 +1,756 @@ +// Copyright (c) 2018 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. 
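+
+// This file implements the downsampler itself: it embeds an in-process
+// m3aggregator with a single-instance placement, matches incoming metrics
+// against rules kept in a KV store, and flushes the aggregated results
+// back out to coordinator storage. A minimal usage sketch, assuming the
+// configuration and options are already constructed (illustrative, not
+// prescriptive):
+//
+//	ds, err := NewDownsampler(cfg, opts)
+//	if err != nil {
+//		return err
+//	}
+//	appender := ds.MetricsAppender()
+//	appender.AddTag("__name__", "http_requests")
+//	samplesAppender, err := appender.SamplesAppender()
+//	if err != nil {
+//		return err
+//	}
+//	err = samplesAppender.AppendGaugeSample(42)
+//	appender.Finalize()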
+
+package downsample
+
+import (
+	"context"
+	"errors"
+	"fmt"
+	"math"
+	"runtime"
+	"sort"
+	"sync"
+	"time"
+
+	"github.com/m3db/m3aggregator/aggregator"
+	"github.com/m3db/m3aggregator/aggregator/handler"
+	"github.com/m3db/m3aggregator/aggregator/handler/writer"
+	"github.com/m3db/m3cluster/kv"
+	"github.com/m3db/m3cluster/kv/mem"
+	"github.com/m3db/m3cluster/placement"
+	"github.com/m3db/m3cluster/services"
+	"github.com/m3db/m3cluster/services/leader"
+	"github.com/m3db/m3cluster/shard"
+	"github.com/m3db/m3db/src/coordinator/models"
+	"github.com/m3db/m3db/src/coordinator/storage"
+	"github.com/m3db/m3db/src/coordinator/ts"
+	"github.com/m3db/m3db/src/dbnode/serialize"
+	"github.com/m3db/m3metrics/aggregation"
+	"github.com/m3db/m3metrics/filters"
+	"github.com/m3db/m3metrics/matcher"
+	"github.com/m3db/m3metrics/matcher/cache"
+	"github.com/m3db/m3metrics/metric/aggregated"
+	"github.com/m3db/m3metrics/metric/id"
+	"github.com/m3db/m3metrics/metric/unaggregated"
+	"github.com/m3db/m3metrics/policy"
+	"github.com/m3db/m3metrics/rules"
+	"github.com/m3db/m3x/clock"
+	xerrors "github.com/m3db/m3x/errors"
+	"github.com/m3db/m3x/ident"
+	"github.com/m3db/m3x/instrument"
+	"github.com/m3db/m3x/pool"
+	xsync "github.com/m3db/m3x/sync"
+
+	"github.com/coreos/etcd/integration"
+	"github.com/uber-go/tally"
+)
+
+const (
+	initAllocTagsSliceCapacity     = 32
+	shardSetID                     = uint32(0)
+	instanceID                     = "downsampler_local"
+	placementKVKey                 = "/placement"
+	aggregationSuffixTag           = "aggregation"
+	defaultStorageFlushConcurrency = 20000
+)
+
+var (
+	numShards = runtime.NumCPU()
+
+	errNoStorage               = errors.New("dynamic downsampling enabled with storage not set")
+	errNoRulesStore            = errors.New("dynamic downsampling enabled with rules store not set")
+	errNoClockOptions          = errors.New("dynamic downsampling enabled with clock options not set")
+	errNoInstrumentOptions     = errors.New("dynamic downsampling enabled with instrument options not set")
+	errNoTagEncoderOptions     = errors.New("dynamic downsampling enabled with tag encoder options not set")
+	errNoTagDecoderOptions     = errors.New("dynamic downsampling enabled with tag decoder options not set")
+	errNoTagEncoderPoolOptions = errors.New("dynamic downsampling enabled with tag encoder pool options not set")
+	errNoTagDecoderPoolOptions = errors.New("dynamic downsampling enabled with tag decoder pool options not set")
+)
+
+// Downsampler is a downsampler.
+type Downsampler interface {
+	NonePolicy() NonePolicy
+	AggregationPolicy() AggregationPolicy
+	MetricsAppender() MetricsAppender
+}
+
+// MetricsAppender is a metrics appender that can build a samples
+// appender; it is only valid for use by a single caller at a time.
+type MetricsAppender interface {
+	AddTag(name, value string)
+	SamplesAppender() (SamplesAppender, error)
+	Reset()
+	Finalize()
+}
+
+// SamplesAppender is a downsampling samples appender that can only
+// be used by a single caller at a time.
+type SamplesAppender interface {
+	AppendCounterSample(value int64) error
+	AppendGaugeSample(value float64) error
+}
+
+// DownsamplerOptions is a set of required downsampler options.
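+// All fields other than StorageFlushConcurrency are required, and Validate
+// returns a descriptive error for each missing one. An illustrative
+// construction, assuming fanoutStorage and rulesKVStore already exist and
+// that the default option constructors of each package are acceptable:
+//
+//	opts := DownsamplerOptions{
+//		Storage:               fanoutStorage,
+//		RulesKVStore:          rulesKVStore,
+//		ClockOptions:          clock.NewOptions(),
+//		InstrumentOptions:     instrument.NewOptions(),
+//		TagEncoderOptions:     serialize.NewTagEncoderOptions(),
+//		TagDecoderOptions:     serialize.NewTagDecoderOptions(),
+//		TagEncoderPoolOptions: pool.NewObjectPoolOptions(),
+//		TagDecoderPoolOptions: pool.NewObjectPoolOptions(),
+//	}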
+type DownsamplerOptions struct {
+	Storage                 storage.Storage
+	StorageFlushConcurrency int
+	RulesKVStore            kv.Store
+	ClockOptions            clock.Options
+	InstrumentOptions       instrument.Options
+	TagEncoderOptions       serialize.TagEncoderOptions
+	TagDecoderOptions       serialize.TagDecoderOptions
+	TagEncoderPoolOptions   pool.ObjectPoolOptions
+	TagDecoderPoolOptions   pool.ObjectPoolOptions
+}
+
+// Validate will validate the dynamic downsampling options.
+func (o DownsamplerOptions) Validate() error {
+	if o.Storage == nil {
+		return errNoStorage
+	}
+	if o.RulesKVStore == nil {
+		return errNoRulesStore
+	}
+	if o.ClockOptions == nil {
+		return errNoClockOptions
+	}
+	if o.InstrumentOptions == nil {
+		return errNoInstrumentOptions
+	}
+	if o.TagEncoderOptions == nil {
+		return errNoTagEncoderOptions
+	}
+	if o.TagDecoderOptions == nil {
+		return errNoTagDecoderOptions
+	}
+	if o.TagEncoderPoolOptions == nil {
+		return errNoTagEncoderPoolOptions
+	}
+	if o.TagDecoderPoolOptions == nil {
+		return errNoTagDecoderPoolOptions
+	}
+	return nil
+}
+
+type newAggregatorResult struct {
+	aggregator aggregator.Aggregator
+
+	clockOpts clock.Options
+
+	matcher matcher.Matcher
+
+	tagEncoderPool serialize.TagEncoderPool
+	tagDecoderPool serialize.TagDecoderPool
+
+	encodedTagsIteratorPool *encodedTagsIteratorPool
+}
+
+func (o DownsamplerOptions) newAggregator() (newAggregatorResult, error) {
+	// Validate options first.
+	if err := o.Validate(); err != nil {
+		return newAggregatorResult{}, err
+	}
+
+	var (
+		storageFlushConcurrency = defaultStorageFlushConcurrency
+		rulesStore              = o.RulesKVStore
+		clockOpts               = o.ClockOptions
+		instrumentOpts          = o.InstrumentOptions
+	)
+	if o.StorageFlushConcurrency > 0 {
+		storageFlushConcurrency = o.StorageFlushConcurrency
+	}
+
+	// Configure rules options.
+	tagEncoderPool := serialize.NewTagEncoderPool(o.TagEncoderOptions,
+		o.TagEncoderPoolOptions)
+	tagEncoderPool.Init()
+
+	tagDecoderPool := serialize.NewTagDecoderPool(o.TagDecoderOptions,
+		o.TagDecoderPoolOptions)
+	tagDecoderPool.Init()
+
+	sortedTagIteratorPool := newEncodedTagsIteratorPool(tagDecoderPool,
+		o.TagDecoderPoolOptions)
+	sortedTagIteratorPool.Init()
+
+	sortedTagIteratorFn := func(tagPairs []byte) id.SortedTagIterator {
+		it := sortedTagIteratorPool.Get()
+		it.Reset(tagPairs)
+		return it
+	}
+
+	tagsFilterOptions := filters.TagsFilterOptions{
+		NameTagKey: metricNameTagName,
+		NameAndTagsFn: func(id []byte) ([]byte, []byte, error) {
+			return encodedTagsNameAndTags(id, sortedTagIteratorPool)
+		},
+		SortedTagIteratorFn: sortedTagIteratorFn,
+	}
+
+	isRollupIDFn := func(name []byte, tags []byte) bool {
+		return isRollupID(tags, sortedTagIteratorPool)
+	}
+
+	newRollupIDProviderPool := newRollupIDProviderPool(tagEncoderPool,
+		o.TagEncoderPoolOptions)
+	newRollupIDProviderPool.Init()
+
+	newRollupIDFn := func(name []byte, tagPairs []id.TagPair) []byte {
+		rollupIDProvider := newRollupIDProviderPool.Get()
+		id, err := rollupIDProvider.provide(tagPairs)
+		if err != nil {
+			panic(err) // Encoding should never fail
+		}
+		rollupIDProvider.finalize()
+		return id
+	}
+
+	// Use default aggregation types; in the future we can provide more configurability.
+	var defaultAggregationTypes aggregation.TypesConfiguration
+	aggTypeOpts, err := defaultAggregationTypes.NewOptions(instrumentOpts)
+	if err != nil {
+		return newAggregatorResult{}, err
+	}
+
+	ruleSetOpts := rules.NewOptions().
+		SetTagsFilterOptions(tagsFilterOptions).
+		SetNewRollupIDFn(newRollupIDFn).
+		SetIsRollupIDFn(isRollupIDFn).
+ SetAggregationTypesOptions(aggTypeOpts) + + opts := matcher.NewOptions(). + SetClockOptions(clockOpts). + SetInstrumentOptions(instrumentOpts). + SetRuleSetOptions(ruleSetOpts). + SetKVStore(rulesStore) + + cacheOpts := cache.NewOptions(). + SetClockOptions(clockOpts). + SetInstrumentOptions(instrumentOpts. + SetMetricsScope(instrumentOpts.MetricsScope().SubScope("matcher-cache"))) + + cache := cache.NewCache(cacheOpts) + + matcher, err := matcher.NewMatcher(cache, opts) + if err != nil { + return newAggregatorResult{}, err + } + + aggregatorOpts := aggregator.NewOptions(). + SetClockOptions(clockOpts). + SetInstrumentOptions(instrumentOpts). + SetAggregationTypesOptions(aggTypeOpts). + SetMetricPrefix(nil). + SetCounterPrefix(nil). + SetGaugePrefix(nil). + SetTimerPrefix(nil) + + shardSet := make([]shard.Shard, numShards) + for i := 0; i < numShards; i++ { + shardSet[i] = shard.NewShard(uint32(i)). + SetState(shard.Initializing). + SetCutoverNanos(0). + SetCutoffNanos(math.MaxInt64) + } + shards := shard.NewShards(shardSet) + instance := placement.NewInstance(). + SetID(instanceID). + SetShards(shards). + SetShardSetID(shardSetID) + localPlacement := placement.NewPlacement(). + SetInstances([]placement.Instance{instance}). + SetShards(shards.AllIDs()) + stagedPlacement := placement.NewStagedPlacement(). + SetPlacements([]placement.Placement{localPlacement}) + stagedPlacementProto, err := stagedPlacement.Proto() + if err != nil { + return newAggregatorResult{}, err + } + + placementStore := mem.NewStore() + _, err = placementStore.SetIfNotExists(placementKVKey, stagedPlacementProto) + if err != nil { + return newAggregatorResult{}, err + } + + placementWatcherOpts := placement.NewStagedPlacementWatcherOptions(). + SetStagedPlacementKey(placementKVKey). + SetStagedPlacementStore(placementStore) + placementWatcher := placement.NewStagedPlacementWatcher(placementWatcherOpts) + placementManagerOpts := aggregator.NewPlacementManagerOptions(). + SetInstanceID(instanceID). + SetStagedPlacementWatcher(placementWatcher) + placementManager := aggregator.NewPlacementManager(placementManagerOpts) + aggregatorOpts = aggregatorOpts.SetPlacementManager(placementManager) + + // Set up flush times manager. + flushTimesManagerOpts := aggregator.NewFlushTimesManagerOptions(). + SetFlushTimesStore(placementStore) + flushTimesManager := aggregator.NewFlushTimesManager(flushTimesManagerOpts) + aggregatorOpts = aggregatorOpts.SetFlushTimesManager(flushTimesManager) + + // Set up election manager. + leaderValue := instanceID + campaignOpts, err := services.NewCampaignOptions() + if err != nil { + return newAggregatorResult{}, err + } + + campaignOpts = campaignOpts.SetLeaderValue(leaderValue) + electionCluster := integration.NewClusterV3(nil, &integration.ClusterConfig{ + Size: 1, + }) + + sid := services.NewServiceID(). + SetEnvironment("production"). + SetName("downsampler"). + SetZone("embedded") + + eopts := services.NewElectionOptions() + + leaderOpts := leader.NewOptions(). + SetServiceID(sid). + SetElectionOpts(eopts) + + leaderService, err := leader.NewService(electionCluster.RandClient(), leaderOpts) + if err != nil { + return newAggregatorResult{}, err + } + + electionManagerOpts := aggregator.NewElectionManagerOptions(). + SetCampaignOptions(campaignOpts). + SetLeaderService(leaderService). + SetPlacementManager(placementManager). 
+		SetFlushTimesManager(flushTimesManager)
+	electionManager := aggregator.NewElectionManager(electionManagerOpts)
+	aggregatorOpts = aggregatorOpts.SetElectionManager(electionManager)
+
+	// Set up flush manager.
+	flushManagerOpts := aggregator.NewFlushManagerOptions().
+		SetPlacementManager(placementManager).
+		SetFlushTimesManager(flushTimesManager).
+		SetElectionManager(electionManager)
+	flushManager := aggregator.NewFlushManager(flushManagerOpts)
+	aggregatorOpts = aggregatorOpts.SetFlushManager(flushManager)
+
+	flushWorkers := xsync.NewWorkerPool(storageFlushConcurrency)
+	flushWorkers.Init()
+	handler := newDownsamplerFlushHandler(o.Storage, sortedTagIteratorPool,
+		flushWorkers, instrumentOpts)
+	aggregatorOpts = aggregatorOpts.SetFlushHandler(handler)
+
+	return newAggregatorResult{
+		aggregator:              aggregator.NewAggregator(aggregatorOpts),
+		clockOpts:               clockOpts,
+		matcher:                 matcher,
+		tagEncoderPool:          tagEncoderPool,
+		tagDecoderPool:          tagDecoderPool,
+		encodedTagsIteratorPool: sortedTagIteratorPool,
+	}, nil
+}
+
+type retentionResolution struct {
+	retention  time.Duration
+	resolution time.Duration
+}
+
+type downsamplerFlushHandler struct {
+	sync.RWMutex
+	storage                 storage.Storage
+	encodedTagsIteratorPool *encodedTagsIteratorPool
+	workerPool              xsync.WorkerPool
+	instrumentOpts          instrument.Options
+	metrics                 downsamplerFlushHandlerMetrics
+}
+
+type downsamplerFlushHandlerMetrics struct {
+	flushSuccess tally.Counter
+	flushErrors  tally.Counter
+}
+
+func newDownsamplerFlushHandlerMetrics(
+	scope tally.Scope,
+) downsamplerFlushHandlerMetrics {
+	return downsamplerFlushHandlerMetrics{
+		flushSuccess: scope.Counter("flush-success"),
+		flushErrors:  scope.Counter("flush-errors"),
+	}
+}
+
+func newDownsamplerFlushHandler(
+	storage storage.Storage,
+	encodedTagsIteratorPool *encodedTagsIteratorPool,
+	workerPool xsync.WorkerPool,
+	instrumentOpts instrument.Options,
+) handler.Handler {
+	scope := instrumentOpts.MetricsScope().SubScope("downsampler-flush-handler")
+	return &downsamplerFlushHandler{
+		storage:                 storage,
+		encodedTagsIteratorPool: encodedTagsIteratorPool,
+		workerPool:              workerPool,
+		instrumentOpts:          instrumentOpts,
+		metrics:                 newDownsamplerFlushHandlerMetrics(scope),
+	}
+}
+
+func (h *downsamplerFlushHandler) NewWriter(
+	scope tally.Scope,
+) (writer.Writer, error) {
+	return &downsamplerFlushHandlerWriter{
+		ctx:     context.Background(),
+		handler: h,
+	}, nil
+}
+
+func (h *downsamplerFlushHandler) Close() {
+}
+
+type downsamplerFlushHandlerWriter struct {
+	wg      sync.WaitGroup
+	ctx     context.Context
+	handler *downsamplerFlushHandler
+}
+
+func (w *downsamplerFlushHandlerWriter) Write(
+	mp aggregated.ChunkedMetricWithStoragePolicy,
+) error {
+	w.wg.Add(1)
+	w.handler.workerPool.Go(func() {
+		defer w.wg.Done()
+
+		logger := w.handler.instrumentOpts.Logger()
+
+		iter := w.handler.encodedTagsIteratorPool.Get()
+		iter.Reset(mp.ChunkedID.Data)
+		expected := iter.tagDecoder.Remaining()
+		if len(mp.ChunkedID.Suffix) > 0 {
+			expected++
+		}
+		tags := make(models.Tags, expected)
+		for iter.Next() {
+			name, value := iter.Current()
+			tags[string(name)] = string(value)
+		}
+		if len(mp.ChunkedID.Suffix) > 0 {
+			tags[aggregationSuffixTag] = string(mp.ChunkedID.Suffix)
+		}
+
+		err := iter.Err()
+		iter.Close()
+		if err != nil {
+			logger.Debugf("downsampler flush error preparing write: %v", err)
+			w.handler.metrics.flushErrors.Inc(1)
+			return
+		}
+
+		err = w.handler.storage.Write(w.ctx, &storage.WriteQuery{
+			Tags: tags,
+			Datapoints: ts.Datapoints{ts.Datapoint{
+				Timestamp: time.Unix(0, mp.TimeNanos),
+				Value:     mp.Value,
+			}},
+			Unit: mp.StoragePolicy.Resolution().Precision,
+			Attributes: storage.Attributes{
+				MetricsType: storage.AggregatedMetricsType,
+				Retention:   mp.StoragePolicy.Retention().Duration(),
+				Resolution:  mp.StoragePolicy.Resolution().Window,
+			},
+		})
+		if err != nil {
+			logger.Debugf("downsampler flush error failed write: %v", err)
+			w.handler.metrics.flushErrors.Inc(1)
+			return
+		}
+
+		w.handler.metrics.flushSuccess.Inc(1)
+	})
+
+	return nil
+}
+
+func (w *downsamplerFlushHandlerWriter) Flush() error {
+	w.wg.Wait()
+	return nil
+}
+
+func (w *downsamplerFlushHandlerWriter) Close() error {
+	return nil
+}
+
+type downsampler struct {
+	cfg        DownsamplingConfiguration
+	aggregator newAggregatorResult
+}
+
+// NewDownsampler returns a new downsampler.
+func NewDownsampler(
+	cfg DownsamplingConfiguration,
+	opts DownsamplerOptions,
+) (Downsampler, error) {
+	d := &downsampler{
+		cfg: cfg,
+	}
+
+	if cfg.AggregationPolicy.Enabled {
+		result, err := opts.newAggregator()
+		if err != nil {
+			return nil, err
+		}
+		d.aggregator = result
+	}
+
+	return d, nil
+}
+
+func (d *downsampler) NonePolicy() NonePolicy {
+	return d.cfg.NonePolicy
+}
+
+func (d *downsampler) AggregationPolicy() AggregationPolicy {
+	return d.cfg.AggregationPolicy
+}
+
+func (d *downsampler) MetricsAppender() MetricsAppender {
+	return newMetricsAppender(metricsAppenderOptions{
+		agg:                     d.aggregator.aggregator,
+		clockOpts:               d.aggregator.clockOpts,
+		tagEncoder:              d.aggregator.tagEncoderPool.Get(),
+		matcher:                 d.aggregator.matcher,
+		encodedTagsIteratorPool: d.aggregator.encodedTagsIteratorPool,
+	})
+}
+
+func newMetricsAppender(opts metricsAppenderOptions) *metricsAppender {
+	return &metricsAppender{
+		metricsAppenderOptions: opts,
+		tags:                   newTags(),
+		multiSamplesAppender:   newMultiSamplesAppender(),
+	}
+}
+
+type metricsAppender struct {
+	metricsAppenderOptions
+
+	tags                 *tags
+	multiSamplesAppender *multiSamplesAppender
+}
+
+type metricsAppenderOptions struct {
+	agg                     aggregator.Aggregator
+	clockOpts               clock.Options
+	tagEncoder              serialize.TagEncoder
+	matcher                 matcher.Matcher
+	encodedTagsIteratorPool *encodedTagsIteratorPool
+}
+
+func (a *metricsAppender) AddTag(name, value string) {
+	a.tags.names = append(a.tags.names, name)
+	a.tags.values = append(a.tags.values, value)
+}
+
+func (a *metricsAppender) SamplesAppender() (SamplesAppender, error) {
+	// Sort tags
+	sort.Sort(a.tags)
+
+	// Encode tags and compute a temporary (unowned) ID
+	a.tagEncoder.Reset()
+	if err := a.tagEncoder.Encode(a.tags); err != nil {
+		return nil, err
+	}
+	data, ok := a.tagEncoder.Data()
+	if !ok {
+		return nil, fmt.Errorf("unable to encode tags: names=%v, values=%v",
+			a.tags.names, a.tags.values)
+	}
+
+	unownedID := data.Bytes()
+
+	// Match policies and rollups and build samples appender
+	id := a.encodedTagsIteratorPool.Get()
+	id.Reset(unownedID)
+	now := time.Now()
+	fromNanos, toNanos := now.Add(-1*a.clockOpts.MaxNegativeSkew()).UnixNano(),
+		now.Add(1*a.clockOpts.MaxPositiveSkew()).UnixNano()
+	matchResult := a.matcher.ForwardMatch(id, fromNanos, toNanos)
+	id.Close()
+
+	policies := matchResult.MappingsAt(now.UnixNano())
+
+	a.multiSamplesAppender.reset()
+	a.multiSamplesAppender.addSamplesAppender(samplesAppender{
+		agg:       a.agg,
+		unownedID: unownedID,
+		policies:  policies,
+	})
+
+	numRollups := matchResult.NumRollups()
+	for i := 0; i < numRollups; i++ {
+		rollup, ok := matchResult.RollupsAt(i, now.UnixNano())
+		if !ok {
+			continue
+		}
+
+		a.multiSamplesAppender.addSamplesAppender(samplesAppender{
+			agg:       a.agg,
+			unownedID: rollup.ID,
+			policies:  rollup.PoliciesList,
+		})
+	}
+
+	return a.multiSamplesAppender, nil
+}
+
+func (a *metricsAppender) Reset() {
+	a.tags.names = a.tags.names[:0]
+	a.tags.values = a.tags.values[:0]
+}
+
+func (a *metricsAppender) Finalize() {
+	a.tagEncoder.Finalize()
+	a.tagEncoder = nil
+}
+
+type samplesAppender struct {
+	agg       aggregator.Aggregator
+	unownedID []byte
+	policies  policy.PoliciesList
+}
+
+func (a samplesAppender) AppendCounterSample(value int64) error {
+	sample := unaggregated.MetricUnion{
+		Type:       unaggregated.CounterType,
+		OwnsID:     false,
+		ID:         a.unownedID,
+		CounterVal: value,
+	}
+	return a.agg.AddMetricWithPoliciesList(sample, a.policies)
+}
+
+func (a samplesAppender) AppendGaugeSample(value float64) error {
+	sample := unaggregated.MetricUnion{
+		Type:     unaggregated.GaugeType,
+		OwnsID:   false,
+		ID:       a.unownedID,
+		GaugeVal: value,
+	}
+	return a.agg.AddMetricWithPoliciesList(sample, a.policies)
+}
+
+// Ensure multiSamplesAppender implements SamplesAppender
+var _ SamplesAppender = (*multiSamplesAppender)(nil)
+
+type multiSamplesAppender struct {
+	appenders []samplesAppender
+}
+
+func newMultiSamplesAppender() *multiSamplesAppender {
+	return &multiSamplesAppender{}
+}
+
+func (a *multiSamplesAppender) reset() {
+	var zeroedSamplesAppender samplesAppender
+	for i := range a.appenders {
+		a.appenders[i] = zeroedSamplesAppender
+	}
+	a.appenders = a.appenders[:0]
+}
+
+func (a *multiSamplesAppender) addSamplesAppender(v samplesAppender) {
+	a.appenders = append(a.appenders, v)
+}
+
+func (a *multiSamplesAppender) AppendCounterSample(value int64) error {
+	var multiErr xerrors.MultiError
+	for _, appender := range a.appenders {
+		multiErr = multiErr.Add(appender.AppendCounterSample(value))
+	}
+	return multiErr.FinalError()
+}
+
+func (a *multiSamplesAppender) AppendGaugeSample(value float64) error {
+	var multiErr xerrors.MultiError
+	for _, appender := range a.appenders {
+		multiErr = multiErr.Add(appender.AppendGaugeSample(value))
+	}
+	return multiErr.FinalError()
+}
+
+type tags struct {
+	names    []string
+	values   []string
+	idx      int
+	nameBuf  []byte
+	valueBuf []byte
+}
+
+var _ ident.TagIterator = &tags{}
+var _ sort.Interface = &tags{}
+
+func newTags() *tags {
+	return &tags{
+		names:  make([]string, 0, initAllocTagsSliceCapacity),
+		values: make([]string, 0, initAllocTagsSliceCapacity),
+		idx:    -1,
+	}
+}
+
+func (t *tags) Len() int {
+	return len(t.names)
+}
+
+func (t *tags) Swap(i, j int) {
+	t.names[i], t.names[j] = t.names[j], t.names[i]
+	t.values[i], t.values[j] = t.values[j], t.values[i]
+}
+
+func (t *tags) Less(i, j int) bool {
+	return t.names[i] < t.names[j]
+}
+
+func (t *tags) Next() bool {
+	// Advance the iterator index before reporting whether a tag is available.
+	hasNext := t.idx+1 < len(t.names)
+	if hasNext {
+		t.idx++
+	}
+	return hasNext
+}
+
+func (t *tags) Current() ident.Tag {
+	t.nameBuf = append(t.nameBuf[:0], t.names[t.idx]...)
+	t.valueBuf = append(t.valueBuf[:0], t.values[t.idx]...)
+	return ident.Tag{
+		Name:  ident.BytesID(t.nameBuf),
+		Value: ident.BytesID(t.valueBuf),
+	}
+}
+
+func (t *tags) Err() error {
+	return nil
+}
+
+func (t *tags) Close() {
+}
+
+// Remaining returns the number of tags not yet iterated.
+func (t *tags) Remaining() int {
+	return len(t.names) - (t.idx + 1)
+}
+
+func (t *tags) Duplicate() ident.TagIterator {
+	return &tags{idx: -1, names: t.names, values: t.values}
+}
diff --git a/src/cmd/services/m3coordinator/downsample/policy.go b/src/cmd/services/m3coordinator/downsample/policy.go
new file mode 100644
index 0000000000..1d28ab0c56
--- /dev/null
+++ b/src/cmd/services/m3coordinator/downsample/policy.go
@@ -0,0 +1,39 @@
+// Copyright (c) 2018 Uber Technologies, Inc.
+//
+// Permission is hereby granted, free of charge, to any person obtaining a copy
+// of this software and associated documentation files (the "Software"), to deal
+// in the Software without restriction, including without limitation the rights
+// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+// copies of the Software, and to permit persons to whom the Software is
+// furnished to do so, subject to the following conditions:
+//
+// The above copyright notice and this permission notice shall be included in
+// all copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+// THE SOFTWARE.
+
+package downsample
+
+// DownsamplingConfiguration is the configuration for what to downsample.
+type DownsamplingConfiguration struct {
+	// NonePolicy is the none downsampling policy.
+	NonePolicy NonePolicy `yaml:"nonePolicy"`
+	// AggregationPolicy is the aggregation downsampling policy.
+	AggregationPolicy AggregationPolicy `yaml:"aggregationPolicy"`
+}
+
+// NonePolicy is the none downsampling policy.
+type NonePolicy struct {
+	Enabled bool `yaml:"enabled"`
+}
+
+// AggregationPolicy is the aggregation downsampling policy.
+type AggregationPolicy struct {
+	Enabled bool `yaml:"enabled"`
+}
diff --git a/src/cmd/services/m3coordinator/handler/prometheus/remote/write.go b/src/cmd/services/m3coordinator/handler/prometheus/remote/write.go
index b564b75a39..dc9694454b 100644
--- a/src/cmd/services/m3coordinator/handler/prometheus/remote/write.go
+++ b/src/cmd/services/m3coordinator/handler/prometheus/remote/write.go
@@ -23,16 +23,19 @@ package remote
 import (
 	"context"
 	"net/http"
+	"sync"
 
+	"github.com/m3db/m3db/src/cmd/services/m3coordinator/downsample"
 	"github.com/m3db/m3db/src/cmd/services/m3coordinator/handler"
 	"github.com/m3db/m3db/src/cmd/services/m3coordinator/handler/prometheus"
 	"github.com/m3db/m3db/src/coordinator/generated/proto/prompb"
 	"github.com/m3db/m3db/src/coordinator/storage"
 	"github.com/m3db/m3db/src/coordinator/util/execution"
 	"github.com/m3db/m3db/src/coordinator/util/logging"
-	"github.com/uber-go/tally"
+	xerrors "github.com/m3db/m3x/errors"
 
 	"github.com/golang/protobuf/proto"
+	"github.com/uber-go/tally"
 	"go.uber.org/zap"
 )
 
@@ -44,13 +47,23 @@ const (
 // PromWriteHandler represents a handler for prometheus write endpoint.
 type PromWriteHandler struct {
 	store            storage.Storage
+	downsampler      downsample.Downsampler
+	writeRaw         bool
+	writeAgg         bool
 	promWriteMetrics promWriteMetrics
 }
 
 // NewPromWriteHandler returns a new instance of handler.
-func NewPromWriteHandler(store storage.Storage, scope tally.Scope) http.Handler {
+func NewPromWriteHandler(
+	store storage.Storage,
+	downsampler downsample.Downsampler,
+	scope tally.Scope,
+) http.Handler {
 	return &PromWriteHandler{
 		store:            store,
+		downsampler:      downsampler,
+		writeRaw:         downsampler.NonePolicy().Enabled,
+		writeAgg:         downsampler.AggregationPolicy().Enabled,
 		promWriteMetrics: newPromWriteMetrics(scope),
 	}
 }
@@ -101,11 +114,65 @@ func (h *PromWriteHandler) parseRequest(r *http.Request) (*prompb.WriteRequest,
 }
 
 func (h *PromWriteHandler) write(ctx context.Context, r *prompb.WriteRequest) error {
-	requests := make([]execution.Request, len(r.Timeseries))
-	for idx, t := range r.Timeseries {
-		requests[idx] = newLocalWriteRequest(storage.PromWriteTSToM3(t), h.store)
+	var (
+		wg          sync.WaitGroup
+		writeRawErr error
+		writeAggErr error
+	)
+	if h.writeAgg {
+		// If writing aggregations, write them asynchronously.
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+
+			var (
+				metricsAppender = h.downsampler.MetricsAppender()
+				multiErr        xerrors.MultiError
+			)
+			for _, ts := range r.Timeseries {
+				metricsAppender.Reset()
+				for _, label := range ts.Labels {
+					metricsAppender.AddTag(label.Name, label.Value)
+				}
+
+				samplesAppender, err := metricsAppender.SamplesAppender()
+				if err != nil {
+					multiErr = multiErr.Add(err)
+					continue
+				}
+				for _, elem := range ts.Samples {
+					err := samplesAppender.AppendGaugeSample(elem.Value)
+					if err != nil {
+						multiErr = multiErr.Add(err)
+					}
+				}
+			}
+
+			metricsAppender.Finalize()
+
+			writeAggErr = multiErr.FinalError()
+		}()
+	}
+
+	if h.writeRaw {
+		// Write the raw points inline rather than spawning another
+		// goroutine, to keep the goroutine count down.
+		requests := make([]execution.Request, 0, len(r.Timeseries))
+		for _, t := range r.Timeseries {
+			write := storage.PromWriteTSToM3(t)
+			write.Attributes.MetricsType = storage.UnaggregatedMetricsType
+			request := newLocalWriteRequest(write, h.store)
+			requests = append(requests, request)
+		}
+		writeRawErr = execution.ExecuteParallel(ctx, requests)
 	}
-	return execution.ExecuteParallel(ctx, requests)
+
+	if h.writeAgg {
+		// Now wait for downsampling to finish.
+		wg.Wait()
+	}
+
+	return xerrors.FirstError(writeRawErr, writeAggErr)
 }
 
 func (w *localWriteRequest) Process(ctx context.Context) error {
diff --git a/src/cmd/services/m3coordinator/httpd/handler.go b/src/cmd/services/m3coordinator/httpd/handler.go
index be6ab9e566..e123b96965 100644
--- a/src/cmd/services/m3coordinator/httpd/handler.go
+++ b/src/cmd/services/m3coordinator/httpd/handler.go
@@ -26,6 +26,7 @@ import (
 	"os"
 
 	"github.com/m3db/m3db/src/cmd/services/m3coordinator/config"
+	"github.com/m3db/m3db/src/cmd/services/m3coordinator/downsample"
 	"github.com/m3db/m3db/src/cmd/services/m3coordinator/handler"
 	"github.com/m3db/m3db/src/cmd/services/m3coordinator/handler/namespace"
 	"github.com/m3db/m3db/src/cmd/services/m3coordinator/handler/placement"
@@ -55,6 +56,7 @@ type Handler struct {
 	Router        *mux.Router
 	CLFLogger     *log.Logger
 	storage       storage.Storage
+	downsampler   downsample.Downsampler
 	engine        *executor.Engine
 	clusterClient m3clusterClient.Client
 	config        config.Configuration
@@ -62,7 +64,14 @@
 }
 
 // NewHandler returns a new instance of handler with routes.
-func NewHandler(storage storage.Storage, engine *executor.Engine, clusterClient m3clusterClient.Client, cfg config.Configuration, scope tally.Scope) (*Handler, error) { +func NewHandler( + storage storage.Storage, + engine *executor.Engine, + downsampler downsample.Downsampler, + clusterClient m3clusterClient.Client, + cfg config.Configuration, + scope tally.Scope, +) (*Handler, error) { r := mux.NewRouter() logger, err := zap.NewProduction() if err != nil { @@ -75,6 +84,7 @@ func NewHandler(storage storage.Storage, engine *executor.Engine, clusterClient Router: r, storage: storage, engine: engine, + downsampler: downsampler, clusterClient: clusterClient, config: cfg, scope: scope, @@ -84,12 +94,22 @@ func NewHandler(storage storage.Storage, engine *executor.Engine, clusterClient // RegisterRoutes registers all http routes. func (h *Handler) RegisterRoutes() error { - logged := logging.WithResponseTimeLogging - - h.Router.HandleFunc(remote.PromReadURL, logged(remote.NewPromReadHandler(h.engine, h.scope.Tagged(remoteSource))).ServeHTTP).Methods("POST") - h.Router.HandleFunc(remote.PromWriteURL, logged(remote.NewPromWriteHandler(h.storage, h.scope.Tagged(remoteSource))).ServeHTTP).Methods("POST") - h.Router.HandleFunc(native.PromReadURL, logged(native.NewPromReadHandler(h.engine)).ServeHTTP).Methods("GET") - h.Router.HandleFunc(handler.SearchURL, logged(handler.NewSearchHandler(h.storage)).ServeHTTP).Methods("POST") + var ( + d = h.downsampler + logged = logging.WithResponseTimeLogging + ) + h.Router.HandleFunc(remote.PromReadURL, + logged(remote.NewPromReadHandler(h.engine, h.scope.Tagged(remoteSource))).ServeHTTP). + Methods("POST") + h.Router.HandleFunc(remote.PromWriteURL, + logged(remote.NewPromWriteHandler(h.storage, d, h.scope.Tagged(remoteSource))).ServeHTTP). + Methods("POST") + h.Router.HandleFunc(native.PromReadURL, + logged(native.NewPromReadHandler(h.engine)).ServeHTTP). + Methods("GET") + h.Router.HandleFunc(handler.SearchURL, + logged(handler.NewSearchHandler(h.storage)).ServeHTTP). + Methods("POST") h.registerProfileEndpoints() diff --git a/src/coordinator/models/tag.go b/src/coordinator/models/tag.go index 15b90b00f0..1c4f6fca5a 100644 --- a/src/coordinator/models/tag.go +++ b/src/coordinator/models/tag.go @@ -125,25 +125,28 @@ func (m Matchers) ToTags() (Tags, error) { return tags, nil } +var ( + sep = []byte(",") + eq = []byte("=") +) + // ID returns a string representation of the tags func (t Tags) ID() string { - sep := "," - eq := "=" - - var b string - var keys []string - + length := 0 + keys := make([]string, 0, len(t)) for k := range t { + length += len(k) + len(t[k]) + len(eq) + len(sep) keys = append(keys, k) } sort.Strings(keys) + b := make([]byte, 0, length) for _, k := range keys { - b += k - b += eq - b += t[k] - b += sep + b = append(b, k...) + b = append(b, eq...) + b = append(b, t[k]...) + b = append(b, sep...) 
} - return b + return string(b) } diff --git a/src/coordinator/services/m3coordinator/server/server.go b/src/coordinator/services/m3coordinator/server/server.go index e28b726d2c..13d41eeecf 100644 --- a/src/coordinator/services/m3coordinator/server/server.go +++ b/src/coordinator/services/m3coordinator/server/server.go @@ -33,6 +33,7 @@ import ( clusterclient "github.com/m3db/m3cluster/client" "github.com/m3db/m3cluster/client/etcd" "github.com/m3db/m3db/src/cmd/services/m3coordinator/config" + "github.com/m3db/m3db/src/cmd/services/m3coordinator/downsample" "github.com/m3db/m3db/src/cmd/services/m3coordinator/httpd" m3dbcluster "github.com/m3db/m3db/src/coordinator/cluster/m3db" "github.com/m3db/m3db/src/coordinator/executor" @@ -141,15 +142,25 @@ func Run(runOpts RunOptions) { return <-dbClientCh, nil }, nil) - fanoutStorage, storageCleanup := setupStorages(logger, session, cfg) + // TODO(r): clusters + var clusters local.Clusters + + fanoutStorage, storageCleanup := setupStorages(logger, clusters, cfg) defer storageCleanup() clusterClient := m3dbcluster.NewAsyncClient(func() (clusterclient.Client, error) { return <-clusterClientCh, nil }, nil) + // TODO(r): config and options + downsampler, err := downsample.NewDownsampler(downsample.DownsamplingConfiguration{}, + downsample.DownsamplerOptions{}) + if err != nil { + logger.Fatal("unable to create downsampler", zap.Any("error", err)) + } + handler, err := httpd.NewHandler(fanoutStorage, executor.NewEngine(fanoutStorage), - clusterClient, cfg, scope) + downsampler, clusterClient, cfg, scope) if err != nil { logger.Fatal("unable to set up handlers", zap.Any("error", err)) } @@ -172,9 +183,9 @@ func Run(runOpts RunOptions) { } } -func setupStorages(logger *zap.Logger, session client.Session, cfg config.Configuration) (storage.Storage, func()) { +func setupStorages(logger *zap.Logger, clusters local.Clusters, cfg config.Configuration) (storage.Storage, func()) { cleanup := func() {} - localStorage := local.NewStorage(session, namespace) + localStorage := local.NewStorage(clusters) stores := []storage.Storage{localStorage} remoteEnabled := false if cfg.RPC != nil && cfg.RPC.Enabled { diff --git a/src/coordinator/storage/config.go b/src/coordinator/storage/config.go new file mode 100644 index 0000000000..15e1a96a96 --- /dev/null +++ b/src/coordinator/storage/config.go @@ -0,0 +1,50 @@ +// Copyright (c) 2018 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. 
IN NO EVENT SHALL THE
+// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+// THE SOFTWARE.
+
+package storage
+
+import "fmt"
+
+// IsValidMetricsType validates a stored metrics type.
+func IsValidMetricsType(v MetricsType) error {
+	for _, valid := range validMetricsTypes {
+		if valid == v {
+			return nil
+		}
+	}
+	return fmt.Errorf("invalid stored metrics type '%v': should be one of %v",
+		v, validMetricsTypes)
+}
+
+// UnmarshalYAML unmarshals a stored metrics type.
+func (v *MetricsType) UnmarshalYAML(unmarshal func(interface{}) error) error {
+	var str string
+	if err := unmarshal(&str); err != nil {
+		return err
+	}
+	for _, valid := range validMetricsTypes {
+		if str == valid.String() {
+			*v = valid
+			return nil
+		}
+	}
+	return fmt.Errorf("invalid MetricsType '%s', valid types are: %v",
+		str, validMetricsTypes)
+}
diff --git a/src/coordinator/storage/interface.go b/src/coordinator/storage/interface.go
index 1654475fe3..185bd85c8c 100644
--- a/src/coordinator/storage/interface.go
+++ b/src/coordinator/storage/interface.go
@@ -98,6 +98,7 @@ type WriteQuery struct {
 	Datapoints ts.Datapoints
 	Unit       xtime.Unit
 	Annotation []byte
+	Attributes Attributes
 }
 
 func (q *WriteQuery) String() string {
@@ -132,3 +133,38 @@ type QueryResult struct {
 type BlockResult struct {
 	Blocks []Block
 }
+
+// MetricsType is a type of stored metrics.
+type MetricsType uint
+
+const (
+	// UnaggregatedMetricsType is an unaggregated metrics type.
+	UnaggregatedMetricsType MetricsType = iota
+	// AggregatedMetricsType is an aggregated metrics type.
+	AggregatedMetricsType
+)
+
+var (
+	validMetricsTypes = []MetricsType{
+		UnaggregatedMetricsType,
+		AggregatedMetricsType,
+	}
+)
+
+func (t MetricsType) String() string {
+	switch t {
+	case UnaggregatedMetricsType:
+		return "unaggregated"
+	case AggregatedMetricsType:
+		return "aggregated"
+	default:
+		return "unknown"
+	}
+}
+
+// Attributes is a set of stored metrics attributes.
+type Attributes struct {
+	MetricsType MetricsType
+	Retention   time.Duration
+	Resolution  time.Duration
+}
diff --git a/src/coordinator/storage/local/cluster.go b/src/coordinator/storage/local/cluster.go
new file mode 100644
index 0000000000..097fa5df57
--- /dev/null
+++ b/src/coordinator/storage/local/cluster.go
@@ -0,0 +1,54 @@
+// Copyright (c) 2018 Uber Technologies, Inc.
+//
+// Permission is hereby granted, free of charge, to any person obtaining a copy
+// of this software and associated documentation files (the "Software"), to deal
+// in the Software without restriction, including without limitation the rights
+// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+// copies of the Software, and to permit persons to whom the Software is
+// furnished to do so, subject to the following conditions:
+//
+// The above copyright notice and this permission notice shall be included in
+// all copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package local + +import ( + "time" + + "github.com/m3db/m3db/src/coordinator/storage" + "github.com/m3db/m3db/src/dbnode/client" + "github.com/m3db/m3x/ident" +) + +// Clusters is a flattened collection of local storage clusters and namespaces. +type Clusters interface { + ClusterNamespaces() ([]ClusterNamespace, error) + + UnaggregatedClusterNamespace() (ClusterNamespace, error) + + AggregatedClusterNamespace( + params AggregatedClusterNamespaceParams, + ) (ClusterNamespace, error) +} + +// AggregatedClusterNamespaceParams is a set of parameters required to resolve +// an aggregated cluster namespace. +type AggregatedClusterNamespaceParams struct { + Retention time.Duration + Resolution time.Duration +} + +// ClusterNamespace is a local storage cluster namespace. +type ClusterNamespace interface { + NamespaceID() ident.ID + Attributes() storage.Attributes + Session() (client.Session, error) +} diff --git a/src/coordinator/storage/local/config.go b/src/coordinator/storage/local/config.go new file mode 100644 index 0000000000..383070a9c5 --- /dev/null +++ b/src/coordinator/storage/local/config.go @@ -0,0 +1,45 @@ +// Copyright (c) 2018 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package local + +import ( + "time" + + "github.com/m3db/m3db/src/coordinator/storage" + "github.com/m3db/m3db/src/dbnode/client" +) + +// ClustersStaticConfiguration is a set of static cluster configurations. +type ClustersStaticConfiguration []ClusterStaticConfiguration + +// ClusterStaticConfiguration is a static cluster configuration. +type ClusterStaticConfiguration struct { + Client client.Configuration `yaml:"client"` + Namespaces ClusterStaticNamespacesConfiguration `yaml:"namespaces"` +} + +// ClusterStaticNamespacesConfiguration describes the namespaces in a static +// cluster. 
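+// An illustrative YAML snippet for one cluster entry (keys follow the
+// yaml tags below; the values are examples only):
+//
+//	namespaces:
+//	  storageMetricsType: aggregated
+//	  retention: 720h
+//	  resolution: 1m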
diff --git a/src/coordinator/storage/local/config.go b/src/coordinator/storage/local/config.go
new file mode 100644
index 0000000000..383070a9c5
--- /dev/null
+++ b/src/coordinator/storage/local/config.go
@@ -0,0 +1,45 @@
+// Copyright (c) 2018 Uber Technologies, Inc.
+//
+// Permission is hereby granted, free of charge, to any person obtaining a copy
+// of this software and associated documentation files (the "Software"), to deal
+// in the Software without restriction, including without limitation the rights
+// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+// copies of the Software, and to permit persons to whom the Software is
+// furnished to do so, subject to the following conditions:
+//
+// The above copyright notice and this permission notice shall be included in
+// all copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+// THE SOFTWARE.
+
+package local
+
+import (
+	"time"
+
+	"github.com/m3db/m3db/src/coordinator/storage"
+	"github.com/m3db/m3db/src/dbnode/client"
+)
+
+// ClustersStaticConfiguration is a set of static cluster configurations.
+type ClustersStaticConfiguration []ClusterStaticConfiguration
+
+// ClusterStaticConfiguration is a static cluster configuration.
+type ClusterStaticConfiguration struct {
+	Client     client.Configuration                 `yaml:"client"`
+	Namespaces ClusterStaticNamespacesConfiguration `yaml:"namespaces"`
+}
+
+// ClusterStaticNamespacesConfiguration describes the namespaces in a static
+// cluster.
+type ClusterStaticNamespacesConfiguration struct {
+	StorageMetricsType storage.MetricsType `yaml:"storageMetricsType"`
+	Retention          time.Duration       `yaml:"retention" validate:"nonzero"`
+	Resolution         time.Duration       `yaml:"resolution" validate:"min=0"`
+}
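
A sketch of the YAML this maps to, parsed with gopkg.in/yaml.v2 (assuming its
string support for time.Duration fields); the empty client block stands in for
elided dbnode client.Configuration fields, and all values are hypothetical:

	var clusters ClustersStaticConfiguration
	src := []byte("- client: {}\n" +
		"  namespaces:\n" +
		"    storageMetricsType: aggregated\n" +
		"    retention: 720h\n" +
		"    resolution: 1m\n")
	if err := yaml.Unmarshal(src, &clusters); err != nil {
		// handle error
	}
	// clusters[0].Namespaces.Retention is now 720h and Resolution is 1m,
	// matching the AggregatedClusterNamespaceParams they later resolve to.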
diff --git a/src/coordinator/storage/local/storage.go b/src/coordinator/storage/local/storage.go
index a160e92b41..13aa2cd6bb 100644
--- a/src/coordinator/storage/local/storage.go
+++ b/src/coordinator/storage/local/storage.go
@@ -22,6 +22,9 @@ package local
 
 import (
 	"context"
+	goerrors "errors"
+	"fmt"
+	"sync"
 	"time"
 
 	"github.com/m3db/m3db/src/coordinator/errors"
@@ -29,7 +32,8 @@ import (
 	"github.com/m3db/m3db/src/coordinator/storage"
 	"github.com/m3db/m3db/src/coordinator/ts"
 	"github.com/m3db/m3db/src/coordinator/util/execution"
-	"github.com/m3db/m3db/src/dbnode/client"
+	"github.com/m3db/m3db/src/dbnode/storage/index"
+	xerrors "github.com/m3db/m3x/errors"
 	"github.com/m3db/m3x/ident"
 	xtime "github.com/m3db/m3x/time"
 )
@@ -38,14 +42,17 @@ const (
 	initRawFetchAllocSize = 32
 )
 
+var (
+	errNoLocalClustersFulfillsQuery = goerrors.New("no clusters can fulfill query")
+)
+
 type localStorage struct {
-	session   client.Session
-	namespace ident.ID
+	clusters Clusters
 }
 
 // NewStorage creates a new local Storage instance.
-func NewStorage(session client.Session, namespace string) storage.Storage {
-	return &localStorage{session: session, namespace: ident.StringID(namespace)}
+func NewStorage(clusters Clusters) storage.Storage {
+	return &localStorage{clusters: clusters}
 }
 
 func (s *localStorage) Fetch(ctx context.Context, query *storage.FetchQuery, options *storage.FetchOptions) (*storage.FetchResult, error) {
@@ -64,17 +71,78 @@ func (s *localStorage) Fetch(ctx context.Context, query *storage.FetchQuery, opt
 	}
 
 	opts := storage.FetchOptionsToM3Options(options, query)
+
+	namespaces, err := s.clusters.ClusterNamespaces()
+	if err != nil {
+		return nil, err
+	}
+
+	// NB(r): Since we don't use a single index, we fan out to each
+	// cluster that can completely fulfill this range and then prefer the
+	// highest-resolution (most finely grained) results.
+	// This needs to be optimized; however, it is a start.
+	var (
+		now     = time.Now()
+		fetches = 0
+		result  multiFetchResult
+		wg      sync.WaitGroup
+	)
+	for _, namespace := range namespaces {
+		namespace := namespace // Capture var
+
+		clusterStart := now.Add(-1 * namespace.Attributes().Retention)
+
+		// Only include if cluster can completely fulfill the range
+		if clusterStart.After(query.Start) {
+			continue
+		}
+
+		fetches++
+
+		wg.Add(1)
+		go func() {
+			r, err := s.fetch(namespace, m3query, opts)
+			result.add(namespace.Attributes(), r, err)
+			wg.Done()
+		}()
+	}
+
+	if fetches == 0 {
+		return nil, errNoLocalClustersFulfillsQuery
+	}
+
+	wg.Wait()
+	if err := result.err.FinalError(); err != nil {
+		return nil, err
+	}
+	return result.result, nil
+}
+
+func (s *localStorage) fetch(
+	namespace ClusterNamespace,
+	query index.Query,
+	opts index.QueryOptions,
+) (*storage.FetchResult, error) {
+	session, err := namespace.Session()
+	if err != nil {
+		return nil, err
+	}
+
+	namespaceID := namespace.NamespaceID()
+
 	// TODO (nikunj): Handle second return param
-	iters, _, err := s.session.FetchTagged(s.namespace, m3query, opts)
+	iters, _, err := session.FetchTagged(namespaceID, query, opts)
 	if err != nil {
 		return nil, err
 	}
 
 	defer iters.Close()
 
+	// NB(r): Decompressing values should really happen concurrently instead of
+	// sequentially like below.
 	seriesList := make([]*ts.Series, iters.Len())
 	for i, iter := range iters.Iters() {
-		metric, err := storage.FromM3IdentToMetric(s.namespace, iter.ID(), iter.Tags())
+		metric, err := storage.FromM3IdentToMetric(namespaceID, iter.ID(), iter.Tags())
 		if err != nil {
 			return nil, err
 		}
@@ -110,8 +178,62 @@ func (s *localStorage) FetchTags(ctx context.Context, query *storage.FetchQuery,
 	}
 
 	opts := storage.FetchOptionsToM3Options(options, query)
+
+	namespaces, err := s.clusters.ClusterNamespaces()
+	if err != nil {
+		return nil, err
+	}
+
+	var (
+		now     = time.Now()
+		fetches = 0
+		result  multiFetchTagsResult
+		wg      sync.WaitGroup
+	)
+	for _, namespace := range namespaces {
+		namespace := namespace // Capture var
+
+		clusterStart := now.Add(-1 * namespace.Attributes().Retention)
+
+		// Only include if cluster can completely fulfill the range
+		if clusterStart.After(query.Start) {
+			continue
+		}
+
+		fetches++
+
+		wg.Add(1)
+		go func() {
+			result.add(s.fetchTags(namespace, m3query, opts))
+			wg.Done()
+		}()
+	}
+
+	if fetches == 0 {
+		return nil, errNoLocalClustersFulfillsQuery
+	}
+
+	wg.Wait()
+	if err := result.err.FinalError(); err != nil {
+		return nil, err
+	}
+	return result.result, nil
+}
+
+func (s *localStorage) fetchTags(
+	namespace ClusterNamespace,
+	query index.Query,
+	opts index.QueryOptions,
+) (*storage.SearchResults, error) {
+	session, err := namespace.Session()
+	if err != nil {
+		return nil, err
+	}
+
+	namespaceID := namespace.NamespaceID()
+
 	// TODO (juchan): Handle second return param
-	iter, _, err := s.session.FetchTaggedIDs(s.namespace, m3query, opts)
+	iter, _, err := session.FetchTaggedIDs(namespaceID, query, opts)
 	if err != nil {
 		return nil, err
 	}
@@ -150,6 +272,7 @@ func (s *localStorage) Write(ctx context.Context, query *storage.WriteQuery) err
 		unit:        query.Unit,
 		id:          id,
 		tagIterator: storage.TagsToIdentTagIterator(query.Tags),
+		attributes:  query.Attributes,
 	}
 
 	requests := make([]execution.Request, len(query.Datapoints))
@@ -168,11 +291,45 @@ func (s *localStorage) FetchBlocks(
 	return storage.BlockResult{}, errors.ErrNotImplemented
 }
 
+func (s *localStorage) Close() error {
+	return nil
+}
+
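
A worked example of the retention check used by both fan-out paths above,
with hypothetical numbers: a 36h query window against 48h and 24h retention
namespaces.

	now := time.Now()
	queryStart := now.Add(-36 * time.Hour)

	for _, retention := range []time.Duration{48 * time.Hour, 24 * time.Hour} {
		clusterStart := now.Add(-1 * retention)
		// 48h: clusterStart is before queryStart, so the namespace is queried.
		// 24h: clusterStart is after queryStart, so it cannot cover the full
		// range and is skipped.
		fmt.Println(retention, "fulfills range:", !clusterStart.After(queryStart))
	}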
 func (w *writeRequest) Process(ctx context.Context) error {
 	common := w.writeRequestCommon
 	store := common.store
 	id := ident.StringID(common.id)
-	return store.session.WriteTagged(store.namespace, id, common.tagIterator, w.timestamp, w.value, common.unit, common.annotation)
+
+	var (
+		namespace ClusterNamespace
+		err       error
+	)
+	switch common.attributes.MetricsType {
+	case storage.UnaggregatedMetricsType:
+		namespace, err = store.clusters.UnaggregatedClusterNamespace()
+	case storage.AggregatedMetricsType:
+		params := AggregatedClusterNamespaceParams{
+			Retention:  common.attributes.Retention,
+			Resolution: common.attributes.Resolution,
+		}
+		namespace, err = store.clusters.AggregatedClusterNamespace(params)
+	default:
+		metricsType := common.attributes.MetricsType
+		err = fmt.Errorf("invalid write request metrics type: %s (%d)",
+			metricsType.String(), uint(metricsType))
+	}
+	if err != nil {
+		return err
+	}
+
+	session, err := namespace.Session()
+	if err != nil {
+		return err
+	}
+
+	namespaceID := namespace.NamespaceID()
+
+	return session.WriteTagged(namespaceID, id, common.tagIterator, w.timestamp, w.value, common.unit, common.annotation)
 }
 
 type writeRequestCommon struct {
@@ -181,6 +338,7 @@ type writeRequestCommon struct {
 	unit        xtime.Unit
 	id          string
 	tagIterator ident.TagIterator
+	attributes  storage.Attributes
 }
 
 type writeRequest struct {
@@ -197,7 +355,119 @@ func newWriteRequest(writeRequestCommon *writeRequestCommon, timestamp time.Time
 	}
 }
 
-func (s *localStorage) Close() error {
-	s.namespace.Finalize()
-	return nil
+type multiFetchResult struct {
+	sync.Mutex
+	result           *storage.FetchResult
+	err              xerrors.MultiError
+	dedupeFirstAttrs storage.Attributes
+	dedupeMap        map[string]multiFetchResultSeries
+}
+
+type multiFetchResultSeries struct {
+	idx   int
+	attrs storage.Attributes
+}
+
+func (r *multiFetchResult) add(
+	attrs storage.Attributes,
+	result *storage.FetchResult,
+	err error,
+) {
+	r.Lock()
+	defer r.Unlock()
+
+	if err != nil {
+		r.err = r.err.Add(err)
+		return
+	}
+
+	if r.result == nil {
+		r.result = result
+		r.dedupeFirstAttrs = attrs
+		return
+	}
+
+	r.result.HasNext = r.result.HasNext && result.HasNext
+	r.result.LocalOnly = r.result.LocalOnly && result.LocalOnly
+
+	// Need to dedupe
+	if r.dedupeMap == nil {
+		r.dedupeMap = make(map[string]multiFetchResultSeries, len(r.result.SeriesList))
+		for idx, s := range r.result.SeriesList {
+			r.dedupeMap[s.Name()] = multiFetchResultSeries{
+				idx:   idx,
+				attrs: r.dedupeFirstAttrs,
+			}
+		}
+	}
+
+	for _, s := range result.SeriesList {
+		id := s.Name()
+		existing, exists := r.dedupeMap[id]
+		if exists && existing.attrs.Resolution < attrs.Resolution {
+			// Series already exists at a finer-grained resolution, skip this copy
+			continue
+		}
+
+		// Series does not exist yet, or this copy is finer grained: add or
+		// replace the result
+		var idx int
+		if !exists {
+			idx = len(r.result.SeriesList)
+			r.result.SeriesList = append(r.result.SeriesList, s)
+		} else {
+			idx = existing.idx
+			r.result.SeriesList[idx] = s
+		}
+
+		r.dedupeMap[id] = multiFetchResultSeries{
+			idx:   idx,
+			attrs: attrs,
+		}
+	}
+}
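
The dedupe rule above keeps whichever copy of a series has the smaller (finer)
Resolution value. A sketch of the behavior from inside the package, where
coarseResult and fineResult are hypothetical *storage.FetchResult fixtures that
both contain a series named "cpu":

	var r multiFetchResult
	r.add(storage.Attributes{
		MetricsType: storage.AggregatedMetricsType,
		Resolution:  time.Minute,
	}, coarseResult, nil)
	r.add(storage.Attributes{
		MetricsType: storage.AggregatedMetricsType,
		Resolution:  10 * time.Second,
	}, fineResult, nil)
	// r.result now holds the "cpu" series from fineResult: 10s is less than
	// 1m, so the second add replaces the coarser copy in place via the
	// dedupe map rather than appending a duplicate.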
+
+type multiFetchTagsResult struct {
+	sync.Mutex
+	result    *storage.SearchResults
+	err       xerrors.MultiError
+	dedupeMap map[string]struct{}
+}
+
+func (r *multiFetchTagsResult) add(
+	result *storage.SearchResults,
+	err error,
+) {
+	r.Lock()
+	defer r.Unlock()
+
+	if err != nil {
+		r.err = r.err.Add(err)
+		return
+	}
+
+	if r.result == nil {
+		r.result = result
+		return
+	}
+
+	// Need to dedupe
+	if r.dedupeMap == nil {
+		r.dedupeMap = make(map[string]struct{}, len(r.result.Metrics))
+		for _, s := range r.result.Metrics {
+			r.dedupeMap[s.ID] = struct{}{}
+		}
+	}
+
+	for _, s := range result.Metrics {
+		id := s.ID
+		_, exists := r.dedupeMap[id]
+		if exists {
+			// Tag result already exists, skip
+			continue
+		}
+
+		// Tag result does not exist yet, add it
+		r.result.Metrics = append(r.result.Metrics, s)
+		r.dedupeMap[id] = struct{}{}
+	}
 }

From df2d6ed1545ea21bdfd3f20285c41f70b19029dd Mon Sep 17 00:00:00 2001
From: Rob Skillington
Date: Mon, 2 Jul 2018 18:10:31 -0400
Subject: [PATCH 2/3] Address feedback

---
 .../m3coordinator/downsample/downsample_id.go | 41 +++++++++----------
 .../m3coordinator/downsample/downsampler.go   |  8 +++-
 2 files changed, 26 insertions(+), 23 deletions(-)

diff --git a/src/cmd/services/m3coordinator/downsample/downsample_id.go b/src/cmd/services/m3coordinator/downsample/downsample_id.go
index df334a39f0..e2477e87bf 100644
--- a/src/cmd/services/m3coordinator/downsample/downsample_id.go
+++ b/src/cmd/services/m3coordinator/downsample/downsample_id.go
@@ -162,6 +162,11 @@ func isRollupID(
 	return false
 }
 
+// rollupIDProvider is a constructor for rollup IDs; it can be pooled to avoid
+// an allocation every time we need to construct a rollup ID.
+// When used as an ident.TagIterator for the serialize.TagEncoder Encode
+// method, it returns the rollup tag in the correct alphabetical order
+// while progressing through the existing tags.
 type rollupIDProvider struct {
 	index    int
 	tagPairs []id.TagPair
@@ -262,8 +267,7 @@ func (p *rollupIDProvider) Remaining() int {
 }
 
 func (p *rollupIDProvider) Duplicate() ident.TagIterator {
-	duplicate := new(rollupIDProvider)
-	*duplicate = *p
+	duplicate := p.pool.Get()
 	duplicate.reset(p.tagPairs)
 	return duplicate
 }
@@ -297,34 +301,27 @@ func (p *rollupIDProviderPool) Put(v *rollupIDProvider) {
 	p.pool.Put(v)
 }
 
-func encodedTagsNameAndTags(
+func resolveEncodedTagsNameTag(
 	id []byte,
 	iterPool *encodedTagsIteratorPool,
-) ([]byte, []byte, error) {
+) ([]byte, error) {
 	// ID is always the encoded tags for downsampling IDs
-	metricTags := id
-
 	iter := iterPool.Get()
-	iter.Reset(metricTags)
+	iter.Reset(id)
 	defer iter.Close()
 
-	var metricName []byte
-	for iter.Next() {
-		name, value := iter.Current()
-		if bytes.Equal(metricNameTagName, name) {
-			metricName = value
-			break
-		}
-	}
-
-	if err := iter.Err(); err != nil {
-		return nil, nil, err
+	value, ok := iter.TagValue(metricNameTagName)
+	if !ok {
+		// No name was found in encoded tags
+		return nil, errNoMetricNameTag
 	}
 
-	if metricName == nil {
-		// No name was found in encoded tags
-		return nil, nil, errNoMetricNameTag
+	idx := bytes.Index(id, value)
+	if idx == -1 {
+		return nil, fmt.Errorf(
+			"resolved metric name tag value not found in ID: %v", value)
 	}
 
-	return metricName, metricTags, nil
+	// Return original reference to avoid needing to return a copy
+	return id[idx : idx+len(value)], nil
 }
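
The subslice return at the end of resolveEncodedTagsNameTag means the resolved
name aliases the input ID's backing array instead of copying it. Stand-in
bytes to illustrate (the literal below is not the real wire format of the
encoded tags):

	id := []byte("__name__=http_requests,job=api")
	value := []byte("http_requests")
	idx := bytes.Index(id, value)
	name := id[idx : idx+len(value)] // shares memory with id
	// The caller must not mutate id while name is in use, since both slices
	// point at the same underlying bytes.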
diff --git a/src/cmd/services/m3coordinator/downsample/downsampler.go b/src/cmd/services/m3coordinator/downsample/downsampler.go
index a4b38a431e..f80b689705 100644
--- a/src/cmd/services/m3coordinator/downsample/downsampler.go
+++ b/src/cmd/services/m3coordinator/downsample/downsampler.go
@@ -202,7 +202,13 @@ func (o DownsamplerOptions) newAggregator() (newAggregatorResult, error) {
 	tagsFilterOptions := filters.TagsFilterOptions{
 		NameTagKey: metricNameTagName,
 		NameAndTagsFn: func(id []byte) ([]byte, []byte, error) {
-			return encodedTagsNameAndTags(id, sortedTagIteratorPool)
+			name, err := resolveEncodedTagsNameTag(id, sortedTagIteratorPool)
+			if err != nil {
+				return nil, nil, err
+			}
+			// ID is always the encoded tags for IDs in the downsampler
+			tags := id
+			return name, tags, nil
 		},
 		SortedTagIteratorFn: sortedTagIteratorFn,
 	}

From 880e0876452f65742234ce657631544ba358a9ba Mon Sep 17 00:00:00 2001
From: Rob Skillington
Date: Tue, 3 Jul 2018 11:09:50 -0400
Subject: [PATCH 3/3] Rename downsampled ID logic file and fix some lint issues

---
 src/cmd/services/m3coordinator/downsample/downsampler.go  | 7 +------
 .../m3coordinator/downsample/{downsample_id.go => ids.go} | 0
 src/coordinator/api/v1/handler/prometheus/remote/write.go | 4 +++-
 3 files changed, 4 insertions(+), 7 deletions(-)
 rename src/cmd/services/m3coordinator/downsample/{downsample_id.go => ids.go} (100%)

diff --git a/src/cmd/services/m3coordinator/downsample/downsampler.go b/src/cmd/services/m3coordinator/downsample/downsampler.go
index f80b689705..8f1af4e3a7 100644
--- a/src/cmd/services/m3coordinator/downsample/downsampler.go
+++ b/src/cmd/services/m3coordinator/downsample/downsampler.go
@@ -177,7 +177,7 @@ func (o DownsamplerOptions) newAggregator() (newAggregatorResult, error) {
 		instrumentOpts = o.InstrumentOptions
 	)
 	if o.StorageFlushConcurrency > 0 {
-		storageFlushConcurrency = storageFlushConcurrency
+		storageFlushConcurrency = o.StorageFlushConcurrency
 	}
 
 	// Configure rules options.
@@ -374,11 +374,6 @@ func (o DownsamplerOptions) newAggregator() (newAggregatorResult, error) {
 	}, nil
 }
 
-type retentionResolution struct {
-	retention  time.Duration
-	resolution time.Duration
-}
-
 type downsamplerFlushHandler struct {
 	sync.RWMutex
 	storage storage.Storage
diff --git a/src/cmd/services/m3coordinator/downsample/downsample_id.go b/src/cmd/services/m3coordinator/downsample/ids.go
similarity index 100%
rename from src/cmd/services/m3coordinator/downsample/downsample_id.go
rename to src/cmd/services/m3coordinator/downsample/ids.go
diff --git a/src/coordinator/api/v1/handler/prometheus/remote/write.go b/src/coordinator/api/v1/handler/prometheus/remote/write.go
index e6df5a43e1..cdc5c5f613 100644
--- a/src/coordinator/api/v1/handler/prometheus/remote/write.go
+++ b/src/coordinator/api/v1/handler/prometheus/remote/write.go
@@ -164,7 +164,9 @@ func (h *PromWriteHandler) write(ctx context.Context, r *prompb.WriteRequest) er
 	requests := make([]execution.Request, 0, len(r.Timeseries))
 	for _, t := range r.Timeseries {
 		write := storage.PromWriteTSToM3(t)
-		write.Attributes.MetricsType = storage.UnaggregatedMetricsType
+		write.Attributes = storage.Attributes{
+			MetricsType: storage.UnaggregatedMetricsType,
+		}
 		request := newLocalWriteRequest(write, h.store)
 		requests = append(requests, request)
 	}
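
For completeness, a sketch of the end-to-end effect of this plumbing: the
Prometheus remote write handler stamps every incoming write as unaggregated,
and writeRequest.Process from patch 1 routes on that field. A downsampled
write would instead carry values like the hypothetical ones below:

	write.Attributes = storage.Attributes{
		MetricsType: storage.AggregatedMetricsType,
		Retention:   720 * time.Hour, // e.g. a 30-day aggregated namespace
		Resolution:  time.Minute,
	}
	// writeRequest.Process then resolves the matching namespace via
	// AggregatedClusterNamespace before calling session.WriteTagged.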