From 0225ecb9e779a899df1de0cf8c1930de21113052 Mon Sep 17 00:00:00 2001 From: Siyu Date: Fri, 1 Oct 2021 15:53:03 -0400 Subject: [PATCH 1/6] [tests] Add In-Process Aggregator (Start Only) (#3795) This commit adds resources.Aggregator interface with a few basic functions. It also adds an in-process implementation of the new interface which will start in process. --- .../resources/inprocess/aggregator.go | 180 ++++++++++++ .../resources/inprocess/aggregator_test.go | 276 ++++++++++++++++++ .../resources/inprocess/coordinator_test.go | 2 - src/integration/resources/types.go | 17 ++ 4 files changed, 473 insertions(+), 2 deletions(-) create mode 100644 src/integration/resources/inprocess/aggregator.go create mode 100644 src/integration/resources/inprocess/aggregator_test.go diff --git a/src/integration/resources/inprocess/aggregator.go b/src/integration/resources/inprocess/aggregator.go new file mode 100644 index 0000000000..55fea700e7 --- /dev/null +++ b/src/integration/resources/inprocess/aggregator.go @@ -0,0 +1,180 @@ +// Copyright (c) 2021 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package inprocess + +import ( + "errors" + "io/ioutil" + "os" + "time" + + "go.uber.org/zap" + "gopkg.in/yaml.v2" + + m3agg "github.com/m3db/m3/src/aggregator/aggregator" + "github.com/m3db/m3/src/aggregator/server" + "github.com/m3db/m3/src/cmd/services/m3aggregator/config" + "github.com/m3db/m3/src/integration/resources" + nettest "github.com/m3db/m3/src/integration/resources/net" + xos "github.com/m3db/m3/src/x/os" +) + +type aggregator struct { + cfg config.Configuration + logger *zap.Logger + tmpDirs []string + + interruptCh chan<- error + shutdownCh <-chan struct{} +} + +// AggregatorOptions are options of starting an in-process aggregator. +type AggregatorOptions struct { + // Logger is the logger to use for the in-process aggregator. + Logger *zap.Logger +} + +// NewAggregator creates a new in-process aggregator based on the configuration +// and options provided. 
+func NewAggregator(yamlCfg string, opts AggregatorOptions) (resources.Aggregator, error) { + var cfg config.Configuration + if err := yaml.Unmarshal([]byte(yamlCfg), &cfg); err != nil { + return nil, err + } + + // Replace any "0" ports with an open port + cfg, err := updateAggregatorPorts(cfg) + if err != nil { + return nil, err + } + + // Replace any "*" filepath with a temporary directory + cfg, tmpDirs, err := updateAggregatorFilepaths(cfg) + if err != nil { + return nil, err + } + + if opts.Logger == nil { + var err error + opts.Logger, err = zap.NewDevelopment() + if err != nil { + return nil, err + } + } + + agg := &aggregator{ + cfg: cfg, + logger: opts.Logger, + tmpDirs: tmpDirs, + } + agg.start() + + return agg, nil +} + +func (a *aggregator) IsHealthy(instance string) error { + return nil +} + +func (a *aggregator) Status(instance string) (m3agg.RuntimeStatus, error) { + return m3agg.RuntimeStatus{}, nil +} + +func (a *aggregator) Resign(instance string) error { + return nil +} + +func (a *aggregator) Close() error { + defer func() { + for _, dir := range a.tmpDirs { + if err := os.RemoveAll(dir); err != nil { + a.logger.Error("error removing temp directory", zap.String("dir", dir), zap.Error(err)) + } + } + }() + + select { + case a.interruptCh <- xos.NewInterruptError("in-process aggregator being shut down"): + case <-time.After(interruptTimeout): + return errors.New("timeout sending interrupt. closing without graceful shutdown") + } + + select { + case <-a.shutdownCh: + case <-time.After(shutdownTimeout): + return errors.New("timeout waiting for shutdown notification. server closing may" + + " not be completely graceful") + } + + return nil +} + +func (a *aggregator) start() { + interruptCh := make(chan error, 1) + shutdownCh := make(chan struct{}, 1) + + go func() { + server.Run(server.RunOptions{ + Config: a.cfg, + InterruptCh: interruptCh, + ShutdownCh: shutdownCh, + }) + }() + + a.interruptCh = interruptCh + a.shutdownCh = shutdownCh +} + +func updateAggregatorPorts(cfg config.Configuration) (config.Configuration, error) { + if cfg.HTTP != nil && len(cfg.HTTP.ListenAddress) > 0 { + addr, _, _, err := nettest.MaybeGeneratePort(cfg.HTTP.ListenAddress) + if err != nil { + return cfg, err + } + + cfg.HTTP.ListenAddress = addr + } + + if cfg.M3Msg != nil && len(cfg.M3Msg.Server.ListenAddress) > 0 { + addr, _, _, err := nettest.MaybeGeneratePort(cfg.M3Msg.Server.ListenAddress) + if err != nil { + return cfg, err + } + + cfg.M3Msg.Server.ListenAddress = addr + } + + return cfg, nil +} + +func updateAggregatorFilepaths(cfg config.Configuration) (config.Configuration, []string, error) { + tmpDirs := make([]string, 0, 1) + if cfg.KVClient.Etcd != nil && cfg.KVClient.Etcd.CacheDir == "*" { + dir, err := ioutil.TempDir("", "m3agg-*") + if err != nil { + return cfg, tmpDirs, err + } + tmpDirs = append(tmpDirs, dir) + cfg.KVClient.Etcd.CacheDir = dir + } + + return cfg, tmpDirs, nil +} diff --git a/src/integration/resources/inprocess/aggregator_test.go b/src/integration/resources/inprocess/aggregator_test.go new file mode 100644 index 0000000000..e5ac48d2d8 --- /dev/null +++ b/src/integration/resources/inprocess/aggregator_test.go @@ -0,0 +1,276 @@ +// +build integration_v2 +// Copyright (c) 2021 Uber Technologies, Inc. 
+// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package inprocess + +import ( + "testing" + + "github.com/m3db/m3/src/cluster/generated/proto/placementpb" + "github.com/m3db/m3/src/integration/resources" + "github.com/m3db/m3/src/msg/generated/proto/topicpb" + "github.com/m3db/m3/src/query/generated/proto/admin" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestNewAggregator(t *testing.T) { + dbnode, err := NewDBNodeFromYAML(defaultDBNodeConfig, DBNodeOptions{}) + require.NoError(t, err) + + coord, err := NewCoordinatorFromYAML(aggregatorCoordConfig, CoordinatorOptions{}) + require.NoError(t, err) + + defer func() { + assert.NoError(t, coord.Close()) + assert.NoError(t, dbnode.Close()) + }() + + require.NoError(t, coord.WaitForNamespace("")) + + setupPlacement(t, coord) + setupM3msgTopic(t, coord) + + agg, err := NewAggregator(defaultAggregatorConfig, AggregatorOptions{}) + require.NoError(t, err) + require.NoError(t, agg.Close()) + + // restart an aggregator instance + agg, err = NewAggregator(defaultAggregatorConfig, AggregatorOptions{}) + require.NoError(t, err) + require.NoError(t, agg.Close()) +} + +func setupM3msgTopic(t *testing.T, coord resources.Coordinator) { + m3msgTopicOpts := resources.M3msgTopicOptions{ + Zone: "embedded", + Env: "default_env", + TopicName: "aggregator_ingest", + } + + _, err := coord.InitM3msgTopic(m3msgTopicOpts, admin.TopicInitRequest{NumberOfShards: 16}) + require.NoError(t, err) + + _, err = coord.AddM3msgTopicConsumer(m3msgTopicOpts, admin.TopicAddRequest{ + ConsumerService: &topicpb.ConsumerService{ + ServiceId: &topicpb.ServiceID{ + Name: "m3aggregator", + Environment: m3msgTopicOpts.Env, + Zone: m3msgTopicOpts.Zone, + }, + ConsumptionType: topicpb.ConsumptionType_REPLICATED, + MessageTtlNanos: 600000000000, // 10 mins + }, + }) + require.NoError(t, err) + + aggregatedTopicOpts := resources.M3msgTopicOptions{ + Zone: "embedded", + Env: "default_env", + TopicName: "aggregated_metrics", + } + _, err = coord.InitM3msgTopic(aggregatedTopicOpts, admin.TopicInitRequest{NumberOfShards: 16}) + require.NoError(t, err) + + _, err = coord.AddM3msgTopicConsumer(aggregatedTopicOpts, admin.TopicAddRequest{ + ConsumerService: &topicpb.ConsumerService{ + ServiceId: &topicpb.ServiceID{ + Name: "m3coordinator", + Environment: aggregatedTopicOpts.Env, + Zone: aggregatedTopicOpts.Zone, + }, + ConsumptionType: topicpb.ConsumptionType_SHARED, + 
MessageTtlNanos: 600000000000, // 10 mins + }, + }) + require.NoError(t, err) +} + +func setupPlacement(t *testing.T, coord resources.Coordinator) { + _, err := coord.InitPlacement( + resources.PlacementRequestOptions{ + Service: resources.ServiceTypeM3Aggregator, + Zone: "embedded", + Env: "default_env", + }, + admin.PlacementInitRequest{ + NumShards: 1, + ReplicationFactor: 1, + Instances: []*placementpb.Instance{ + { + Id: "m3aggregator01", + IsolationGroup: "rack1", + Zone: "embedded", + Weight: 1, + Endpoint: "m3aggregator01:6000", + Hostname: "m3aggregator01", + Port: 6000, + }, + }, + }, + ) + require.NoError(t, err) + + _, err = coord.InitPlacement( + resources.PlacementRequestOptions{ + Service: resources.ServiceTypeM3Coordinator, + Zone: "embedded", + Env: "default_env", + }, + admin.PlacementInitRequest{ + Instances: []*placementpb.Instance{ + { + Id: "m3coordinator01", + Zone: "embedded", + Endpoint: "m3coordinator01:7507", + Hostname: "m3coordinator01", + Port: 7507, + }, + }, + }, + ) + require.NoError(t, err) +} + +const defaultAggregatorConfig = ` +metrics: + prometheus: + handlerPath: /metrics + listenAddress: 0.0.0.0:6002 + timerType: histogram + sanitization: prometheus + samplingRate: 1.0 +http: + listenAddress: 0.0.0.0:6001 + readTimeout: 60s + writeTimeout: 60s +m3msg: + server: + listenAddress: 0.0.0.0:6000 + retry: + maxBackoff: 10s + jitter: true + consumer: + messagePool: + size: 16384 + watermark: + low: 0.2 + high: 0.5 +kvClient: + etcd: + env: default_env + zone: embedded + service: m3aggregator + cacheDir: "*" + etcdClusters: + - zone: embedded + endpoints: + - 127.0.0.1:2379 +aggregator: + instanceID: + type: host_id + stream: + eps: 0.001 + capacity: 32 + client: + type: m3msg + m3msg: + producer: + writer: + topicName: aggregator_ingest + topicServiceOverride: + zone: embedded + environment: default_env + placement: + isStaged: true + placementServiceOverride: + namespaces: + placement: /placement + messagePool: + size: 16384 + watermark: + low: 0.2 + high: 0.5 + ignoreCutoffCutover: true + placementManager: + kvConfig: + namespace: /placement + environment: default_env + zone: embedded + placementWatcher: + key: m3aggregator + electionManager: + serviceID: + name: m3aggregator + environment: default_env + zone: embedded + flush: + handlers: + - dynamicBackend: + name: m3msg + hashType: murmur32 + producer: + writer: + topicName: aggregated_metrics + topicServiceOverride: + zone: embedded + environment: default_env + messagePool: + size: 16384 + watermark: + low: 0.2 + high: 0.5 +` + +const aggregatorCoordConfig = ` +clusters: + - namespaces: + - namespace: default + type: unaggregated + retention: 1h + client: + config: + service: + env: default_env + zone: embedded + service: m3db + cacheDir: "*" + etcdClusters: + - zone: embedded + endpoints: + - 127.0.0.1:2379 +ingest: + ingester: + workerPoolSize: 10000 + opPool: + size: 10000 + retry: + maxRetries: 3 + jitter: true + logSampleRate: 0.01 + m3msg: + server: + listenAddress: "0.0.0.0:7507" + retry: + maxBackoff: 10s + jitter: true +` diff --git a/src/integration/resources/inprocess/coordinator_test.go b/src/integration/resources/inprocess/coordinator_test.go index 41c67854ee..9cb63520df 100644 --- a/src/integration/resources/inprocess/coordinator_test.go +++ b/src/integration/resources/inprocess/coordinator_test.go @@ -1,6 +1,4 @@ -//go:build integration_v2 // +build integration_v2 - // Copyright (c) 2021 Uber Technologies, Inc. 
// // Permission is hereby granted, free of charge, to any person obtaining a copy diff --git a/src/integration/resources/types.go b/src/integration/resources/types.go index 243b95e8a2..c7e5d63421 100644 --- a/src/integration/resources/types.go +++ b/src/integration/resources/types.go @@ -26,6 +26,7 @@ import ( "sync" "time" + "github.com/m3db/m3/src/aggregator/aggregator" "github.com/m3db/m3/src/dbnode/generated/thrift/rpc" "github.com/m3db/m3/src/query/generated/proto/admin" "github.com/m3db/m3/src/query/generated/proto/prompb" @@ -125,6 +126,22 @@ type Node interface { Close() error } +// Aggregator is an aggregator instance. +type Aggregator interface { + // IsHealthy determines whether an instance is healthy. + IsHealthy(instance string) error + + // Status returns the instance status. + Status(instance string) (aggregator.RuntimeStatus, error) + + // Resign asks an aggregator instance to give up its current leader role if applicable. + Resign(instance string) error + + // Close closes the wrapper and releases any held resources, including + // deleting docker containers. + Close() error +} + // M3Resources represents a set of test M3 components. type M3Resources interface { // Cleanup cleans up after each started component. From ddeddc4e4288d51d58f2f6460b42852b0dc7e8fc Mon Sep 17 00:00:00 2001 From: Ryan Hall Date: Fri, 1 Oct 2021 13:19:53 -0700 Subject: [PATCH 2/6] Optionally disable m3msg consumer metric scope (#3802) For large m3msg deployments the number of consumers can add a lot of cardinality to the metrics. Now users can disable the "consumer" label via the config option. --- src/msg/producer/config/writer.go | 6 +++++- .../writer/consumer_service_writer.go | 1 + src/msg/producer/writer/message_writer.go | 17 ++++++++++++---- .../producer/writer/message_writer_test.go | 20 +++++++++++++++++-- src/msg/producer/writer/options.go | 18 +++++++++++++++++ 5 files changed, 55 insertions(+), 7 deletions(-) diff --git a/src/msg/producer/config/writer.go b/src/msg/producer/config/writer.go index da896dc1de..36c9ef78f4 100644 --- a/src/msg/producer/config/writer.go +++ b/src/msg/producer/config/writer.go @@ -107,6 +107,9 @@ type WriterConfiguration struct { // IgnoreCutoffCutover allows producing writes ignoring cutoff/cutover timestamp. // Must be in sync with AggregatorConfiguration.WritesIgnoreCutoffCutover. IgnoreCutoffCutover bool `yaml:"ignoreCutoffCutover"` + // WithoutConsumerScope drops the consumer tag from the metrics. For large m3msg deployments the consumer tag can + // add a lot of cardinality to the metrics. + WithoutConsumerScope bool `yaml:"withoutConsumerScope"` } // NewOptions creates writer options. @@ -118,7 +121,8 @@ func (c *WriterConfiguration) NewOptions( opts := writer.NewOptions(). SetTopicName(c.TopicName). SetPlacementOptions(c.PlacementOptions.NewOptions()). - SetInstrumentOptions(iOpts) + SetInstrumentOptions(iOpts). 
+ SetWithoutConsumerScope(c.WithoutConsumerScope) kvOpts, err := c.TopicServiceOverride.NewOverrideOptions() if err != nil { diff --git a/src/msg/producer/writer/consumer_service_writer.go b/src/msg/producer/writer/consumer_service_writer.go index 2e4bd96f6b..5e098c225b 100644 --- a/src/msg/producer/writer/consumer_service_writer.go +++ b/src/msg/producer/writer/consumer_service_writer.go @@ -163,6 +163,7 @@ func initShardWriters( m = newMessageWriterMetrics( opts.InstrumentOptions().MetricsScope(), opts.InstrumentOptions().TimerOptions(), + opts.WithoutConsumerScope(), ) mPool messagePool ) diff --git a/src/msg/producer/writer/message_writer.go b/src/msg/producer/writer/message_writer.go index bb3fdfafa0..f0ec1cdc3a 100644 --- a/src/msg/producer/writer/message_writer.go +++ b/src/msg/producer/writer/message_writer.go @@ -99,6 +99,7 @@ type messageWriter interface { } type messageWriterMetrics struct { + withoutConsumerScope bool scope tally.Scope opts instrument.TimerOptions writeSuccess tally.Counter @@ -127,23 +128,31 @@ type messageWriterMetrics struct { } func (m messageWriterMetrics) withConsumer(consumer string) messageWriterMetrics { - return newMessageWriterMetricsWithConsumer(m.scope, m.opts, consumer) + if m.withoutConsumerScope { + return m + } + return newMessageWriterMetricsWithConsumer(m.scope, m.opts, consumer, false) } func newMessageWriterMetrics( scope tally.Scope, opts instrument.TimerOptions, + withoutConsumerScope bool, ) messageWriterMetrics { - return newMessageWriterMetricsWithConsumer(scope, opts, "unknown") + return newMessageWriterMetricsWithConsumer(scope, opts, "unknown", withoutConsumerScope) } func newMessageWriterMetricsWithConsumer( scope tally.Scope, opts instrument.TimerOptions, consumer string, -) messageWriterMetrics { - consumerScope := scope.Tagged(map[string]string{"consumer": consumer}) + withoutConsumerScope bool) messageWriterMetrics { + consumerScope := scope + if !withoutConsumerScope { + consumerScope = scope.Tagged(map[string]string{"consumer": consumer}) + } return messageWriterMetrics{ + withoutConsumerScope: withoutConsumerScope, scope: scope, opts: opts, writeSuccess: consumerScope.Counter("write-success"), diff --git a/src/msg/producer/writer/message_writer_test.go b/src/msg/producer/writer/message_writer_test.go index 3970b84b44..f57bd7f52e 100644 --- a/src/msg/producer/writer/message_writer_test.go +++ b/src/msg/producer/writer/message_writer_test.go @@ -855,6 +855,22 @@ func TestMessageWriterQueueFullScanOnWriteErrors(t *testing.T) { require.Equal(t, int64(1), counters["message-processed+consumer=c1,result=drop"].Value()) } +func TestMessageWriter_WithoutConsumerScope(t *testing.T) { + ctrl := xtest.NewController(t) + defer ctrl.Finish() + + opts := testOptions().SetMessageQueueScanBatchSize(1) + scope := tally.NewTestScope("", nil) + metrics := newMessageWriterMetrics(scope, instrument.TimerOptions{}, true) + w := newMessageWriter(200, nil, opts, metrics).(*messageWriterImpl) + w.AddConsumerWriter(newConsumerWriter("bad", nil, opts, testConsumerWriterMetrics())) + + snapshot := scope.Snapshot() + counters := snapshot.Counters() + require.Nil(t, counters["message-processed+consumer=c1,result=write"]) + require.NotNil(t, counters["message-processed+result=write"]) +} + func isEmptyWithLock(h *acks) bool { h.Lock() defer h.Unlock() @@ -868,11 +884,11 @@ func testMessagePool(opts Options) messagePool { } func testMessageWriterMetrics() messageWriterMetrics { - return newMessageWriterMetrics(tally.NoopScope, instrument.TimerOptions{}) + 
return newMessageWriterMetrics(tally.NoopScope, instrument.TimerOptions{}, false) } func testMessageWriterMetricsWithScope(scope tally.TestScope) messageWriterMetrics { - return newMessageWriterMetrics(scope, instrument.TimerOptions{}) + return newMessageWriterMetrics(scope, instrument.TimerOptions{}, false) } func validateMessages(t *testing.T, msgs []*producer.RefCountedMessage, w *messageWriterImpl) { diff --git a/src/msg/producer/writer/options.go b/src/msg/producer/writer/options.go index ac4351758c..a6f696d5bc 100644 --- a/src/msg/producer/writer/options.go +++ b/src/msg/producer/writer/options.go @@ -363,6 +363,13 @@ type Options interface { // SetIgnoreCutoffCutover sets a flag controlling whether cutoff/cutover timestamps are ignored. SetIgnoreCutoffCutover(value bool) Options + + // WithoutConsumerScope disables the consumer scope for metrics. For large m3msg deployments the consumer + // scope can add a lot of cardinality to the metrics. + WithoutConsumerScope() bool + + // SetWithoutConsumerScope sets the value for WithoutConsumerScope. + SetWithoutConsumerScope(value bool) Options } type writerOptions struct { @@ -385,6 +392,7 @@ type writerOptions struct { cOpts ConnectionOptions iOpts instrument.Options ignoreCutoffCutover bool + withoutConsumerScope bool } // NewOptions creates Options. @@ -596,3 +604,13 @@ func (opts *writerOptions) SetIgnoreCutoffCutover(value bool) Options { o.ignoreCutoffCutover = value return &o } + +func (opts *writerOptions) WithoutConsumerScope() bool { + return opts.withoutConsumerScope +} + +func (opts *writerOptions) SetWithoutConsumerScope(value bool) Options { + o := *opts + o.withoutConsumerScope = value + return &o +} From 5b8f31629680a13f06a77d79951f58245fd03bf6 Mon Sep 17 00:00:00 2001 From: Wesley Kim Date: Fri, 1 Oct 2021 18:14:18 -0400 Subject: [PATCH 3/6] [downsampler] Flesh comments in metrics_appender.go (#3803) --- .../downsample/metrics_appender.go | 28 +++++++++---------- 1 file changed, 14 insertions(+), 14 deletions(-) diff --git a/src/cmd/services/m3coordinator/downsample/metrics_appender.go b/src/cmd/services/m3coordinator/downsample/metrics_appender.go index 44913578a6..c063678113 100644 --- a/src/cmd/services/m3coordinator/downsample/metrics_appender.go +++ b/src/cmd/services/m3coordinator/downsample/metrics_appender.go @@ -221,8 +221,9 @@ func (a *metricsAppender) SamplesAppender(opts SampleAppenderOptions) (SamplesAp // Reuse a slice to keep the current staged metadatas we will apply. a.curr.Pipelines = a.curr.Pipelines[:0] + // First, process any override explicitly provided as part of request + // (via request headers that specify target namespaces). if opts.Override { - // Process an override explicitly provided as part of request. for _, rule := range opts.OverrideRules.MappingRules { stagedMetadatas, err := rule.StagedMetadatas() if err != nil { @@ -248,12 +249,9 @@ func (a *metricsAppender) SamplesAppender(opts SampleAppenderOptions) (SamplesAp }, nil } - // NB(r): First apply mapping rules to see which storage policies - // have been applied, any that have been applied as part of - // mapping rules that exact match a default storage policy will be - // skipped when applying default rules, so as to avoid storing - // the same metrics in the same namespace with the same metric - // name and tags (i.e. overwriting each other). + // Next, apply any mapping rules that match. We track which storage policies have been applied based on the + // mapping rules that match. 
Any storage policies that have been applied will be skipped when applying + // the auto-mapping rules to avoid redundant writes (i.e. overwriting each other). var ( ruleStagedMetadatas = matchResult.ForExistingIDAt(nowNanos) dropApplyResult metadata.ApplyOrRemoveDropPoliciesResult @@ -293,13 +291,12 @@ func (a *metricsAppender) SamplesAppender(opts SampleAppenderOptions) (SamplesAp a.curr.Pipelines = append(a.curr.Pipelines, pipelines.Pipelines...) } - // Always aggregate any default staged metadatas with a few exceptions. - // Exceptions are: + // Next, we cover auto-mapping (otherwise referred to as default) rules. + // We always aggregate any default rules with a few exceptions: // 1. A mapping rule has provided an override for a storage policy, - // if so then skip aggregating for that storage policy). - // 2. Any type of drop rule has been set, since they should only - // impact mapping rules, not default staged metadatas provided from - // auto-mapping rules (i.e. default namespace aggregation). + // if so then skip aggregating for that storage policy. + // This is what we calculated in the step above. + // 2. Any type of drop rule has been set. Drop rules should mean that the auto-mapping rules are ignored. if !a.curr.Pipelines.IsDropPolicySet() { // No drop rule has been set as part of rule matching. for idx, stagedMetadatasProto := range a.defaultStagedMetadatasProtos { @@ -394,7 +391,9 @@ func (a *metricsAppender) SamplesAppender(opts SampleAppenderOptions) (SamplesAp // Apply drop policies results a.curr.Pipelines, dropApplyResult = a.curr.Pipelines.ApplyOrRemoveDropPolicies() - // Skip sending to downsampler if there's a drop policy or no pipeline defined. + // Now send the results of mapping / auto-mapping rules to the relevant downsampler. + // We explicitly skip sending if there's no work to be done: specifically + // if there's a drop policy or if the staged metadata is a no-op. if len(a.curr.Pipelines) > 0 && !a.curr.IsDropPolicyApplied() && !a.curr.IsDefault() { // Send to downsampler if we have something in the pipeline. a.debugLogMatch("downsampler using built mapping staged metadatas", @@ -405,6 +404,7 @@ func (a *metricsAppender) SamplesAppender(opts SampleAppenderOptions) (SamplesAp } } + // Finally, process and deliver staged metadata resulting from rollup rules. numRollups := matchResult.NumNewRollupIDs() for i := 0; i < numRollups; i++ { rollup := matchResult.ForNewRollupIDsAt(i, nowNanos) From 8028eefb0e091d280c1cb4eb756a025398a0925a Mon Sep 17 00:00:00 2001 From: Ryan Hall Date: Fri, 1 Oct 2021 16:45:06 -0700 Subject: [PATCH 4/6] Relabel not-ready m3msg queue metrics (#3804) retry was extremely misleading because these messages weren't being retried. the counter is incremented if the message is scanned and the retryAt time is in the future (i.e not ready to be processed) --- src/msg/producer/writer/message_writer.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/msg/producer/writer/message_writer.go b/src/msg/producer/writer/message_writer.go index f0ec1cdc3a..4bd2ef8d8d 100644 --- a/src/msg/producer/writer/message_writer.go +++ b/src/msg/producer/writer/message_writer.go @@ -191,7 +191,7 @@ func newMessageWriterMetricsWithConsumer( Tagged(map[string]string{"result": "closed"}). Counter("message-processed"), processedNotReady: consumerScope. - Tagged(map[string]string{"result": "retry"}). + Tagged(map[string]string{"result": "not-ready"}). Counter("message-processed"), processedTTL: consumerScope. 
Tagged(map[string]string{"result": "ttl"}). From a3270bfa20a697cc24312620eb687ca720779046 Mon Sep 17 00:00:00 2001 From: Antanas Date: Mon, 4 Oct 2021 13:41:45 +0300 Subject: [PATCH 5/6] [coordinator] [promremote] Make <500 endpoint errors as invalid params error (#3783) --- src/query/server/query_test.go | 41 +++-- .../promremote/promremotetest/test_server.go | 13 +- src/query/storage/promremote/storage.go | 10 +- src/query/storage/promremote/storage_test.go | 162 ++++++++++-------- 4 files changed, 139 insertions(+), 87 deletions(-) diff --git a/src/query/server/query_test.go b/src/query/server/query_test.go index ab57f29b50..5a4eb6de97 100644 --- a/src/query/server/query_test.go +++ b/src/query/server/query_test.go @@ -237,18 +237,37 @@ tagOptions: promReq := test.GeneratePromWriteRequest() promReqBody := test.GeneratePromWriteRequestBody(t, promReq) - req, err := http.NewRequestWithContext( - context.TODO(), - http.MethodPost, - fmt.Sprintf("http://%s%s", addr, remote.PromWriteURL), - promReqBody, - ) - require.NoError(t, err) + requestURL := fmt.Sprintf("http://%s%s", addr, remote.PromWriteURL) + newRequest := func() *http.Request { + req, err := http.NewRequestWithContext( + context.TODO(), + http.MethodPost, + requestURL, + promReqBody, + ) + require.NoError(t, err) + return req + } - resp, err := http.DefaultClient.Do(req) - require.NoError(t, err) - require.NoError(t, resp.Body.Close()) - assert.NotNil(t, externalFakePromServer.GetLastWriteRequest()) + t.Run("write request", func(t *testing.T) { + defer externalFakePromServer.Reset() + resp, err := http.DefaultClient.Do(newRequest()) + require.NoError(t, err) + + assert.NotNil(t, externalFakePromServer.GetLastWriteRequest()) + require.NoError(t, resp.Body.Close()) + }) + + t.Run("bad request propagates", func(t *testing.T) { + defer externalFakePromServer.Reset() + externalFakePromServer.SetError("badRequest", http.StatusBadRequest) + + resp, err := http.DefaultClient.Do(newRequest()) + require.NoError(t, err) + + assert.Equal(t, 400, resp.StatusCode) + require.NoError(t, resp.Body.Close()) + }) } func TestGRPCBackend(t *testing.T) { diff --git a/src/query/storage/promremote/promremotetest/test_server.go b/src/query/storage/promremote/promremotetest/test_server.go index 913d24de36..a3570f873e 100644 --- a/src/query/storage/promremote/promremotetest/test_server.go +++ b/src/query/storage/promremote/promremotetest/test_server.go @@ -37,11 +37,16 @@ import ( type TestPromServer struct { mu sync.Mutex lastWriteRequest *prompb.WriteRequest - respErr error + respErr *respErr t *testing.T svr *httptest.Server } +type respErr struct { + error string + status int +} + // NewServer creates new instance of a fake server. func NewServer(t *testing.T) *TestPromServer { testPromServer := &TestPromServer{t: t} @@ -67,7 +72,7 @@ func (s *TestPromServer) handleWrite(w http.ResponseWriter, r *http.Request) { } s.lastWriteRequest = req if s.respErr != nil { - http.Error(w, s.respErr.Error(), http.StatusInternalServerError) + http.Error(w, s.respErr.error, s.respErr.status) return } } @@ -85,10 +90,10 @@ func (s *TestPromServer) WriteAddr() string { } // SetError sets error that will be returned for all incoming requests. -func (s *TestPromServer) SetError(err error) { +func (s *TestPromServer) SetError(body string, status int) { s.mu.Lock() defer s.mu.Unlock() - s.respErr = err + s.respErr = &respErr{error: body, status: status} } // Reset resets state to default. 
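
The storage.go change below is the core of this patch: when a remote write fails, any response with a status below 500 other than 429 (Too Many Requests) is now wrapped with xerrors.NewInvalidParamsError, so it surfaces as a client error instead of a retryable server error. A minimal caller-side sketch of how that distinction can be consumed; the package name and the shouldRetryWrite helper are hypothetical, not part of this patch:

package promwriteexample

import (
	xerrors "github.com/m3db/m3/src/x/errors"
)

// shouldRetryWrite reports whether a failed remote write is worth retrying.
// Invalid-params errors correspond to non-429 responses below 500 and are
// treated as permanent; 5xx responses, 429s and transport errors stay plain
// errors that a caller may choose to retry.
func shouldRetryWrite(err error) bool {
	if err == nil {
		return false
	}
	return !xerrors.IsInvalidParams(err)
}
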
diff --git a/src/query/storage/promremote/storage.go b/src/query/storage/promremote/storage.go index b514596821..442ee34eea 100644 --- a/src/query/storage/promremote/storage.go +++ b/src/query/storage/promremote/storage.go @@ -164,8 +164,14 @@ func (p *promStorage) writeSingle( p.logger.Error("error reading body", zap.Error(err)) response = errorReadingBody } - return fmt.Errorf("expected status code 2XX: actual=%v, address=%v, resp=%s", - resp.StatusCode, address, response) + genericError := fmt.Errorf( + "expected status code 2XX: actual=%v, address=%v, resp=%s", + resp.StatusCode, address, response, + ) + if resp.StatusCode < 500 && resp.StatusCode != http.StatusTooManyRequests { + return xerrors.NewInvalidParamsError(genericError) + } + return genericError } metrics.ReportSuccess(methodDuration) return nil diff --git a/src/query/storage/promremote/storage_test.go b/src/query/storage/promremote/storage_test.go index 4fa312c760..2e6a71e4b6 100644 --- a/src/query/storage/promremote/storage_test.go +++ b/src/query/storage/promremote/storage_test.go @@ -22,9 +22,9 @@ package promremote import ( "context" - "errors" "io" "math/rand" + "net/http" "testing" "time" @@ -39,17 +39,20 @@ import ( "github.com/m3db/m3/src/query/storage/m3/storagemetadata" "github.com/m3db/m3/src/query/storage/promremote/promremotetest" "github.com/m3db/m3/src/query/ts" + xerrors "github.com/m3db/m3/src/x/errors" "github.com/m3db/m3/src/x/tallytest" xtime "github.com/m3db/m3/src/x/time" ) -var logger, _ = zap.NewDevelopment() +var ( + logger, _ = zap.NewDevelopment() + scope = tally.NewTestScope("test_scope", map[string]string{}) +) func TestWrite(t *testing.T) { fakeProm := promremotetest.NewServer(t) defer fakeProm.Close() - scope := tally.NewTestScope("test_scope", map[string]string{}) promStorage, err := NewStorage(Options{ endpoints: []EndpointOptions{{name: "testEndpoint", address: fakeProm.WriteAddr()}}, scope: scope, @@ -119,40 +122,37 @@ func TestWriteBasedOnRetention(t *testing.T) { promLongRetention2.Reset() } - scope := tally.NewTestScope("test_scope", map[string]string{}) + mediumRetentionAttr := storagemetadata.Attributes{ + MetricsType: storagemetadata.AggregatedMetricsType, + Retention: 720 * time.Hour, + Resolution: 5 * time.Minute, + } + shortRetentionAttr := storagemetadata.Attributes{ + MetricsType: storagemetadata.AggregatedMetricsType, + Retention: 120 * time.Hour, + Resolution: 15 * time.Second, + } + longRetentionAttr := storagemetadata.Attributes{ + Resolution: 10 * time.Minute, + Retention: 8760 * time.Hour, + } promStorage, err := NewStorage(Options{ endpoints: []EndpointOptions{ { - address: promShortRetention.WriteAddr(), - attributes: storagemetadata.Attributes{ - MetricsType: storagemetadata.AggregatedMetricsType, - Retention: 120 * time.Hour, - Resolution: 15 * time.Second, - }, + address: promShortRetention.WriteAddr(), + attributes: shortRetentionAttr, }, { - address: promMediumRetention.WriteAddr(), - attributes: storagemetadata.Attributes{ - MetricsType: storagemetadata.AggregatedMetricsType, - Retention: 720 * time.Hour, - Resolution: 5 * time.Minute, - }, + address: promMediumRetention.WriteAddr(), + attributes: mediumRetentionAttr, }, { - address: promLongRetention.WriteAddr(), - attributes: storagemetadata.Attributes{ - MetricsType: storagemetadata.AggregatedMetricsType, - Retention: 8760 * time.Hour, - Resolution: 10 * time.Minute, - }, + address: promLongRetention.WriteAddr(), + attributes: longRetentionAttr, }, { - address: 
promLongRetention2.WriteAddr(), - attributes: storagemetadata.Attributes{ - MetricsType: storagemetadata.AggregatedMetricsType, - Retention: 8760 * time.Hour, - Resolution: 10 * time.Minute, - }, + address: promLongRetention2.WriteAddr(), + attributes: longRetentionAttr, }, }, scope: scope, @@ -161,31 +161,9 @@ func TestWriteBasedOnRetention(t *testing.T) { require.NoError(t, err) defer closeWithCheck(t, promStorage) - sendWrite := func(attr storagemetadata.Attributes) error { - //nolint: gosec - datapoint := ts.Datapoint{Value: rand.Float64(), Timestamp: xtime.Now()} - wq, err := storage.NewWriteQuery(storage.WriteQueryOptions{ - Tags: models.Tags{ - Opts: models.NewTagOptions(), - Tags: []models.Tag{{ - Name: []byte("test_tag_name"), - Value: []byte("test_tag_value"), - }}, - }, - Datapoints: ts.Datapoints{datapoint}, - Unit: xtime.Millisecond, - Attributes: attr, - }) - require.NoError(t, err) - return promStorage.Write(context.TODO(), wq) - } - t.Run("send short retention write", func(t *testing.T) { reset() - err := sendWrite(storagemetadata.Attributes{ - Retention: 120 * time.Hour, - Resolution: 15 * time.Second, - }) + err := writeTestMetric(t, promStorage, shortRetentionAttr) require.NoError(t, err) assert.NotNil(t, promShortRetention.GetLastWriteRequest()) assert.Nil(t, promMediumRetention.GetLastWriteRequest()) @@ -194,10 +172,7 @@ func TestWriteBasedOnRetention(t *testing.T) { t.Run("send medium retention write", func(t *testing.T) { reset() - err := sendWrite(storagemetadata.Attributes{ - Resolution: 5 * time.Minute, - Retention: 720 * time.Hour, - }) + err := writeTestMetric(t, promStorage, mediumRetentionAttr) require.NoError(t, err) assert.Nil(t, promShortRetention.GetLastWriteRequest()) assert.NotNil(t, promMediumRetention.GetLastWriteRequest()) @@ -206,10 +181,7 @@ func TestWriteBasedOnRetention(t *testing.T) { t.Run("send write to multiple instances configured with same retention", func(t *testing.T) { reset() - err := sendWrite(storagemetadata.Attributes{ - Resolution: 10 * time.Minute, - Retention: 8760 * time.Hour, - }) + err := writeTestMetric(t, promStorage, longRetentionAttr) require.NoError(t, err) assert.Nil(t, promShortRetention.GetLastWriteRequest()) assert.Nil(t, promMediumRetention.GetLastWriteRequest()) @@ -219,14 +191,14 @@ func TestWriteBasedOnRetention(t *testing.T) { t.Run("send unconfigured retention write", func(t *testing.T) { reset() - err := sendWrite(storagemetadata.Attributes{ - Resolution: 5*time.Minute + 1, - Retention: 720 * time.Hour, + err := writeTestMetric(t, promStorage, storagemetadata.Attributes{ + Resolution: mediumRetentionAttr.Resolution + 1, + Retention: mediumRetentionAttr.Retention, }) require.Error(t, err) - err = sendWrite(storagemetadata.Attributes{ - Resolution: 5 * time.Minute, - Retention: 720*time.Hour + 1, + err = writeTestMetric(t, promStorage, storagemetadata.Attributes{ + Resolution: mediumRetentionAttr.Resolution, + Retention: mediumRetentionAttr.Retention + 1, }) require.Error(t, err) assert.Contains(t, err.Error(), "write did not match any of known endpoints") @@ -239,17 +211,67 @@ func TestWriteBasedOnRetention(t *testing.T) { t.Run("error should not prevent sending to other instances", func(t *testing.T) { reset() - promLongRetention.SetError(errors.New("test err")) - err := sendWrite(storagemetadata.Attributes{ - Resolution: 10 * time.Minute, - Retention: 8760 * time.Hour, - }) + promLongRetention.SetError("test err", http.StatusInternalServerError) + err := writeTestMetric(t, promStorage, longRetentionAttr) 
require.Error(t, err) assert.Contains(t, err.Error(), "test err") assert.NotNil(t, promLongRetention2.GetLastWriteRequest()) }) } +func TestErrorHandling(t *testing.T) { + svr := promremotetest.NewServer(t) + defer svr.Close() + + attr := storagemetadata.Attributes{ + MetricsType: storagemetadata.AggregatedMetricsType, + Retention: 720 * time.Hour, + Resolution: 5 * time.Minute, + } + promStorage, err := NewStorage(Options{ + endpoints: []EndpointOptions{{address: svr.WriteAddr(), attributes: attr}}, + scope: scope, + logger: logger, + }) + require.NoError(t, err) + defer closeWithCheck(t, promStorage) + + t.Run("wrap non 5xx errors as invalid params error", func(t *testing.T) { + svr.Reset() + svr.SetError("test err", http.StatusForbidden) + err := writeTestMetric(t, promStorage, attr) + require.Error(t, err) + assert.True(t, xerrors.IsInvalidParams(err)) + }) + + t.Run("429 should not be wrapped as invalid params", func(t *testing.T) { + svr.Reset() + svr.SetError("test err", http.StatusTooManyRequests) + err := writeTestMetric(t, promStorage, attr) + require.Error(t, err) + assert.False(t, xerrors.IsInvalidParams(err)) + }) +} + func closeWithCheck(t *testing.T, c io.Closer) { require.NoError(t, c.Close()) } + +func writeTestMetric(t *testing.T, s storage.Storage, attr storagemetadata.Attributes) error { + //nolint: gosec + datapoint := ts.Datapoint{Value: rand.Float64(), Timestamp: xtime.Now()} + wq, err := storage.NewWriteQuery(storage.WriteQueryOptions{ + Tags: models.Tags{ + Opts: models.NewTagOptions(), + Tags: []models.Tag{{ + Name: []byte("test_tag_name"), + Value: []byte("test_tag_value"), + }}, + }, + Datapoints: ts.Datapoints{datapoint}, + Unit: xtime.Millisecond, + Attributes: attr, + }) + require.NoError(t, err) + return s.Write(context.TODO(), wq) +} From 250cdae60729f934c5ea107b63592bcf99b2a390 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Linas=20Med=C5=BEi=C5=ABnas?= Date: Mon, 4 Oct 2021 15:10:09 +0300 Subject: [PATCH 6/6] [dbnode] Make termsIterFromSegments.Next tail recursive (#3807) --- .../builder/multi_segments_terms_iter.go | 101 +++++++++--------- 1 file changed, 50 insertions(+), 51 deletions(-) diff --git a/src/m3ninx/index/segment/builder/multi_segments_terms_iter.go b/src/m3ninx/index/segment/builder/multi_segments_terms_iter.go index 859040d0f4..1ad1a69050 100644 --- a/src/m3ninx/index/segment/builder/multi_segments_terms_iter.go +++ b/src/m3ninx/index/segment/builder/multi_segments_terms_iter.go @@ -121,68 +121,67 @@ func (i *termsIterFromSegments) setField(field []byte) error { } func (i *termsIterFromSegments) Next() bool { - if i.err != nil { - return false - } - - if !i.keyIter.Next() { - return false - } + for { + if i.err != nil { + return false + } - // Create the overlayed postings list for this term - i.currPostingsList.Reset() - for _, iter := range i.keyIter.CurrentIters() { - termsKeyIter := iter.(*termsKeyIter) - _, list := termsKeyIter.iter.Current() - - if termsKeyIter.segment.offset == 0 && termsKeyIter.segment.skips == 0 { - // No offset, which means is first segment we are combining from - // so can just direct union. - if err := i.currPostingsList.Union(list); err != nil { - i.err = err - return false - } - continue + if !i.keyIter.Next() { + return false } - // We have to take into account offset and duplicates/skips. 
- var ( - iter = list.Iterator() - negativeOffsets = termsKeyIter.segment.negativeOffsets - multiErr = xerrors.NewMultiError() - ) - for iter.Next() { - curr := iter.Current() - negativeOffset := negativeOffsets[curr] - // Then skip the individual if matches. - if negativeOffset == -1 { - // Skip this value, as itself is a duplicate. + // Create the overlayed postings list for this term + i.currPostingsList.Reset() + for _, iter := range i.keyIter.CurrentIters() { + termsKeyIter := iter.(*termsKeyIter) + _, list := termsKeyIter.iter.Current() + + if termsKeyIter.segment.offset == 0 && termsKeyIter.segment.skips == 0 { + // No offset, which means is first segment we are combining from + // so can just direct union. + if err := i.currPostingsList.Union(list); err != nil { + i.err = err + return false + } continue } - value := curr + termsKeyIter.segment.offset - postings.ID(negativeOffset) - if err := i.currPostingsList.Insert(value); err != nil { - multiErr = multiErr.Add(err) - multiErr = multiErr.Add(iter.Close()) - i.err = multiErr.FinalError() + + // We have to take into account offset and duplicates/skips. + var ( + iter = list.Iterator() + negativeOffsets = termsKeyIter.segment.negativeOffsets + multiErr = xerrors.NewMultiError() + ) + for iter.Next() { + curr := iter.Current() + negativeOffset := negativeOffsets[curr] + // Then skip the individual if matches. + if negativeOffset == -1 { + // Skip this value, as itself is a duplicate. + continue + } + value := curr + termsKeyIter.segment.offset - postings.ID(negativeOffset) + if err := i.currPostingsList.Insert(value); err != nil { + multiErr = multiErr.Add(err) + multiErr = multiErr.Add(iter.Close()) + i.err = multiErr.FinalError() + return false + } + } + + multiErr = multiErr.Add(iter.Err()) + multiErr = multiErr.Add(iter.Close()) + i.err = multiErr.FinalError() + if i.err != nil { return false } } - multiErr = multiErr.Add(iter.Err()) - multiErr = multiErr.Add(iter.Close()) - i.err = multiErr.FinalError() - if i.err != nil { - return false + // Continue looping only if everything skipped or term is empty. + if !i.currPostingsList.IsEmpty() { + return true } } - - if i.currPostingsList.IsEmpty() { - // Everything skipped or term is empty. - // TODO: make this non-stack based (i.e. not recursive). - return i.Next() - } - - return true } func (i *termsIterFromSegments) Current() ([]byte, postings.List) {
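
The loop-based Next above retries in place when every entry under the current key was skipped, instead of calling itself, so long runs of skipped terms no longer grow the call stack (Go does not eliminate tail calls). A generic, self-contained sketch of the same transformation; advance and its two callbacks are illustrative stand-ins, not project code:

package example

// advance steps an iterator until it either reaches a usable position or is
// exhausted. The retry is an explicit loop rather than a recursive tail call,
// so arbitrarily long skip runs use constant stack space.
func advance(next, isEmpty func() bool) bool {
	for {
		if !next() {
			// Underlying iterator exhausted.
			return false
		}
		if !isEmpty() {
			// Found a non-empty result at this position.
			return true
		}
		// Everything at this position was skipped; loop and try the next one.
	}
}
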