Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[coordinator] Rollout augmentM3Tags flag to true by default #3082

Merged
merged 3 commits into from
Jan 12, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion site/content/m3query/architecture/_index.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,4 @@ chapter: true

## Overview

M3 Query and M3 Coordinator are written entirely in Go, M3 Query is as a query engine for [M3DB](https://m3db.github.io/m3/) and M3 Coordinator is a remote read/write endpoint for Prometheus and M3DB. To learn more about Prometheus's remote endpoints and storage, [see here](https://prometheus.io/docs/operating/integrations/#remote-endpoints-and-storage).
M3 Query and M3 Coordinator are written entirely in Go, M3 Query is as a query engine for [M3DB](https://m3db.io/) and M3 Coordinator is a remote read/write endpoint for Prometheus and M3DB. To learn more about Prometheus's remote endpoints and storage, [see here](https://prometheus.io/docs/operating/integrations/#remote-endpoints-and-storage).
2 changes: 1 addition & 1 deletion site/content/m3query/config/annotated_config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ writeWorkerPoolPolicy:
size: <int>

tagOptions:
# See here for more information: http://m3db.github.io/m3/how_to/query/#id-generation
# See here for more information under ID generation: https://m3db.io/docs/how_to/query/
idScheme: <id_scheme>

# lookbackDuration defines, at each step, how long we lookback until we see a non-NaN value.
Expand Down
2 changes: 1 addition & 1 deletion site/content/operator/api.md
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ Namespace defines an M3DB namespace or points to a preset M3DB namespace.

## NamespaceOptions

NamespaceOptions defines parameters for an M3DB namespace. See https://m3db.github.io/m3/operational_guide/namespace_configuration/ for more details.
NamespaceOptions defines parameters for an M3DB namespace. See https://m3db.io/docs/operational_guide/namespace_configuration/ for more details.

| Field | Description | Scheme | Required |
| ----- | ----------- | ------ | -------- |
Expand Down
1 change: 0 additions & 1 deletion src/cmd/services/m3coordinator/downsample/downsampler.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,6 @@ func defaultMetricsAppenderOptions(opts DownsamplerOptions, agg agg) metricsAppe
metricTagsIteratorPool: agg.pools.metricTagsIteratorPool,
debugLogging: debugLogging,
logger: logger,
augmentM3Tags: agg.augmentM3Tags,
}
}

Expand Down
61 changes: 15 additions & 46 deletions src/cmd/services/m3coordinator/downsample/metrics_appender.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,6 @@ type metricsAppenderOptions struct {
matcher matcher.Matcher
tagEncoderPool serialize.TagEncoderPool
metricTagsIteratorPool serialize.MetricTagsIteratorPool
augmentM3Tags bool

clockOpts clock.Options
debugLogging bool
Expand Down Expand Up @@ -149,19 +148,16 @@ func (a *metricsAppender) SamplesAppender(opts SampleAppenderOptions) (SamplesAp
}
tags := a.originalTags

// Augment tags if necessary.
if a.augmentM3Tags {
// NB (@shreyas): Add the metric type tag. The tag has the prefix
// __m3_. All tags with that prefix are only used for the purpose of
// filter match and then stripped off before we actually send to the aggregator.
switch opts.MetricType {
case ts.M3MetricTypeCounter:
tags.append(metric.M3TypeTag, metric.M3CounterValue)
case ts.M3MetricTypeGauge:
tags.append(metric.M3TypeTag, metric.M3GaugeValue)
case ts.M3MetricTypeTimer:
tags.append(metric.M3TypeTag, metric.M3TimerValue)
}
// NB (@shreyas): Add the metric type tag. The tag has the prefix
// __m3_. All tags with that prefix are only used for the purpose of
// filter match and then stripped off before we actually send to the aggregator.
switch opts.MetricType {
case ts.M3MetricTypeCounter:
tags.append(metric.M3TypeTag, metric.M3CounterValue)
case ts.M3MetricTypeGauge:
tags.append(metric.M3TypeTag, metric.M3GaugeValue)
case ts.M3MetricTypeTimer:
tags.append(metric.M3TypeTag, metric.M3TimerValue)
}

// Sort tags
Expand Down Expand Up @@ -190,11 +186,8 @@ func (a *metricsAppender) SamplesAppender(opts SampleAppenderOptions) (SamplesAp
matchResult := a.matcher.ForwardMatch(id, fromNanos, toNanos)
id.Close()

// If we augmented metrics tags before running the forward match, then
// filter them out.
if a.augmentM3Tags {
tags.filterPrefix(metric.M3MetricsPrefix)
}
// filter out augmented metrics tags
tags.filterPrefix(metric.M3MetricsPrefix)

var dropApplyResult metadata.ApplyOrRemoveDropPoliciesResult
if opts.Override {
Expand All @@ -215,7 +208,7 @@ func (a *metricsAppender) SamplesAppender(opts SampleAppenderOptions) (SamplesAp
append(a.curr.Pipelines, pipelines.Pipelines...)
}

if err := a.addSamplesAppenders(tags, a.curr, unownedID); err != nil {
if err := a.addSamplesAppenders(tags, a.curr); err != nil {
return SamplesAppenderResult{}, err
}
} else {
Expand Down Expand Up @@ -358,7 +351,7 @@ func (a *metricsAppender) SamplesAppender(opts SampleAppenderOptions) (SamplesAp
a.debugLogMatch("downsampler using built mapping staged metadatas",
debugLogMatchOptions{Meta: []metadata.StagedMetadata{a.curr}})

if err := a.addSamplesAppenders(tags, a.curr, unownedID); err != nil {
if err := a.addSamplesAppenders(tags, a.curr); err != nil {
return SamplesAppenderResult{}, err
}
}
Expand Down Expand Up @@ -472,31 +465,7 @@ func (a *metricsAppender) resetTags() {
a.originalTags = nil
}

func (a *metricsAppender) addSamplesAppenders(
originalTags *tags,
stagedMetadata metadata.StagedMetadata,
unownedID []byte,
) error {
// Check if any of the pipelines have tags or a graphite prefix to set.
var tagsExist bool
for _, pipeline := range stagedMetadata.Pipelines {
if len(pipeline.Tags) > 0 || len(pipeline.GraphitePrefix) > 0 {
tagsExist = true
break
}
}

// If we do not need to do any tag augmentation then just return.
if !a.augmentM3Tags && !tagsExist {
a.multiSamplesAppender.addSamplesAppender(samplesAppender{
agg: a.agg,
clientRemote: a.clientRemote,
unownedID: unownedID,
stagedMetadatas: []metadata.StagedMetadata{stagedMetadata},
})
return nil
}

func (a *metricsAppender) addSamplesAppenders(originalTags *tags, stagedMetadata metadata.StagedMetadata) error {
var (
pipelines []metadata.PipelineMetadata
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,8 +119,9 @@ func TestSamplesAppenderPoolResetsTagsAcrossSamples(t *testing.T) {
}

// NB: expected ID is generated into human-readable form
// from tags in ForwardMatch mock above.
expected := fmt.Sprintf("foo%d-bar%d", i, i)
// from tags in ForwardMatch mock above. Also include the m3 type, which is included when matching.
// nolint:scopelint
expected := fmt.Sprintf("__m3_type__-gauge,foo%d-bar%d", i, i)
if expected != u.ID.String() {
// NB: if this fails, appender is holding state after Finalize.
return fmt.Errorf("expected ID %s, got %s", expected, u.ID.String())
Expand Down
37 changes: 9 additions & 28 deletions src/cmd/services/m3coordinator/downsample/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (
"errors"
"fmt"
"runtime"
"strings"
"time"

"github.com/m3db/m3/src/aggregator/aggregator"
Expand Down Expand Up @@ -225,10 +224,9 @@ type agg struct {
aggregator aggregator.Aggregator
clientRemote client.Client

clockOpts clock.Options
matcher matcher.Matcher
pools aggPools
augmentM3Tags bool
clockOpts clock.Options
matcher matcher.Matcher
pools aggPools
}

// Configuration configurates a downsampler.
Expand Down Expand Up @@ -262,14 +260,6 @@ type Configuration struct {

// EntryTTL determines how long an entry remains alive before it may be expired due to inactivity.
EntryTTL time.Duration `yaml:"entryTTL"`

// AugmentM3Tags will augment the metric type to aggregated metrics
// to be used within the filter for rules. If enabled, for example,
// your filter can specify '__m3_type__:gauge' to filter by gauges.
// This is particularly useful for Graphite metrics today.
// Furthermore, the option is automatically enabled if static rules are
// used and any filter contain an __m3_type__ tag.
AugmentM3Tags bool `yaml:"augmentM3Tags"`
}

// MatcherConfiguration is the configuration for the rule matcher.
Expand Down Expand Up @@ -658,7 +648,6 @@ func (cfg Configuration) newAggregator(o DownsamplerOptions) (agg, error) {
scope = instrumentOpts.MetricsScope()
logger = instrumentOpts.Logger()
openTimeout = defaultOpenTimeout
augmentM3Tags = cfg.AugmentM3Tags
namespaceTag = defaultNamespaceTag
)
if o.StorageFlushConcurrency > 0 {
Expand Down Expand Up @@ -717,9 +706,6 @@ func (cfg Configuration) newAggregator(o DownsamplerOptions) (agg, error) {
rs := rules.NewEmptyRuleSet(defaultConfigInMemoryNamespace,
updateMetadata)
for _, mappingRule := range cfg.Rules.MappingRules {
if strings.Contains(mappingRule.Filter, metric.M3MetricsPrefixString) {
augmentM3Tags = true
}
rule, err := mappingRule.Rule()
if err != nil {
return agg{}, err
Expand All @@ -732,9 +718,6 @@ func (cfg Configuration) newAggregator(o DownsamplerOptions) (agg, error) {
}

for _, rollupRule := range cfg.Rules.RollupRules {
if strings.Contains(rollupRule.Filter, metric.M3MetricsPrefixString) {
augmentM3Tags = true
}
rule, err := rollupRule.Rule()
if err != nil {
return agg{}, err
Expand Down Expand Up @@ -788,10 +771,9 @@ func (cfg Configuration) newAggregator(o DownsamplerOptions) (agg, error) {
}

return agg{
clientRemote: client,
matcher: matcher,
pools: pools,
augmentM3Tags: augmentM3Tags,
clientRemote: client,
matcher: matcher,
pools: pools,
}, nil
}

Expand Down Expand Up @@ -953,10 +935,9 @@ func (cfg Configuration) newAggregator(o DownsamplerOptions) (agg, error) {
}

return agg{
aggregator: aggregatorInstance,
matcher: matcher,
pools: pools,
augmentM3Tags: augmentM3Tags,
aggregator: aggregatorInstance,
matcher: matcher,
pools: pools,
}, nil
}

Expand Down