Skip to content

Commit

Permalink
[dbnode] Add namespace runtime options for runtime per-namespace conf…
Browse files Browse the repository at this point in the history
…ig changes (#2446)
  • Loading branch information
robskillington authored Jul 22, 2020
1 parent 255bef5 commit ecf8d80
Show file tree
Hide file tree
Showing 82 changed files with 3,166 additions and 928 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ install-tools:
GOBIN=$(tools_bin_path) go install github.com/mauricelam/genny
GOBIN=$(tools_bin_path) go install github.com/mjibson/esc
GOBIN=$(tools_bin_path) go install github.com/pointlander/peg
GOBIN=$(tools_bin_path) go install github.com/prateek/gorename
GOBIN=$(tools_bin_path) go install github.com/robskillington/gorename
GOBIN=$(tools_bin_path) go install github.com/rakyll/statik
GOBIN=$(tools_bin_path) go install github.com/garethr/kubeval

Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -82,13 +82,13 @@ require (
github.com/pointlander/jetset v1.0.0 // indirect
github.com/pointlander/peg v1.0.0
github.com/prashantv/protectmem v0.0.0-20171002184600-e20412882b3a // indirect
github.com/prateek/gorename v0.0.0-20180424020013-52c7307cddd2
github.com/prometheus/client_golang v1.5.1
github.com/prometheus/common v0.9.1
github.com/prometheus/prometheus v1.8.2-0.20200420081721-18254838fbe2
github.com/rakyll/statik v0.1.6
github.com/remeh/sizedwaitgroup v1.0.0 // indirect
github.com/rhysd/go-github-selfupdate v1.2.2 // indirect
github.com/robskillington/gorename v0.0.0-20180424020013-52c7307cddd2
github.com/russross/blackfriday v2.0.0+incompatible
github.com/rveen/ogdl v0.0.0-20200522080342-eeeda1a978e7 // indirect
github.com/satori/go.uuid v1.2.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -604,8 +604,6 @@ github.com/pointlander/peg v1.0.0/go.mod h1:WJTMcgeWYr6fZz4CwHnY1oWZCXew8GWCF93F
github.com/posener/complete v1.1.1/go.mod h1:em0nMJCgc9GFtwrmVmEMR/ZL6WyhyjMBndrE9hABlRI=
github.com/prashantv/protectmem v0.0.0-20171002184600-e20412882b3a h1:AA9vgIBDjMHPC2McaGPojgV2dcI78ZC0TLNhYCXEKH8=
github.com/prashantv/protectmem v0.0.0-20171002184600-e20412882b3a/go.mod h1:lzZQ3Noex5pfAy7mkAeCjcBDteYU85uWWnJ/y6gKU8k=
github.com/prateek/gorename v0.0.0-20180424020013-52c7307cddd2 h1:/pJs9wFXnmhD12W+dnwoNJXPtLsxa78y+vzC9i/Hs+A=
github.com/prateek/gorename v0.0.0-20180424020013-52c7307cddd2/go.mod h1:nw9dXugFBQe1pshgrdeRjyYrY+RxNZckdWkiGHX8URE=
github.com/prometheus/alertmanager v0.20.0/go.mod h1:9g2i48FAyZW6BtbsnvHtMHQXl2aVtrORKwKVCQ+nbrg=
github.com/prometheus/client_golang v0.9.1/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXPKyh/dDVn+NZz0KFw=
github.com/prometheus/client_golang v0.9.3-0.20190127221311-3c4408c8b829/go.mod h1:p2iRAGwDERtqlqzRXnrOVns+ignqQo//hLXqYxZYVNs=
Expand Down Expand Up @@ -641,6 +639,8 @@ github.com/remeh/sizedwaitgroup v1.0.0 h1:VNGGFwNo/R5+MJBf6yrsr110p0m4/OX4S3DCy7
github.com/remeh/sizedwaitgroup v1.0.0/go.mod h1:3j2R4OIe/SeS6YDhICBy22RWjJC5eNCJ1V+9+NVNYlo=
github.com/rhysd/go-github-selfupdate v1.2.2 h1:G+mNzkc1wEtpmM6sFS/Ghkeq+ad4Yp6EZEHyp//wGEo=
github.com/rhysd/go-github-selfupdate v1.2.2/go.mod h1:khesvSyKcXDUxeySCedFh621iawCks0dS/QnHPcpCws=
github.com/robskillington/gorename v0.0.0-20180424020013-52c7307cddd2 h1:t+C9QFlvAI+evRn96lz7eKyzo1CgDx3YVx3N/GJIetk=
github.com/robskillington/gorename v0.0.0-20180424020013-52c7307cddd2/go.mod h1:CVTJ4xwzb/4H98jrd7NFgNoTAiL63scr2Pl7kqOcQAQ=
github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg=
github.com/rogpeppe/fastuuid v1.1.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ=
github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ=
Expand Down
12 changes: 6 additions & 6 deletions src/cmd/services/m3coordinator/downsample/downsample_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

49 changes: 27 additions & 22 deletions src/cmd/services/m3coordinator/downsample/downsampler.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ package downsample
import (
"time"

"go.uber.org/zap"
"go.uber.org/zap/zapcore"
)

Expand All @@ -35,9 +34,14 @@ type Downsampler interface {
// MetricsAppender is a metrics appender that can build a samples
// appender, only valid to use with a single caller at a time.
type MetricsAppender interface {
// NextMetric progresses to building the next metric.
NextMetric()
// AddTag adds a tag to the current metric being built.
AddTag(name, value []byte)
// SamplesAppender returns a samples appender for the current
// metric built with the tags that have been set.
SamplesAppender(opts SampleAppenderOptions) (SamplesAppenderResult, error)
Reset()
// Finalize finalizes the entire metrics appender for reuse.
Finalize()
}

Expand Down Expand Up @@ -72,11 +76,9 @@ type SamplesAppender interface {
}

type downsampler struct {
opts DownsamplerOptions
agg agg

debugLogging bool
logger *zap.Logger
opts DownsamplerOptions
agg agg
metricsAppenderOpts metricsAppenderOptions
}

type downsamplerOptions struct {
Expand All @@ -95,24 +97,27 @@ func newDownsampler(opts downsamplerOptions) (*downsampler, error) {
debugLogging = true
}

metricsAppenderOpts := metricsAppenderOptions{
agg: opts.agg.aggregator,
clientRemote: opts.agg.clientRemote,
defaultStagedMetadatasProtos: opts.agg.defaultStagedMetadatasProtos,
clockOpts: opts.agg.clockOpts,
tagEncoderPool: opts.agg.pools.tagEncoderPool,
matcher: opts.agg.matcher,
metricTagsIteratorPool: opts.agg.pools.metricTagsIteratorPool,
debugLogging: debugLogging,
logger: logger,
}

return &downsampler{
opts: opts.opts,
agg: opts.agg,
debugLogging: debugLogging,
logger: logger,
opts: opts.opts,
agg: opts.agg,
metricsAppenderOpts: metricsAppenderOpts,
}, nil
}

func (d *downsampler) NewMetricsAppender() (MetricsAppender, error) {
return newMetricsAppender(metricsAppenderOptions{
agg: d.agg.aggregator,
clientRemote: d.agg.clientRemote,
defaultStagedMetadatasProtos: d.agg.defaultStagedMetadatasProtos,
clockOpts: d.agg.clockOpts,
tagEncoder: d.agg.pools.tagEncoderPool.Get(),
matcher: d.agg.matcher,
metricTagsIteratorPool: d.agg.pools.metricTagsIteratorPool,
debugLogging: d.debugLogging,
logger: d.logger,
}), nil
metricsAppender := d.agg.pools.metricsAppenderPool.Get()
metricsAppender.reset(d.metricsAppenderOpts)
return metricsAppender, nil
}
34 changes: 22 additions & 12 deletions src/cmd/services/m3coordinator/downsample/downsampler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -989,7 +989,8 @@ func testDownsamplerAggregationIngest(
opts = *testOpts.sampleAppenderOpts
}
for _, metric := range testCounterMetrics {
appender.Reset()
appender.NextMetric()

for name, value := range metric.tags {
appender.AddTag([]byte(name), []byte(value))
}
Expand All @@ -1016,7 +1017,8 @@ func testDownsamplerAggregationIngest(
}
}
for _, metric := range testGaugeMetrics {
appender.Reset()
appender.NextMetric()

for name, value := range metric.tags {
appender.AddTag([]byte(name), []byte(value))
}
Expand Down Expand Up @@ -1116,13 +1118,20 @@ func newTestDownsampler(t *testing.T, opts testDownsamplerOptions) testDownsampl
tagEncoderOptions := serialize.NewTagEncoderOptions()
tagDecoderOptions := serialize.NewTagDecoderOptions(serialize.TagDecoderOptionsConfig{})
tagEncoderPoolOptions := pool.NewObjectPoolOptions().
SetSize(2).
SetInstrumentOptions(instrumentOpts.
SetMetricsScope(instrumentOpts.MetricsScope().
SubScope("tag-encoder-pool")))
tagDecoderPoolOptions := pool.NewObjectPoolOptions().
SetSize(2).
SetInstrumentOptions(instrumentOpts.
SetMetricsScope(instrumentOpts.MetricsScope().
SubScope("tag-decoder-pool")))
metricsAppenderPoolOptions := pool.NewObjectPoolOptions().
SetSize(2).
SetInstrumentOptions(instrumentOpts.
SetMetricsScope(instrumentOpts.MetricsScope().
SubScope("metrics-appender-pool")))

var cfg Configuration
if opts.remoteClientMock != nil {
Expand All @@ -1137,16 +1146,17 @@ func newTestDownsampler(t *testing.T, opts testDownsamplerOptions) testDownsampl
}

instance, err := cfg.NewDownsampler(DownsamplerOptions{
Storage: storage,
ClusterClient: clusterclient.NewMockClient(gomock.NewController(t)),
RulesKVStore: rulesKVStore,
AutoMappingRules: opts.autoMappingRules,
ClockOptions: clockOpts,
InstrumentOptions: instrumentOpts,
TagEncoderOptions: tagEncoderOptions,
TagDecoderOptions: tagDecoderOptions,
TagEncoderPoolOptions: tagEncoderPoolOptions,
TagDecoderPoolOptions: tagDecoderPoolOptions,
Storage: storage,
ClusterClient: clusterclient.NewMockClient(gomock.NewController(t)),
RulesKVStore: rulesKVStore,
AutoMappingRules: opts.autoMappingRules,
ClockOptions: clockOpts,
InstrumentOptions: instrumentOpts,
TagEncoderOptions: tagEncoderOptions,
TagDecoderOptions: tagDecoderOptions,
TagEncoderPoolOptions: tagEncoderPoolOptions,
TagDecoderPoolOptions: tagDecoderPoolOptions,
MetricsAppenderPoolOptions: metricsAppenderPoolOptions,
})
require.NoError(t, err)

Expand Down
65 changes: 54 additions & 11 deletions src/cmd/services/m3coordinator/downsample/metrics_appender.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,21 +34,47 @@ import (
"github.com/m3db/m3/src/metrics/metadata"
"github.com/m3db/m3/src/metrics/policy"
"github.com/m3db/m3/src/x/clock"
"github.com/m3db/m3/src/x/pool"
"github.com/m3db/m3/src/x/serialize"

"github.com/golang/protobuf/jsonpb"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
)

type metricsAppenderPool struct {
pool pool.ObjectPool
}

func newMetricsAppenderPool(opts pool.ObjectPoolOptions) *metricsAppenderPool {
p := &metricsAppenderPool{
pool: pool.NewObjectPool(opts),
}
p.pool.Init(func() interface{} {
return newMetricsAppender(p)
})
return p
}

func (p *metricsAppenderPool) Get() *metricsAppender {
return p.pool.Get().(*metricsAppender)
}

func (p *metricsAppenderPool) Put(v *metricsAppender) {
p.pool.Put(v)
}

type metricsAppender struct {
metricsAppenderOptions

pool *metricsAppenderPool

tags *tags
multiSamplesAppender *multiSamplesAppender
curr metadata.StagedMetadata
defaultStagedMetadatasCopies []metadata.StagedMetadatas
mappingRuleStoragePolicies []policy.StoragePolicy
tagEncoder serialize.TagEncoder
}

// metricsAppenderOptions will have one of agg or clientRemote set.
Expand All @@ -57,23 +83,40 @@ type metricsAppenderOptions struct {
clientRemote client.Client

defaultStagedMetadatasProtos []metricpb.StagedMetadatas
tagEncoder serialize.TagEncoder
matcher matcher.Matcher
tagEncoderPool serialize.TagEncoderPool
metricTagsIteratorPool serialize.MetricTagsIteratorPool

clockOpts clock.Options
debugLogging bool
logger *zap.Logger
}

func newMetricsAppender(opts metricsAppenderOptions) *metricsAppender {
stagedMetadatasCopies := make([]metadata.StagedMetadatas,
len(opts.defaultStagedMetadatasProtos))
func newMetricsAppender(pool *metricsAppenderPool) *metricsAppender {
return &metricsAppender{
metricsAppenderOptions: opts,
tags: newTags(),
multiSamplesAppender: newMultiSamplesAppender(),
defaultStagedMetadatasCopies: stagedMetadatasCopies,
pool: pool,
tags: newTags(),
multiSamplesAppender: newMultiSamplesAppender(),
}
}

// reset is called when pulled from the pool.
func (a *metricsAppender) reset(opts metricsAppenderOptions) {
a.metricsAppenderOptions = opts
if a.tagEncoder == nil {
a.tagEncoder = opts.tagEncoderPool.Get()
}

// Make sure a.defaultStagedMetadatasCopies is right length.
capRequired := len(opts.defaultStagedMetadatasProtos)
if cap(a.defaultStagedMetadatasCopies) < capRequired {
// Too short, reallocate.
slice := make([]metadata.StagedMetadatas, capRequired)
a.defaultStagedMetadatasCopies = slice
} else {
// Has enough capacity, take subslice.
slice := a.defaultStagedMetadatasCopies[:capRequired]
a.defaultStagedMetadatasCopies = slice
}
}

Expand Down Expand Up @@ -317,14 +360,14 @@ func (a *metricsAppender) debugLogMatch(str string, opts debugLogMatchOptions) {
a.logger.Debug(str, fields...)
}

func (a *metricsAppender) Reset() {
func (a *metricsAppender) NextMetric() {
a.tags.names = a.tags.names[:0]
a.tags.values = a.tags.values[:0]
}

func (a *metricsAppender) Finalize() {
a.tagEncoder.Finalize()
a.tagEncoder = nil
// Return to pool.
a.pool.Put(a)
}

func stagedMetadatasLogField(sm metadata.StagedMetadatas) zapcore.Field {
Expand Down
Loading

0 comments on commit ecf8d80

Please sign in to comment.