From 85c233c142108dbf6af00b04e4889972e66d51b4 Mon Sep 17 00:00:00 2001 From: Chao Wang Date: Mon, 8 Oct 2018 14:24:46 -0400 Subject: [PATCH 1/6] Add a storage based ingester to m3coordinator --- glide.lock | 6 +- glide.yaml | 3 + .../development/m3_stack/m3coordinator.yml | 15 ++ .../services/m3coordinator/ingest/config.go | 65 +++++++ .../services/m3coordinator/ingest/ingest.go | 175 ++++++++++++++++++ .../m3coordinator/ingest/ingest_test.go | 126 +++++++++++++ .../m3coordinator/server/m3msg/handler.go | 4 +- .../m3coordinator/server/m3msg/types.go | 4 +- src/cmd/services/m3query/config/config.go | 14 ++ src/query/server/server.go | 15 ++ 10 files changed, 420 insertions(+), 7 deletions(-) create mode 100644 src/cmd/services/m3coordinator/ingest/config.go create mode 100644 src/cmd/services/m3coordinator/ingest/ingest.go create mode 100644 src/cmd/services/m3coordinator/ingest/ingest_test.go diff --git a/glide.lock b/glide.lock index 002dd5f7c9..a0a2e41bbc 100644 --- a/glide.lock +++ b/glide.lock @@ -1,5 +1,5 @@ -hash: c7d6edc7885c08b5c7ab435fc11c874bce9200ee5187e628778a0c7b2e028d30 -updated: 2018-10-08T12:03:37.468013-04:00 +hash: ac83014457f1c39657823b948fa85942f444e3cfaf08717d7d4794b92a612f3a +updated: 2018-10-08T12:41:58.07315-04:00 imports: - name: github.com/apache/thrift version: c2fb1c4e8c931d22617bebb0bf388cb4d5e6fcff @@ -295,7 +295,7 @@ imports: - transformation - x/bytes - name: github.com/m3db/m3msg - version: 4680d9b45286826f87b134a4559b11d795786eaf + version: 4851e2719e06b15f1fc247e1d00339192963990e subpackages: - consumer - generated/proto/msgpb diff --git a/glide.yaml b/glide.yaml index 0d220ca313..6c58756ff6 100644 --- a/glide.yaml +++ b/glide.yaml @@ -31,6 +31,9 @@ import: - package: github.com/m3db/m3ctl version: acc762bfdd42ecb192d34e48fa7ca1fd7ee088ac + - package: github.com/m3db/m3msg + version: 4851e2719e06b15f1fc247e1d00339192963990e + - package: github.com/m3db/bitset version: 07973db6b78acb62ac207d0538055e874b49d90d diff --git a/scripts/development/m3_stack/m3coordinator.yml b/scripts/development/m3_stack/m3coordinator.yml index f24643cdf3..bc79f8b180 100644 --- a/scripts/development/m3_stack/m3coordinator.yml +++ b/scripts/development/m3_stack/m3coordinator.yml @@ -45,3 +45,18 @@ clusters: jitter: true backgroundHealthCheckFailLimit: 4 backgroundHealthCheckFailThrottleFactor: 0.5 + +ingest: + ingester: + workerPoolSize: 100 + opPool: + size: 100 + retry: + maxRetries: 3 + jitter: true + m3msg: + server: + listenAddress: 0.0.0.0:7507 + retry: + maxBackoff: 10s + jitter: true diff --git a/src/cmd/services/m3coordinator/ingest/config.go b/src/cmd/services/m3coordinator/ingest/config.go new file mode 100644 index 0000000000..fd974d2bb6 --- /dev/null +++ b/src/cmd/services/m3coordinator/ingest/config.go @@ -0,0 +1,65 @@ +package ingest + +import ( + "github.com/m3db/m3/src/query/storage" + "github.com/m3db/m3/src/x/serialize" + "github.com/m3db/m3x/instrument" + "github.com/m3db/m3x/pool" + "github.com/m3db/m3x/retry" + xsync "github.com/m3db/m3x/sync" +) + +// Configuration configs the ingester. +type Configuration struct { + WorkerPoolSize int `yaml:"workerPoolSize"` + OpPool pool.ObjectPoolConfiguration `yaml:"opPool"` + Retry retry.Configuration `yaml:"retry"` +} + +// NewIngester creates an ingester with an appender. +func (cfg Configuration) NewIngester( + appender storage.Appender, + instrumentOptions instrument.Options, +) (*Ingester, error) { + opts, err := cfg.newOptions(appender, instrumentOptions) + if err != nil { + return nil, err + } + return NewIngester(opts), nil +} + +func (cfg Configuration) newOptions( + appender storage.Appender, + instrumentOptions instrument.Options, +) (*Options, error) { + scope := instrumentOptions.MetricsScope().Tagged( + map[string]string{"component": "ingester"}, + ) + workers, err := xsync.NewPooledWorkerPool( + cfg.WorkerPoolSize, + xsync.NewPooledWorkerPoolOptions(). + SetInstrumentOptions(instrumentOptions), + ) + if err != nil { + return nil, err + } + + workers.Init() + tagDecoderPool := serialize.NewTagDecoderPool( + serialize.NewTagDecoderOptions(), + pool.NewObjectPoolOptions(). + SetInstrumentOptions(instrumentOptions. + SetMetricsScope(instrumentOptions.MetricsScope(). + SubScope("tag-decoder-pool"))), + ) + tagDecoderPool.Init() + opts := Options{ + Appender: appender, + Workers: workers, + PoolOptions: cfg.OpPool.NewObjectPoolOptions(instrumentOptions), + TagDecoderPool: tagDecoderPool, + RetryOptions: cfg.Retry.NewOptions(scope), + InstrumentOptions: instrumentOptions, + } + return &opts, nil +} diff --git a/src/cmd/services/m3coordinator/ingest/ingest.go b/src/cmd/services/m3coordinator/ingest/ingest.go new file mode 100644 index 0000000000..f9b0d30ab2 --- /dev/null +++ b/src/cmd/services/m3coordinator/ingest/ingest.go @@ -0,0 +1,175 @@ +package ingest + +import ( + "context" + "time" + + "github.com/m3db/m3/src/cmd/services/m3coordinator/server/m3msg" + "github.com/m3db/m3/src/query/models" + "github.com/m3db/m3/src/query/storage" + "github.com/m3db/m3/src/query/ts" + "github.com/m3db/m3/src/x/serialize" + "github.com/m3db/m3metrics/metric/id" + "github.com/m3db/m3metrics/policy" + xerrors "github.com/m3db/m3x/errors" + "github.com/m3db/m3x/instrument" + "github.com/m3db/m3x/pool" + "github.com/m3db/m3x/retry" + xsync "github.com/m3db/m3x/sync" + + "github.com/uber-go/tally" +) + +// Options configures the ingester. +type Options struct { + Appender storage.Appender + Workers xsync.PooledWorkerPool + PoolOptions pool.ObjectPoolOptions + TagDecoderPool serialize.TagDecoderPool + RetryOptions retry.Options + InstrumentOptions instrument.Options +} + +type ingestMetrics struct { + ingestError tally.Counter + ingestSuccess tally.Counter +} + +func newIngestMetrics(scope tally.Scope) ingestMetrics { + return ingestMetrics{ + ingestError: scope.Counter("ingest-error"), + ingestSuccess: scope.Counter("ingest-success"), + } +} + +// Ingester ingests metrics with a worker pool. +type Ingester struct { + workers xsync.PooledWorkerPool + p pool.ObjectPool +} + +// NewIngester creates an ingester. +func NewIngester( + opts *Options, +) *Ingester { + retrier := retry.NewRetrier(opts.RetryOptions) + m := newIngestMetrics(opts.InstrumentOptions.MetricsScope()) + p := pool.NewObjectPool(opts.PoolOptions) + p.Init( + func() interface{} { + // NB: we don't need a pool for the tag decoder since the ops are + // pooled, but currently this is the only way to get tag decoder. + tagDecoder := opts.TagDecoderPool.Get() + op := ingestOp{ + s: opts.Appender, + r: retrier, + it: serialize.NewMetricTagsIterator(tagDecoder, nil), + p: p, + m: m, + } + op.attemptFn = op.attempt + op.ingestFn = op.ingest + return &op + }, + ) + return &Ingester{ + workers: opts.Workers, + p: p, + } +} + +// Ingest ingests a metric asynchronously with callback. +func (i *Ingester) Ingest( + id []byte, + metricTimeNanos int64, + value float64, + sp policy.StoragePolicy, + callback *m3msg.RefCountedCallback, +) { + op := i.p.Get().(*ingestOp) + op.id = id + op.metricTimeNanos = metricTimeNanos + op.value = value + op.sp = sp + op.callback = callback + i.workers.Go(op.ingestFn) +} + +type ingestOp struct { + s storage.Appender + r retry.Retrier + it id.SortedTagIterator + p pool.ObjectPool + m ingestMetrics + attemptFn retry.Fn + ingestFn func() + + id []byte + metricTimeNanos int64 + value float64 + sp policy.StoragePolicy + callback *m3msg.RefCountedCallback + q storage.WriteQuery +} + +func (op *ingestOp) ingest() { + if err := op.resetWriteQuery(); err != nil { + op.m.ingestError.Inc(1) + op.callback.Callback(m3msg.OnRetriableError) + op.p.Put(op) + return + } + if err := op.r.Attempt(op.attemptFn); err != nil { + if xerrors.IsNonRetryableError(err) { + op.callback.Callback(m3msg.OnNonRetriableError) + } else { + op.callback.Callback(m3msg.OnRetriableError) + } + op.m.ingestError.Inc(1) + op.p.Put(op) + return + } + op.m.ingestSuccess.Inc(1) + op.callback.Callback(m3msg.OnSuccess) + op.p.Put(op) +} + +func (op *ingestOp) attempt() error { + return op.s.Write( + // NB: No timeout is needed for this as the m3db client has a timeout + // configured to it. + context.Background(), + &op.q, + ) +} + +func (op *ingestOp) resetWriteQuery() error { + if err := op.resetTags(); err != nil { + return err + } + op.resetDataPoints() + op.q.Raw = string(op.id) + op.q.Unit = op.sp.Resolution().Precision + op.q.Attributes.MetricsType = storage.AggregatedMetricsType + op.q.Attributes.Resolution = op.sp.Resolution().Window + op.q.Attributes.Retention = op.sp.Retention().Duration() + return nil +} + +func (op *ingestOp) resetTags() error { + op.it.Reset(op.id) + op.q.Tags = op.q.Tags[:0] + for op.it.Next() { + name, value := op.it.Current() + op.q.Tags = append(op.q.Tags, models.Tag{Name: name, Value: value}) + } + return op.it.Err() +} + +func (op *ingestOp) resetDataPoints() { + if len(op.q.Datapoints) != 1 { + op.q.Datapoints = make(ts.Datapoints, 1) + } + op.q.Datapoints[0].Timestamp = time.Unix(0, op.metricTimeNanos) + op.q.Datapoints[0].Value = op.value +} diff --git a/src/cmd/services/m3coordinator/ingest/ingest_test.go b/src/cmd/services/m3coordinator/ingest/ingest_test.go new file mode 100644 index 0000000000..813d64ff1b --- /dev/null +++ b/src/cmd/services/m3coordinator/ingest/ingest_test.go @@ -0,0 +1,126 @@ +package ingest + +import ( + "context" + "sync" + "testing" + "time" + + "github.com/m3db/m3/src/cmd/services/m3coordinator/server/m3msg" + "github.com/m3db/m3/src/query/models" + "github.com/m3db/m3/src/query/storage" + "github.com/m3db/m3/src/query/ts" + "github.com/m3db/m3/src/x/serialize" + "github.com/m3db/m3metrics/policy" + "github.com/m3db/m3msg/consumer" + "github.com/m3db/m3x/ident" + "github.com/m3db/m3x/instrument" + "github.com/m3db/m3x/pool" + + "github.com/golang/mock/gomock" + "github.com/stretchr/testify/require" +) + +func TestIngest(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + cfg := Configuration{ + WorkerPoolSize: 2, + OpPool: pool.ObjectPoolConfiguration{ + Size: 1, + }, + } + appender := &mockAppender{} + ingester, err := cfg.NewIngester(appender, instrument.NewOptions()) + require.NoError(t, err) + + id := newTestID(t, "__name__", "foo", "app", "bar") + timestamp := int64(1234) + val := float64(1) + sp := policy.MustParseStoragePolicy("1m:40d") + m := consumer.NewMockMessage(ctrl) + callback := m3msg.NewRefCountedCallback(m) + callback.IncRef() + + m.EXPECT().Ack() + ingester.Ingest(id, timestamp, val, sp, callback) + + for appender.cnt() != 1 { + time.Sleep(100 * time.Millisecond) + } + + require.Equal(t, + storage.WriteQuery{ + Annotation: nil, + Attributes: storage.Attributes{ + MetricsType: storage.AggregatedMetricsType, + Resolution: time.Minute, + Retention: 40 * 24 * time.Hour, + }, + Datapoints: ts.Datapoints{ + ts.Datapoint{ + Timestamp: time.Unix(0, timestamp), + Value: val, + }, + }, + Raw: string(id), + Tags: models.Tags{ + models.Tag{ + Name: []byte("__name__"), + Value: []byte("foo"), + }, + models.Tag{ + Name: []byte("app"), + Value: []byte("bar"), + }, + }, + Unit: sp.Resolution().Precision, + }, + *appender.received[0], + ) + + // Make sure the op is put back to pool. + op := ingester.p.Get().(*ingestOp) + require.Equal(t, id, op.id) +} + +type mockAppender struct { + sync.RWMutex + + expectErr error + received []*storage.WriteQuery +} + +func (m *mockAppender) Write(ctx context.Context, query *storage.WriteQuery) error { + m.Lock() + defer m.Unlock() + + if m.expectErr != nil { + return m.expectErr + } + m.received = append(m.received, query) + return nil +} + +func (m *mockAppender) cnt() int { + m.Lock() + defer m.Unlock() + + return len(m.received) +} + +func newTestID(t *testing.T, tags ...string) []byte { + tagEncoderPool := serialize.NewTagEncoderPool(serialize.NewTagEncoderOptions(), + pool.NewObjectPoolOptions().SetSize(1)) + tagEncoderPool.Init() + + tagsIter := ident.MustNewTagStringsIterator(tags...) + tagEncoder := tagEncoderPool.Get() + err := tagEncoder.Encode(tagsIter) + require.NoError(t, err) + + data, ok := tagEncoder.Data() + require.True(t, ok) + return data.Bytes() +} diff --git a/src/cmd/services/m3coordinator/server/m3msg/handler.go b/src/cmd/services/m3coordinator/server/m3msg/handler.go index 8835f849d4..3795245468 100644 --- a/src/cmd/services/m3coordinator/server/m3msg/handler.go +++ b/src/cmd/services/m3coordinator/server/m3msg/handler.go @@ -126,7 +126,7 @@ func (h *perConsumerHandler) processMessage( msg consumer.Message, ) { r := NewRefCountedCallback(msg) - r.incRef() + r.IncRef() // Decode the bytes in the message. h.r.Reset(msg.Bytes()) @@ -144,7 +144,7 @@ func (h *perConsumerHandler) processMessage( // TODO: Consider incrementing a wait group for each write and wait on // shut down to reduce the number of messages being retried by m3msg. - r.incRef() + r.IncRef() h.writeFn(m.ID, m.TimeNanos, m.Value, sp, r) } r.decRef() diff --git a/src/cmd/services/m3coordinator/server/m3msg/types.go b/src/cmd/services/m3coordinator/server/m3msg/types.go index a89db299f2..0be42164a7 100644 --- a/src/cmd/services/m3coordinator/server/m3msg/types.go +++ b/src/cmd/services/m3coordinator/server/m3msg/types.go @@ -64,8 +64,8 @@ func NewRefCountedCallback(msg consumer.Message) *RefCountedCallback { } } -// incRef increments the ref count. -func (r *RefCountedCallback) incRef() { +// IncRef increments the ref count. +func (r *RefCountedCallback) IncRef() { atomic.AddInt32(&r.ref, 1) } diff --git a/src/cmd/services/m3query/config/config.go b/src/cmd/services/m3query/config/config.go index b6a376c51d..facaa0c385 100644 --- a/src/cmd/services/m3query/config/config.go +++ b/src/cmd/services/m3query/config/config.go @@ -23,6 +23,8 @@ package config import ( "time" + "github.com/m3db/m3/src/cmd/services/m3coordinator/ingest" + "github.com/m3db/m3/src/cmd/services/m3coordinator/server/m3msg" "github.com/m3db/m3/src/query/storage/m3" etcdclient "github.com/m3db/m3cluster/client/etcd" xconfig "github.com/m3db/m3x/config" @@ -71,6 +73,18 @@ type Configuration struct { // WriteWorkerPool is the worker pool policy for write requests. WriteWorkerPool xconfig.WorkerPoolPolicy `yaml:"writeWorkerPoolPolicy"` + + // Ingest is the ingest server. + Ingest *IngestConfiguration `yaml:"ingest"` +} + +// IngestConfiguration is the configuration for ingestion server. +type IngestConfiguration struct { + // Ingester is the configuration for storage based ingester. + Ingester ingest.Configuration `yaml:"ingester"` + + // M3msg is the configuration for m3msg server. + M3msg m3msg.Configuration `yaml:"m3msg"` } // LocalConfiguration is the local embedded configuration if running diff --git a/src/query/server/server.go b/src/query/server/server.go index 63b805c2a1..bae05ad01f 100644 --- a/src/query/server/server.go +++ b/src/query/server/server.go @@ -213,6 +213,21 @@ func Run(runOpts RunOptions) { } }() + if cfg.Ingest != nil { + ingester, err := cfg.Ingest.Ingester.NewIngester(backendStorage, instrumentOptions) + if err != nil { + logger.Fatal("unable to create ingester", zap.Error(err)) + } + server, err := cfg.Ingest.M3msg.NewServer(ingester.Ingest, instrumentOptions) + if err != nil { + logger.Fatal("unable to create m3msg server", zap.Error(err)) + } + if err := server.ListenAndServe(); err != nil { + logger.Fatal("unable to listen on ingest server", zap.Error(err)) + } + defer server.Close() + } + var interruptCh <-chan error = make(chan error) if runOpts.InterruptCh != nil { interruptCh = runOpts.InterruptCh From 4fc8a72234452756ac435d15d0dc6355e146dab8 Mon Sep 17 00:00:00 2001 From: Chao Wang Date: Mon, 8 Oct 2018 15:57:49 -0400 Subject: [PATCH 2/6] add license --- .../services/m3coordinator/ingest/config.go | 20 +++++++++++++++++++ .../services/m3coordinator/ingest/ingest.go | 20 +++++++++++++++++++ .../m3coordinator/ingest/ingest_test.go | 20 +++++++++++++++++++ 3 files changed, 60 insertions(+) diff --git a/src/cmd/services/m3coordinator/ingest/config.go b/src/cmd/services/m3coordinator/ingest/config.go index fd974d2bb6..e5c6ccce27 100644 --- a/src/cmd/services/m3coordinator/ingest/config.go +++ b/src/cmd/services/m3coordinator/ingest/config.go @@ -1,3 +1,23 @@ +// Copyright (c) 2018 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + package ingest import ( diff --git a/src/cmd/services/m3coordinator/ingest/ingest.go b/src/cmd/services/m3coordinator/ingest/ingest.go index f9b0d30ab2..9803ef0ea6 100644 --- a/src/cmd/services/m3coordinator/ingest/ingest.go +++ b/src/cmd/services/m3coordinator/ingest/ingest.go @@ -1,3 +1,23 @@ +// Copyright (c) 2018 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + package ingest import ( diff --git a/src/cmd/services/m3coordinator/ingest/ingest_test.go b/src/cmd/services/m3coordinator/ingest/ingest_test.go index 813d64ff1b..4c0804028c 100644 --- a/src/cmd/services/m3coordinator/ingest/ingest_test.go +++ b/src/cmd/services/m3coordinator/ingest/ingest_test.go @@ -1,3 +1,23 @@ +// Copyright (c) 2018 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + package ingest import ( From 037060233be1e7db8ebc3edd49c7b9bf769916c4 Mon Sep 17 00:00:00 2001 From: Chao Wang Date: Mon, 8 Oct 2018 17:10:58 -0400 Subject: [PATCH 3/6] Address comments --- .../m3coordinator/downsample/flush_handler.go | 3 +- .../services/m3coordinator/ingest/config.go | 2 +- .../services/m3coordinator/ingest/ingest.go | 37 ++++++++++--------- .../m3coordinator/ingest/ingest_test.go | 9 +++-- .../m3coordinator/server/m3msg/handler.go | 3 +- .../server/m3msg/handler_test.go | 5 ++- .../m3coordinator/server/m3msg/types.go | 3 +- src/cmd/services/m3query/config/config.go | 4 +- src/query/server/server.go | 2 +- 9 files changed, 37 insertions(+), 31 deletions(-) diff --git a/src/cmd/services/m3coordinator/downsample/flush_handler.go b/src/cmd/services/m3coordinator/downsample/flush_handler.go index 27f2a7cd5d..3d48e28c9e 100644 --- a/src/cmd/services/m3coordinator/downsample/flush_handler.go +++ b/src/cmd/services/m3coordinator/downsample/flush_handler.go @@ -34,6 +34,7 @@ import ( "github.com/m3db/m3metrics/metric/aggregated" "github.com/m3db/m3x/instrument" xsync "github.com/m3db/m3x/sync" + xtime "github.com/m3db/m3x/time" "github.com/uber-go/tally" ) @@ -142,7 +143,7 @@ func (w *downsamplerFlushHandlerWriter) Write( Timestamp: time.Unix(0, mp.TimeNanos), Value: mp.Value, }}, - Unit: mp.StoragePolicy.Resolution().Precision, + Unit: xtime.Millisecond, Attributes: storage.Attributes{ MetricsType: storage.AggregatedMetricsType, Retention: mp.StoragePolicy.Retention().Duration(), diff --git a/src/cmd/services/m3coordinator/ingest/config.go b/src/cmd/services/m3coordinator/ingest/config.go index e5c6ccce27..b1e6e4650f 100644 --- a/src/cmd/services/m3coordinator/ingest/config.go +++ b/src/cmd/services/m3coordinator/ingest/config.go @@ -45,7 +45,7 @@ func (cfg Configuration) NewIngester( if err != nil { return nil, err } - return NewIngester(opts), nil + return NewIngester(*opts), nil } func (cfg Configuration) newOptions( diff --git a/src/cmd/services/m3coordinator/ingest/ingest.go b/src/cmd/services/m3coordinator/ingest/ingest.go index 9803ef0ea6..31b0a8e80a 100644 --- a/src/cmd/services/m3coordinator/ingest/ingest.go +++ b/src/cmd/services/m3coordinator/ingest/ingest.go @@ -36,6 +36,7 @@ import ( "github.com/m3db/m3x/pool" "github.com/m3db/m3x/retry" xsync "github.com/m3db/m3x/sync" + xtime "github.com/m3db/m3x/time" "github.com/uber-go/tally" ) @@ -70,7 +71,7 @@ type Ingester struct { // NewIngester creates an ingester. func NewIngester( - opts *Options, + opts Options, ) *Ingester { retrier := retry.NewRetrier(opts.RetryOptions) m := newIngestMetrics(opts.InstrumentOptions.MetricsScope()) @@ -86,6 +87,7 @@ func NewIngester( it: serialize.NewMetricTagsIterator(tagDecoder, nil), p: p, m: m, + c: context.TODO(), } op.attemptFn = op.attempt op.ingestFn = op.ingest @@ -101,14 +103,14 @@ func NewIngester( // Ingest ingests a metric asynchronously with callback. func (i *Ingester) Ingest( id []byte, - metricTimeNanos int64, + metricTime time.Time, value float64, sp policy.StoragePolicy, callback *m3msg.RefCountedCallback, ) { op := i.p.Get().(*ingestOp) op.id = id - op.metricTimeNanos = metricTimeNanos + op.metricTime = metricTime op.value = value op.sp = sp op.callback = callback @@ -121,15 +123,16 @@ type ingestOp struct { it id.SortedTagIterator p pool.ObjectPool m ingestMetrics + c context.Context attemptFn retry.Fn ingestFn func() - id []byte - metricTimeNanos int64 - value float64 - sp policy.StoragePolicy - callback *m3msg.RefCountedCallback - q storage.WriteQuery + id []byte + metricTime time.Time + value float64 + sp policy.StoragePolicy + callback *m3msg.RefCountedCallback + q storage.WriteQuery } func (op *ingestOp) ingest() { @@ -155,12 +158,7 @@ func (op *ingestOp) ingest() { } func (op *ingestOp) attempt() error { - return op.s.Write( - // NB: No timeout is needed for this as the m3db client has a timeout - // configured to it. - context.Background(), - &op.q, - ) + return op.s.Write(op.c, &op.q) } func (op *ingestOp) resetWriteQuery() error { @@ -169,7 +167,7 @@ func (op *ingestOp) resetWriteQuery() error { } op.resetDataPoints() op.q.Raw = string(op.id) - op.q.Unit = op.sp.Resolution().Precision + op.q.Unit = xtime.Millisecond op.q.Attributes.MetricsType = storage.AggregatedMetricsType op.q.Attributes.Resolution = op.sp.Resolution().Window op.q.Attributes.Retention = op.sp.Retention().Duration() @@ -181,7 +179,10 @@ func (op *ingestOp) resetTags() error { op.q.Tags = op.q.Tags[:0] for op.it.Next() { name, value := op.it.Current() - op.q.Tags = append(op.q.Tags, models.Tag{Name: name, Value: value}) + op.q.Tags = append(op.q.Tags, models.Tag{ + Name: append([]byte(nil), name...), + Value: append([]byte(nil), value...), + }) } return op.it.Err() } @@ -190,6 +191,6 @@ func (op *ingestOp) resetDataPoints() { if len(op.q.Datapoints) != 1 { op.q.Datapoints = make(ts.Datapoints, 1) } - op.q.Datapoints[0].Timestamp = time.Unix(0, op.metricTimeNanos) + op.q.Datapoints[0].Timestamp = op.metricTime op.q.Datapoints[0].Value = op.value } diff --git a/src/cmd/services/m3coordinator/ingest/ingest_test.go b/src/cmd/services/m3coordinator/ingest/ingest_test.go index 4c0804028c..ec0df67150 100644 --- a/src/cmd/services/m3coordinator/ingest/ingest_test.go +++ b/src/cmd/services/m3coordinator/ingest/ingest_test.go @@ -36,6 +36,7 @@ import ( "github.com/m3db/m3x/ident" "github.com/m3db/m3x/instrument" "github.com/m3db/m3x/pool" + xtime "github.com/m3db/m3x/time" "github.com/golang/mock/gomock" "github.com/stretchr/testify/require" @@ -56,7 +57,7 @@ func TestIngest(t *testing.T) { require.NoError(t, err) id := newTestID(t, "__name__", "foo", "app", "bar") - timestamp := int64(1234) + metricTime := time.Unix(0, 1234) val := float64(1) sp := policy.MustParseStoragePolicy("1m:40d") m := consumer.NewMockMessage(ctrl) @@ -64,7 +65,7 @@ func TestIngest(t *testing.T) { callback.IncRef() m.EXPECT().Ack() - ingester.Ingest(id, timestamp, val, sp, callback) + ingester.Ingest(id, metricTime, val, sp, callback) for appender.cnt() != 1 { time.Sleep(100 * time.Millisecond) @@ -80,7 +81,7 @@ func TestIngest(t *testing.T) { }, Datapoints: ts.Datapoints{ ts.Datapoint{ - Timestamp: time.Unix(0, timestamp), + Timestamp: metricTime, Value: val, }, }, @@ -95,7 +96,7 @@ func TestIngest(t *testing.T) { Value: []byte("bar"), }, }, - Unit: sp.Resolution().Precision, + Unit: xtime.Millisecond, }, *appender.received[0], ) diff --git a/src/cmd/services/m3coordinator/server/m3msg/handler.go b/src/cmd/services/m3coordinator/server/m3msg/handler.go index 3795245468..079ecbbc84 100644 --- a/src/cmd/services/m3coordinator/server/m3msg/handler.go +++ b/src/cmd/services/m3coordinator/server/m3msg/handler.go @@ -23,6 +23,7 @@ package m3msg import ( "bytes" "io" + "time" "github.com/m3db/m3metrics/encoding/msgpack" "github.com/m3db/m3msg/consumer" @@ -145,7 +146,7 @@ func (h *perConsumerHandler) processMessage( // TODO: Consider incrementing a wait group for each write and wait on // shut down to reduce the number of messages being retried by m3msg. r.IncRef() - h.writeFn(m.ID, m.TimeNanos, m.Value, sp, r) + h.writeFn(m.ID, time.Unix(0, m.TimeNanos), m.Value, sp, r) } r.decRef() if err := h.it.Err(); err != nil && err != io.EOF { diff --git a/src/cmd/services/m3coordinator/server/m3msg/handler_test.go b/src/cmd/services/m3coordinator/server/m3msg/handler_test.go index 0fd2bc07b4..f6484dd286 100644 --- a/src/cmd/services/m3coordinator/server/m3msg/handler_test.go +++ b/src/cmd/services/m3coordinator/server/m3msg/handler_test.go @@ -24,6 +24,7 @@ import ( "net" "sync" "testing" + "time" "github.com/m3db/m3metrics/encoding/msgpack" "github.com/m3db/m3metrics/metric/aggregated" @@ -100,7 +101,7 @@ type mockWriter struct { func (m *mockWriter) write( name []byte, - metricTime int64, + metricTime time.Time, value float64, sp policy.StoragePolicy, callbackable *RefCountedCallback, @@ -127,7 +128,7 @@ func (m *mockWriter) ingested() int { type payload struct { id string - metricTime int64 + metricTime time.Time value float64 sp policy.StoragePolicy } diff --git a/src/cmd/services/m3coordinator/server/m3msg/types.go b/src/cmd/services/m3coordinator/server/m3msg/types.go index 0be42164a7..2d6cc6ca39 100644 --- a/src/cmd/services/m3coordinator/server/m3msg/types.go +++ b/src/cmd/services/m3coordinator/server/m3msg/types.go @@ -22,6 +22,7 @@ package m3msg import ( "sync/atomic" + "time" "github.com/m3db/m3metrics/policy" "github.com/m3db/m3msg/consumer" @@ -30,7 +31,7 @@ import ( // WriteFn is the function that writes a metric. type WriteFn func( id []byte, - metricTimeNanos int64, + metricTime time.Time, value float64, sp policy.StoragePolicy, callback *RefCountedCallback, diff --git a/src/cmd/services/m3query/config/config.go b/src/cmd/services/m3query/config/config.go index facaa0c385..5f99a0959d 100644 --- a/src/cmd/services/m3query/config/config.go +++ b/src/cmd/services/m3query/config/config.go @@ -83,8 +83,8 @@ type IngestConfiguration struct { // Ingester is the configuration for storage based ingester. Ingester ingest.Configuration `yaml:"ingester"` - // M3msg is the configuration for m3msg server. - M3msg m3msg.Configuration `yaml:"m3msg"` + // M3Msg is the configuration for m3msg server. + M3Msg m3msg.Configuration `yaml:"m3msg"` } // LocalConfiguration is the local embedded configuration if running diff --git a/src/query/server/server.go b/src/query/server/server.go index bae05ad01f..a71f7d1361 100644 --- a/src/query/server/server.go +++ b/src/query/server/server.go @@ -218,7 +218,7 @@ func Run(runOpts RunOptions) { if err != nil { logger.Fatal("unable to create ingester", zap.Error(err)) } - server, err := cfg.Ingest.M3msg.NewServer(ingester.Ingest, instrumentOptions) + server, err := cfg.Ingest.M3Msg.NewServer(ingester.Ingest, instrumentOptions) if err != nil { logger.Fatal("unable to create m3msg server", zap.Error(err)) } From 43f75c05cbbfbb8fdc938ae93dbaab7a7cd8032e Mon Sep 17 00:00:00 2001 From: Chao Wang Date: Mon, 8 Oct 2018 18:58:18 -0400 Subject: [PATCH 4/6] Adderss comments --- .../development/m3_stack/docker-compose.yml | 2 + .../services/m3coordinator/ingest/ingest.go | 10 +-- .../m3coordinator/ingest/ingest_test.go | 5 +- .../m3coordinator/server/m3msg/handler.go | 5 +- .../server/m3msg/handler_test.go | 2 + .../m3coordinator/server/m3msg/types.go | 2 + src/query/server/server.go | 8 ++- src/query/storage/types.go | 3 +- src/x/common/unit.go | 37 +++++++++++ src/x/common/unit_test.go | 62 +++++++++++++++++++ 10 files changed, 123 insertions(+), 13 deletions(-) create mode 100644 src/x/common/unit.go create mode 100644 src/x/common/unit_test.go diff --git a/scripts/development/m3_stack/docker-compose.yml b/scripts/development/m3_stack/docker-compose.yml index 3e6d531f69..3b59e541d1 100644 --- a/scripts/development/m3_stack/docker-compose.yml +++ b/scripts/development/m3_stack/docker-compose.yml @@ -37,9 +37,11 @@ services: expose: - "7201" - "7203" + - "7507" ports: - "0.0.0.0:7201:7201" - "0.0.0.0:7203:7203" + - "0.0.0.0:7507:7507" networks: - backend build: diff --git a/src/cmd/services/m3coordinator/ingest/ingest.go b/src/cmd/services/m3coordinator/ingest/ingest.go index 31b0a8e80a..95a78073ed 100644 --- a/src/cmd/services/m3coordinator/ingest/ingest.go +++ b/src/cmd/services/m3coordinator/ingest/ingest.go @@ -28,6 +28,7 @@ import ( "github.com/m3db/m3/src/query/models" "github.com/m3db/m3/src/query/storage" "github.com/m3db/m3/src/query/ts" + "github.com/m3db/m3/src/x/common" "github.com/m3db/m3/src/x/serialize" "github.com/m3db/m3metrics/metric/id" "github.com/m3db/m3metrics/policy" @@ -36,7 +37,6 @@ import ( "github.com/m3db/m3x/pool" "github.com/m3db/m3x/retry" xsync "github.com/m3db/m3x/sync" - xtime "github.com/m3db/m3x/time" "github.com/uber-go/tally" ) @@ -87,7 +87,6 @@ func NewIngester( it: serialize.NewMetricTagsIterator(tagDecoder, nil), p: p, m: m, - c: context.TODO(), } op.attemptFn = op.attempt op.ingestFn = op.ingest @@ -102,6 +101,7 @@ func NewIngester( // Ingest ingests a metric asynchronously with callback. func (i *Ingester) Ingest( + ctx context.Context, id []byte, metricTime time.Time, value float64, @@ -109,6 +109,7 @@ func (i *Ingester) Ingest( callback *m3msg.RefCountedCallback, ) { op := i.p.Get().(*ingestOp) + op.c = ctx op.id = id op.metricTime = metricTime op.value = value @@ -123,10 +124,10 @@ type ingestOp struct { it id.SortedTagIterator p pool.ObjectPool m ingestMetrics - c context.Context attemptFn retry.Fn ingestFn func() + c context.Context id []byte metricTime time.Time value float64 @@ -166,8 +167,7 @@ func (op *ingestOp) resetWriteQuery() error { return err } op.resetDataPoints() - op.q.Raw = string(op.id) - op.q.Unit = xtime.Millisecond + op.q.Unit = common.SanitizeUnitForM3DB(op.sp.Resolution().Precision) op.q.Attributes.MetricsType = storage.AggregatedMetricsType op.q.Attributes.Resolution = op.sp.Resolution().Window op.q.Attributes.Retention = op.sp.Retention().Duration() diff --git a/src/cmd/services/m3coordinator/ingest/ingest_test.go b/src/cmd/services/m3coordinator/ingest/ingest_test.go index ec0df67150..b37e15b93b 100644 --- a/src/cmd/services/m3coordinator/ingest/ingest_test.go +++ b/src/cmd/services/m3coordinator/ingest/ingest_test.go @@ -65,7 +65,7 @@ func TestIngest(t *testing.T) { callback.IncRef() m.EXPECT().Ack() - ingester.Ingest(id, metricTime, val, sp, callback) + ingester.Ingest(context.TODO(), id, metricTime, val, sp, callback) for appender.cnt() != 1 { time.Sleep(100 * time.Millisecond) @@ -85,7 +85,6 @@ func TestIngest(t *testing.T) { Value: val, }, }, - Raw: string(id), Tags: models.Tags{ models.Tag{ Name: []byte("__name__"), @@ -96,7 +95,7 @@ func TestIngest(t *testing.T) { Value: []byte("bar"), }, }, - Unit: xtime.Millisecond, + Unit: xtime.Second, }, *appender.received[0], ) diff --git a/src/cmd/services/m3coordinator/server/m3msg/handler.go b/src/cmd/services/m3coordinator/server/m3msg/handler.go index 079ecbbc84..bec49cbbe4 100644 --- a/src/cmd/services/m3coordinator/server/m3msg/handler.go +++ b/src/cmd/services/m3coordinator/server/m3msg/handler.go @@ -22,6 +22,7 @@ package m3msg import ( "bytes" + "context" "io" "time" @@ -33,6 +34,8 @@ import ( "github.com/uber-go/tally" ) +var ctx = context.TODO() + // Options for the ingest handler. type Options struct { InstrumentOptions instrument.Options @@ -146,7 +149,7 @@ func (h *perConsumerHandler) processMessage( // TODO: Consider incrementing a wait group for each write and wait on // shut down to reduce the number of messages being retried by m3msg. r.IncRef() - h.writeFn(m.ID, time.Unix(0, m.TimeNanos), m.Value, sp, r) + h.writeFn(ctx, m.ID, time.Unix(0, m.TimeNanos), m.Value, sp, r) } r.decRef() if err := h.it.Err(); err != nil && err != io.EOF { diff --git a/src/cmd/services/m3coordinator/server/m3msg/handler_test.go b/src/cmd/services/m3coordinator/server/m3msg/handler_test.go index f6484dd286..8d2eb6434e 100644 --- a/src/cmd/services/m3coordinator/server/m3msg/handler_test.go +++ b/src/cmd/services/m3coordinator/server/m3msg/handler_test.go @@ -21,6 +21,7 @@ package m3msg import ( + "context" "net" "sync" "testing" @@ -100,6 +101,7 @@ type mockWriter struct { } func (m *mockWriter) write( + ctx context.Context, name []byte, metricTime time.Time, value float64, diff --git a/src/cmd/services/m3coordinator/server/m3msg/types.go b/src/cmd/services/m3coordinator/server/m3msg/types.go index 2d6cc6ca39..d0166ed8c6 100644 --- a/src/cmd/services/m3coordinator/server/m3msg/types.go +++ b/src/cmd/services/m3coordinator/server/m3msg/types.go @@ -21,6 +21,7 @@ package m3msg import ( + "context" "sync/atomic" "time" @@ -30,6 +31,7 @@ import ( // WriteFn is the function that writes a metric. type WriteFn func( + ctx context.Context, id []byte, metricTime time.Time, value float64, diff --git a/src/query/server/server.go b/src/query/server/server.go index a71f7d1361..9fd28a92af 100644 --- a/src/query/server/server.go +++ b/src/query/server/server.go @@ -214,11 +214,15 @@ func Run(runOpts RunOptions) { }() if cfg.Ingest != nil { - ingester, err := cfg.Ingest.Ingester.NewIngester(backendStorage, instrumentOptions) + subScope := scope.SubScope("ingest") + ingester, err := cfg.Ingest.Ingester.NewIngester(backendStorage, instrumentOptions.SetMetricsScope(subScope)) if err != nil { logger.Fatal("unable to create ingester", zap.Error(err)) } - server, err := cfg.Ingest.M3Msg.NewServer(ingester.Ingest, instrumentOptions) + server, err := cfg.Ingest.M3Msg.NewServer( + ingester.Ingest, + instrumentOptions.SetMetricsScope(subScope), + ) if err != nil { logger.Fatal("unable to create m3msg server", zap.Error(err)) } diff --git a/src/query/storage/types.go b/src/query/storage/types.go index f1de6ca50e..38b8fad590 100644 --- a/src/query/storage/types.go +++ b/src/query/storage/types.go @@ -108,7 +108,6 @@ type Querier interface { // WriteQuery represents the input timeseries that is written to M3DB type WriteQuery struct { - Raw string Tags models.Tags Datapoints ts.Datapoints Unit xtime.Unit @@ -117,7 +116,7 @@ type WriteQuery struct { } func (q *WriteQuery) String() string { - return q.Raw + return q.Tags.ID() } // Appender provides batched appends against a storage. diff --git a/src/x/common/unit.go b/src/x/common/unit.go new file mode 100644 index 0000000000..0c63ec6070 --- /dev/null +++ b/src/x/common/unit.go @@ -0,0 +1,37 @@ +// Copyright (c) 2018 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package common + +import xtime "github.com/m3db/m3x/time" + +// SanitizeUnitForM3DB sanitizes a time unit into one that is suitable for m3db. +// The is done in a explicit way for better performance and the logic is +// ensured though unit test. +func SanitizeUnitForM3DB(unit xtime.Unit) xtime.Unit { + switch unit { + case xtime.Second, xtime.Minute, xtime.Hour, xtime.Day, xtime.Year: + return xtime.Second + case xtime.Nanosecond, xtime.Millisecond, xtime.Microsecond: + return unit + default: + return xtime.Nanosecond + } +} diff --git a/src/x/common/unit_test.go b/src/x/common/unit_test.go new file mode 100644 index 0000000000..b50fee3e82 --- /dev/null +++ b/src/x/common/unit_test.go @@ -0,0 +1,62 @@ +// Copyright (c) 2018 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package common + +import ( + "testing" + "time" + + xtime "github.com/m3db/m3x/time" + + "github.com/stretchr/testify/require" +) + +func TestSanitizeUnitForM3DB(t *testing.T) { + for u := byte(0); u <= byte(xtime.Year); u++ { + unit := xtime.Unit(u) + if unit == xtime.None { + require.Equal(t, xtime.Nanosecond, SanitizeUnitForM3DB(unit)) + continue + } + + dur, err := unit.Value() + require.NoError(t, err) + + switch { + case time.Duration(dur)%time.Second == 0: + require.Equal(t, xtime.Second, SanitizeUnitForM3DB(unit)) + unit = xtime.Second + case time.Duration(dur)%time.Millisecond == 0: + require.Equal(t, xtime.Millisecond, SanitizeUnitForM3DB(unit)) + case time.Duration(dur)%time.Microsecond == 0: + require.Equal(t, xtime.Microsecond, SanitizeUnitForM3DB(unit)) + default: + require.Equal(t, xtime.Nanosecond, SanitizeUnitForM3DB(unit)) + } + } +} + +func TestMakeSureAllUnitsAreChecked(t *testing.T) { + // NB: This test is to make sure whenever new unit type was added to m3x + // we can catch it and add to the sanitize function. + unit := xtime.Unit(byte(xtime.Year) + 1) + require.False(t, unit.IsValid()) +} From 9cef1e5ec06482443d840c95752b8fa9bbbd4ed0 Mon Sep 17 00:00:00 2001 From: Chao Wang Date: Mon, 8 Oct 2018 20:05:58 -0400 Subject: [PATCH 5/6] Address comments --- .../m3coordinator/downsample/flush_handler.go | 4 ++-- src/cmd/services/m3coordinator/ingest/config.go | 11 +++++------ src/cmd/services/m3coordinator/ingest/ingest.go | 4 ++-- .../services/m3coordinator/server/m3msg/handler.go | 6 +++--- src/query/server/server.go | 5 ++--- src/x/{common => convert}/unit.go | 6 +++--- src/x/{common => convert}/unit_test.go | 14 +++++++------- 7 files changed, 24 insertions(+), 26 deletions(-) rename src/x/{common => convert}/unit.go (90%) rename src/x/{common => convert}/unit_test.go (82%) diff --git a/src/cmd/services/m3coordinator/downsample/flush_handler.go b/src/cmd/services/m3coordinator/downsample/flush_handler.go index 3d48e28c9e..f3baf349f1 100644 --- a/src/cmd/services/m3coordinator/downsample/flush_handler.go +++ b/src/cmd/services/m3coordinator/downsample/flush_handler.go @@ -28,13 +28,13 @@ import ( "github.com/m3db/m3/src/query/models" "github.com/m3db/m3/src/query/storage" "github.com/m3db/m3/src/query/ts" + "github.com/m3db/m3/src/x/convert" "github.com/m3db/m3/src/x/serialize" "github.com/m3db/m3aggregator/aggregator/handler" "github.com/m3db/m3aggregator/aggregator/handler/writer" "github.com/m3db/m3metrics/metric/aggregated" "github.com/m3db/m3x/instrument" xsync "github.com/m3db/m3x/sync" - xtime "github.com/m3db/m3x/time" "github.com/uber-go/tally" ) @@ -143,7 +143,7 @@ func (w *downsamplerFlushHandlerWriter) Write( Timestamp: time.Unix(0, mp.TimeNanos), Value: mp.Value, }}, - Unit: xtime.Millisecond, + Unit: convert.UnitForM3DB(mp.StoragePolicy.Resolution().Precision), Attributes: storage.Attributes{ MetricsType: storage.AggregatedMetricsType, Retention: mp.StoragePolicy.Retention().Duration(), diff --git a/src/cmd/services/m3coordinator/ingest/config.go b/src/cmd/services/m3coordinator/ingest/config.go index b1e6e4650f..91fee46759 100644 --- a/src/cmd/services/m3coordinator/ingest/config.go +++ b/src/cmd/services/m3coordinator/ingest/config.go @@ -45,13 +45,13 @@ func (cfg Configuration) NewIngester( if err != nil { return nil, err } - return NewIngester(*opts), nil + return NewIngester(opts), nil } func (cfg Configuration) newOptions( appender storage.Appender, instrumentOptions instrument.Options, -) (*Options, error) { +) (Options, error) { scope := instrumentOptions.MetricsScope().Tagged( map[string]string{"component": "ingester"}, ) @@ -61,7 +61,7 @@ func (cfg Configuration) newOptions( SetInstrumentOptions(instrumentOptions), ) if err != nil { - return nil, err + return Options{}, err } workers.Init() @@ -73,13 +73,12 @@ func (cfg Configuration) newOptions( SubScope("tag-decoder-pool"))), ) tagDecoderPool.Init() - opts := Options{ + return Options{ Appender: appender, Workers: workers, PoolOptions: cfg.OpPool.NewObjectPoolOptions(instrumentOptions), TagDecoderPool: tagDecoderPool, RetryOptions: cfg.Retry.NewOptions(scope), InstrumentOptions: instrumentOptions, - } - return &opts, nil + }, nil } diff --git a/src/cmd/services/m3coordinator/ingest/ingest.go b/src/cmd/services/m3coordinator/ingest/ingest.go index 95a78073ed..80c8cee185 100644 --- a/src/cmd/services/m3coordinator/ingest/ingest.go +++ b/src/cmd/services/m3coordinator/ingest/ingest.go @@ -28,7 +28,7 @@ import ( "github.com/m3db/m3/src/query/models" "github.com/m3db/m3/src/query/storage" "github.com/m3db/m3/src/query/ts" - "github.com/m3db/m3/src/x/common" + "github.com/m3db/m3/src/x/convert" "github.com/m3db/m3/src/x/serialize" "github.com/m3db/m3metrics/metric/id" "github.com/m3db/m3metrics/policy" @@ -167,7 +167,7 @@ func (op *ingestOp) resetWriteQuery() error { return err } op.resetDataPoints() - op.q.Unit = common.SanitizeUnitForM3DB(op.sp.Resolution().Precision) + op.q.Unit = convert.UnitForM3DB(op.sp.Resolution().Precision) op.q.Attributes.MetricsType = storage.AggregatedMetricsType op.q.Attributes.Resolution = op.sp.Resolution().Window op.q.Attributes.Retention = op.sp.Retention().Duration() diff --git a/src/cmd/services/m3coordinator/server/m3msg/handler.go b/src/cmd/services/m3coordinator/server/m3msg/handler.go index bec49cbbe4..0ba26e1aca 100644 --- a/src/cmd/services/m3coordinator/server/m3msg/handler.go +++ b/src/cmd/services/m3coordinator/server/m3msg/handler.go @@ -34,8 +34,6 @@ import ( "github.com/uber-go/tally" ) -var ctx = context.TODO() - // Options for the ingest handler. type Options struct { InstrumentOptions instrument.Options @@ -86,6 +84,7 @@ func (h *handler) Handle(c consumer.Consumer) { func (h *handler) newPerConsumerHandler() *perConsumerHandler { return &perConsumerHandler{ + ctx: context.Background(), writeFn: h.writeFn, logger: h.logger, m: h.m, @@ -96,6 +95,7 @@ func (h *handler) newPerConsumerHandler() *perConsumerHandler { type perConsumerHandler struct { // Per server variables, shared across consumers/connections. + ctx context.Context writeFn WriteFn logger log.Logger m handlerMetrics @@ -149,7 +149,7 @@ func (h *perConsumerHandler) processMessage( // TODO: Consider incrementing a wait group for each write and wait on // shut down to reduce the number of messages being retried by m3msg. r.IncRef() - h.writeFn(ctx, m.ID, time.Unix(0, m.TimeNanos), m.Value, sp, r) + h.writeFn(h.ctx, m.ID, time.Unix(0, m.TimeNanos), m.Value, sp, r) } r.decRef() if err := h.it.Err(); err != nil && err != io.EOF { diff --git a/src/query/server/server.go b/src/query/server/server.go index 9fd28a92af..670a5fcd0f 100644 --- a/src/query/server/server.go +++ b/src/query/server/server.go @@ -214,14 +214,13 @@ func Run(runOpts RunOptions) { }() if cfg.Ingest != nil { - subScope := scope.SubScope("ingest") - ingester, err := cfg.Ingest.Ingester.NewIngester(backendStorage, instrumentOptions.SetMetricsScope(subScope)) + ingester, err := cfg.Ingest.Ingester.NewIngester(backendStorage, instrumentOptions) if err != nil { logger.Fatal("unable to create ingester", zap.Error(err)) } server, err := cfg.Ingest.M3Msg.NewServer( ingester.Ingest, - instrumentOptions.SetMetricsScope(subScope), + instrumentOptions.SetMetricsScope(scope.SubScope("m3msg")), ) if err != nil { logger.Fatal("unable to create m3msg server", zap.Error(err)) diff --git a/src/x/common/unit.go b/src/x/convert/unit.go similarity index 90% rename from src/x/common/unit.go rename to src/x/convert/unit.go index 0c63ec6070..2e49c52837 100644 --- a/src/x/common/unit.go +++ b/src/x/convert/unit.go @@ -18,14 +18,14 @@ // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. -package common +package convert import xtime "github.com/m3db/m3x/time" -// SanitizeUnitForM3DB sanitizes a time unit into one that is suitable for m3db. +// UnitForM3DB converts a time unit into one that is suitable for m3db. // The is done in a explicit way for better performance and the logic is // ensured though unit test. -func SanitizeUnitForM3DB(unit xtime.Unit) xtime.Unit { +func UnitForM3DB(unit xtime.Unit) xtime.Unit { switch unit { case xtime.Second, xtime.Minute, xtime.Hour, xtime.Day, xtime.Year: return xtime.Second diff --git a/src/x/common/unit_test.go b/src/x/convert/unit_test.go similarity index 82% rename from src/x/common/unit_test.go rename to src/x/convert/unit_test.go index b50fee3e82..9d3dcfa568 100644 --- a/src/x/common/unit_test.go +++ b/src/x/convert/unit_test.go @@ -18,7 +18,7 @@ // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. -package common +package convert import ( "testing" @@ -29,11 +29,11 @@ import ( "github.com/stretchr/testify/require" ) -func TestSanitizeUnitForM3DB(t *testing.T) { +func TestUnitForM3DB(t *testing.T) { for u := byte(0); u <= byte(xtime.Year); u++ { unit := xtime.Unit(u) if unit == xtime.None { - require.Equal(t, xtime.Nanosecond, SanitizeUnitForM3DB(unit)) + require.Equal(t, xtime.Nanosecond, UnitForM3DB(unit)) continue } @@ -42,14 +42,14 @@ func TestSanitizeUnitForM3DB(t *testing.T) { switch { case time.Duration(dur)%time.Second == 0: - require.Equal(t, xtime.Second, SanitizeUnitForM3DB(unit)) + require.Equal(t, xtime.Second, UnitForM3DB(unit)) unit = xtime.Second case time.Duration(dur)%time.Millisecond == 0: - require.Equal(t, xtime.Millisecond, SanitizeUnitForM3DB(unit)) + require.Equal(t, xtime.Millisecond, UnitForM3DB(unit)) case time.Duration(dur)%time.Microsecond == 0: - require.Equal(t, xtime.Microsecond, SanitizeUnitForM3DB(unit)) + require.Equal(t, xtime.Microsecond, UnitForM3DB(unit)) default: - require.Equal(t, xtime.Nanosecond, SanitizeUnitForM3DB(unit)) + require.Equal(t, xtime.Nanosecond, UnitForM3DB(unit)) } } } From 670939d6e52fcc78be193a57ec8067c7c016fefb Mon Sep 17 00:00:00 2001 From: Chao Wang Date: Tue, 9 Oct 2018 11:36:39 -0400 Subject: [PATCH 6/6] glide up --- glide.lock | 53 ++++++++++++++++++++++++++++++----------------------- glide.yaml | 3 --- 2 files changed, 30 insertions(+), 26 deletions(-) diff --git a/glide.lock b/glide.lock index 503fbb6768..b15f7e3535 100644 --- a/glide.lock +++ b/glide.lock @@ -1,5 +1,5 @@ -hash: ac83014457f1c39657823b948fa85942f444e3cfaf08717d7d4794b92a612f3a -updated: 2018-10-08T12:41:58.07315-04:00 +hash: 088081f104314ccbacafe20f26d400f6b78ad8cfc1d09b54099cf2e1485463c7 +updated: 2018-10-09T11:36:01.522981-04:00 imports: - name: github.com/apache/thrift version: c2fb1c4e8c931d22617bebb0bf388cb4d5e6fcff @@ -26,6 +26,8 @@ imports: - client - clientv3 - clientv3/concurrency + - clientv3/namespace + - clientv3/naming - compactor - discovery - embed @@ -49,6 +51,7 @@ imports: - etcdserver/etcdserverpb/gw - etcdserver/membership - etcdserver/stats + - integration - lease - lease/leasehttp - lease/leasepb @@ -73,11 +76,14 @@ imports: - pkg/runtime - pkg/schedule - pkg/srv + - pkg/testutil - pkg/tlsutil - pkg/transport - pkg/types - pkg/wait + - proxy/grpcproxy - proxy/grpcproxy/adapter + - proxy/grpcproxy/cache - raft - raft/raftpb - rafthttp @@ -88,13 +94,15 @@ imports: - wal - wal/walpb - name: github.com/coreos/go-semver - version: 568e959cd89871e61434c1143528d9162da89ef2 + version: 8ab6407b697782a06568d4b7f1db25550ec2e4c6 subpackages: - semver - name: github.com/coreos/go-systemd version: 48702e0da86bd25e76cfef347e2adeb434a0d0a6 subpackages: + - daemon - journal + - util - name: github.com/coreos/pkg version: 97fdf19511ea361ae1c100dd393cc47f8dcfa1e1 subpackages: @@ -116,7 +124,7 @@ imports: - name: github.com/fsnotify/fsnotify version: c2828203cd70a50dcccfb2761f8b1f8ceef9a8e9 - name: github.com/ghodss/yaml - version: 73d445a93680fa1a78ae23a5839bad48f32ba1ee + version: 0ca9ea5df5451ffdf184b4428c902747c2c11cd7 - name: github.com/glycerine/go-unsnap-stream version: 9f0cb55181dd3a0a4c168d3dbc72d4aca4853126 - name: github.com/go-kit/kit @@ -129,7 +137,7 @@ imports: - name: github.com/go-stack/stack version: 54be5f394ed2c3e19dac9134a40a95ba5a017f7b - name: github.com/gogo/protobuf - version: 100ba4e885062801d56799d78530b73b178a78f3 + version: 636bf0302bc95575d69441b25a2603156ffdddf1 subpackages: - gogoproto - jsonpb @@ -137,6 +145,10 @@ imports: - protoc-gen-gogo/descriptor - sortkeys - types +- name: github.com/golang/groupcache + version: 02826c3e79038b59d737d3b1c0a1d937f71a4433 + subpackages: + - lru - name: github.com/golang/mock version: c34cdb4725f4c3844d095133c6e40e448b86589b subpackages: @@ -209,21 +221,7 @@ imports: - name: github.com/m3db/m3aggregator version: 6f59918fe3791a3df0dc146ea492593925809c3e subpackages: - - aggregation - - aggregation/quantile/cm - - aggregator - - aggregator/handler - - aggregator/handler/common - - aggregator/handler/filter - - aggregator/handler/router - - aggregator/handler/router/trafficcontrol - - aggregator/handler/writer - - bitset - client - - generated/proto/flush - - hash - - rate - - runtime - sharding - name: github.com/m3db/m3cluster version: b3db8dcb85c34fbd02a7721f6c0639d6e96c6fbe @@ -322,6 +320,7 @@ imports: - log - net - pool + - pprof - process - resource - retry @@ -370,8 +369,6 @@ imports: version: 90eadee771aeab36e8bf796039b8c261bebebe4f - name: github.com/nightlyone/lockfile version: 1d49c987357a327b5b03aa84cbddd582c328615d -- name: github.com/nu7hatch/gouuid - version: 179d4d0c4d8d407a32af483c2354df1d2c91e6c3 - name: github.com/oklog/ulid version: 66bb6560562feca7045b23db1ae85b01260f87c5 - name: github.com/opentracing/opentracing-go @@ -393,7 +390,7 @@ imports: subpackages: - roaring - name: github.com/pkg/errors - version: 248dadf4e9068a0b3e79f02ed0a610d935de5302 + version: 645ef00459ed84a119197bfb8d8205042c6df63d - name: github.com/pkg/profile version: 5b67d428864e92711fcbd2f8629456121a56d91f - name: github.com/pmezard/go-difflib @@ -519,6 +516,8 @@ imports: - zapcore - name: golang.org/x/crypto version: 1351f936d976c60a0a48d728281922cf63eafb8d + repo: https://github.com/golang/crypto + vcs: git subpackages: - bcrypt - blowfish @@ -545,6 +544,8 @@ imports: - errgroup - name: golang.org/x/sys version: d4feaf1a7e61e1d9e79e6c4e76c6349e9cab0a03 + repo: https://github.com/golang/sys + vcs: git subpackages: - unix - name: golang.org/x/text @@ -554,6 +555,12 @@ imports: - transform - unicode/bidi - unicode/norm +- name: golang.org/x/time + version: a4bde12657593d5e90d0533a3e4fd95e635124cb + repo: https://github.com/golang/time + vcs: git + subpackages: + - rate - name: google.golang.org/appengine version: 2e4a801b39fc199db615bfca7d0b9f8cd9580599 subpackages: @@ -571,7 +578,7 @@ imports: - googleapis/api/annotations - googleapis/rpc/status - name: google.golang.org/grpc - version: 5b3c4e850e90a4cf6a20ebd46c8b32a0a3afcb9e + version: 401e0e00e4bb830a10496d64cd95e068c5bf50de subpackages: - balancer - codes diff --git a/glide.yaml b/glide.yaml index adc1dd8ab4..bcd4153984 100644 --- a/glide.yaml +++ b/glide.yaml @@ -22,9 +22,6 @@ import: - services - integration/etcd - - package: github.com/m3db/m3msg - version: 4680d9b45286826f87b134a4559b11d795786eaf - - package: github.com/m3db/m3metrics version: 5c6aed344a0c209277d36334f0a824d9f3bb835b