Commit
Merge branch 'master' into nb/setup
nbroyles authored Oct 4, 2021
2 parents 6e6bd2f + 250cdae commit 6a7a129
Showing 15 changed files with 732 additions and 162 deletions.
28 changes: 14 additions & 14 deletions src/cmd/services/m3coordinator/downsample/metrics_appender.go
@@ -221,8 +221,9 @@ func (a *metricsAppender) SamplesAppender(opts SampleAppenderOptions) (SamplesAp
 	// Reuse a slice to keep the current staged metadatas we will apply.
 	a.curr.Pipelines = a.curr.Pipelines[:0]
 
+	// First, process any override explicitly provided as part of request
+	// (via request headers that specify target namespaces).
 	if opts.Override {
-		// Process an override explicitly provided as part of request.
 		for _, rule := range opts.OverrideRules.MappingRules {
 			stagedMetadatas, err := rule.StagedMetadatas()
 			if err != nil {
@@ -248,12 +249,9 @@ func (a *metricsAppender) SamplesAppender(opts SampleAppenderOptions) (SamplesAp
 		}, nil
 	}
 
-	// NB(r): First apply mapping rules to see which storage policies
-	// have been applied, any that have been applied as part of
-	// mapping rules that exact match a default storage policy will be
-	// skipped when applying default rules, so as to avoid storing
-	// the same metrics in the same namespace with the same metric
-	// name and tags (i.e. overwriting each other).
+	// Next, apply any mapping rules that match. We track which storage policies have been applied based on the
+	// mapping rules that match. Any storage policies that have been applied will be skipped when applying
+	// the auto-mapping rules to avoid redundant writes (i.e. overwriting each other).
 	var (
 		ruleStagedMetadatas = matchResult.ForExistingIDAt(nowNanos)
 		dropApplyResult     metadata.ApplyOrRemoveDropPoliciesResult
@@ -293,13 +291,12 @@ func (a *metricsAppender) SamplesAppender(opts SampleAppenderOptions) (SamplesAp
 		a.curr.Pipelines = append(a.curr.Pipelines, pipelines.Pipelines...)
 	}
 
-	// Always aggregate any default staged metadatas with a few exceptions.
-	// Exceptions are:
+	// Next, we cover auto-mapping (otherwise referred to as default) rules.
+	// We always aggregate any default rules with a few exceptions:
 	// 1. A mapping rule has provided an override for a storage policy,
-	// if so then skip aggregating for that storage policy).
-	// 2. Any type of drop rule has been set, since they should only
-	// impact mapping rules, not default staged metadatas provided from
-	// auto-mapping rules (i.e. default namespace aggregation).
+	// if so then skip aggregating for that storage policy.
+	// This is what we calculated in the step above.
+	// 2. Any type of drop rule has been set. Drop rules should mean that the auto-mapping rules are ignored.
 	if !a.curr.Pipelines.IsDropPolicySet() {
 		// No drop rule has been set as part of rule matching.
 		for idx, stagedMetadatasProto := range a.defaultStagedMetadatasProtos {
@@ -394,7 +391,9 @@ func (a *metricsAppender) SamplesAppender(opts SampleAppenderOptions) (SamplesAp
 	// Apply drop policies results
 	a.curr.Pipelines, dropApplyResult = a.curr.Pipelines.ApplyOrRemoveDropPolicies()
 
-	// Skip sending to downsampler if there's a drop policy or no pipeline defined.
+	// Now send the results of mapping / auto-mapping rules to the relevant downsampler.
+	// We explicitly skip sending if there's no work to be done: specifically
+	// if there's a drop policy or if the staged metadata is a no-op.
 	if len(a.curr.Pipelines) > 0 && !a.curr.IsDropPolicyApplied() && !a.curr.IsDefault() {
 		// Send to downsampler if we have something in the pipeline.
 		a.debugLogMatch("downsampler using built mapping staged metadatas",
@@ -405,6 +404,7 @@ func (a *metricsAppender) SamplesAppender(opts SampleAppenderOptions) (SamplesAp
 		}
 	}
 
+	// Finally, process and deliver staged metadata resulting from rollup rules.
 	numRollups := matchResult.NumNewRollupIDs()
 	for i := 0; i < numRollups; i++ {
 		rollup := matchResult.ForNewRollupIDsAt(i, nowNanos)
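Taken together, the rewritten comments describe a fixed ordering inside SamplesAppender: request-level overrides win outright; matched mapping rules apply next; auto-mapping (default) rules are then added, skipping storage policies a mapping rule already covered and skipped entirely when a drop rule is set; rollup rules are handled last. Below is a toy, self-contained Go sketch of that precedence. It is not code from this commit, and every name in it is illustrative only:

package main

import "fmt"

// policy stands in for a storage policy (e.g. "10s:48h" = 10s resolution,
// 48h retention). All names here are illustrative, not M3 APIs.
type policy string

// buildPipelines models the precedence the updated comments describe:
// request overrides first, then matched mapping rules, then auto-mapping
// (default) rules minus anything already applied or dropped.
func buildPipelines(
	override bool,
	overridePolicies, mappingPolicies, defaultPolicies []policy,
	dropRuleSet bool,
) []policy {
	// 1. An explicit request-level override short-circuits everything else.
	if override {
		return overridePolicies
	}

	// 2. Matched mapping rules apply first; track which policies they covered.
	applied := make(map[policy]bool, len(mappingPolicies))
	out := append([]policy(nil), mappingPolicies...)
	for _, p := range mappingPolicies {
		applied[p] = true
	}

	// 3. Auto-mapping (default) rules fill in the rest: skipped entirely when
	// a drop rule is set, and per-policy when a mapping rule already covered it.
	if !dropRuleSet {
		for _, p := range defaultPolicies {
			if !applied[p] {
				out = append(out, p)
			}
		}
	}
	return out
}

func main() {
	fmt.Println(buildPipelines(
		false,
		nil,
		[]policy{"10s:48h"},           // a mapping rule matched this policy
		[]policy{"10s:48h", "1m:30d"}, // defaults: the first is skipped as already applied
		false,
	))
	// Output: [10s:48h 1m:30d]
}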
180 changes: 180 additions & 0 deletions src/integration/resources/inprocess/aggregator.go
@@ -0,0 +1,180 @@
// Copyright (c) 2021 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package inprocess

import (
	"errors"
	"io/ioutil"
	"os"
	"time"

	"go.uber.org/zap"
	"gopkg.in/yaml.v2"

	m3agg "github.com/m3db/m3/src/aggregator/aggregator"
	"github.com/m3db/m3/src/aggregator/server"
	"github.com/m3db/m3/src/cmd/services/m3aggregator/config"
	"github.com/m3db/m3/src/integration/resources"
	nettest "github.com/m3db/m3/src/integration/resources/net"
	xos "github.com/m3db/m3/src/x/os"
)

type aggregator struct {
	cfg     config.Configuration
	logger  *zap.Logger
	tmpDirs []string

	interruptCh chan<- error
	shutdownCh  <-chan struct{}
}

// AggregatorOptions are options for starting an in-process aggregator.
type AggregatorOptions struct {
	// Logger is the logger to use for the in-process aggregator.
	Logger *zap.Logger
}

// NewAggregator creates a new in-process aggregator based on the configuration
// and options provided.
func NewAggregator(yamlCfg string, opts AggregatorOptions) (resources.Aggregator, error) {
	var cfg config.Configuration
	if err := yaml.Unmarshal([]byte(yamlCfg), &cfg); err != nil {
		return nil, err
	}

	// Replace any "0" ports with an open port.
	cfg, err := updateAggregatorPorts(cfg)
	if err != nil {
		return nil, err
	}

	// Replace any "*" filepath with a temporary directory.
	cfg, tmpDirs, err := updateAggregatorFilepaths(cfg)
	if err != nil {
		return nil, err
	}

	if opts.Logger == nil {
		var err error
		opts.Logger, err = zap.NewDevelopment()
		if err != nil {
			return nil, err
		}
	}

	agg := &aggregator{
		cfg:     cfg,
		logger:  opts.Logger,
		tmpDirs: tmpDirs,
	}
	agg.start()

	return agg, nil
}

// IsHealthy always reports healthy for the in-process aggregator.
func (a *aggregator) IsHealthy(instance string) error {
	return nil
}

// Status returns an empty runtime status for the in-process aggregator.
func (a *aggregator) Status(instance string) (m3agg.RuntimeStatus, error) {
	return m3agg.RuntimeStatus{}, nil
}

// Resign is a no-op for the in-process aggregator.
func (a *aggregator) Resign(instance string) error {
	return nil
}

// Close interrupts the running server, waits for it to signal a complete
// shutdown, and cleans up any temporary directories.
func (a *aggregator) Close() error {
	defer func() {
		for _, dir := range a.tmpDirs {
			if err := os.RemoveAll(dir); err != nil {
				a.logger.Error("error removing temp directory", zap.String("dir", dir), zap.Error(err))
			}
		}
	}()

	select {
	case a.interruptCh <- xos.NewInterruptError("in-process aggregator being shut down"):
	case <-time.After(interruptTimeout):
		return errors.New("timeout sending interrupt. closing without graceful shutdown")
	}

	select {
	case <-a.shutdownCh:
	case <-time.After(shutdownTimeout):
		return errors.New("timeout waiting for shutdown notification. server closing may" +
			" not be completely graceful")
	}

	return nil
}

func (a *aggregator) start() {
	interruptCh := make(chan error, 1)
	shutdownCh := make(chan struct{}, 1)

	go func() {
		server.Run(server.RunOptions{
			Config:      a.cfg,
			InterruptCh: interruptCh,
			ShutdownCh:  shutdownCh,
		})
	}()

	a.interruptCh = interruptCh
	a.shutdownCh = shutdownCh
}

func updateAggregatorPorts(cfg config.Configuration) (config.Configuration, error) {
	if cfg.HTTP != nil && len(cfg.HTTP.ListenAddress) > 0 {
		addr, _, _, err := nettest.MaybeGeneratePort(cfg.HTTP.ListenAddress)
		if err != nil {
			return cfg, err
		}

		cfg.HTTP.ListenAddress = addr
	}

	if cfg.M3Msg != nil && len(cfg.M3Msg.Server.ListenAddress) > 0 {
		addr, _, _, err := nettest.MaybeGeneratePort(cfg.M3Msg.Server.ListenAddress)
		if err != nil {
			return cfg, err
		}

		cfg.M3Msg.Server.ListenAddress = addr
	}

	return cfg, nil
}

func updateAggregatorFilepaths(cfg config.Configuration) (config.Configuration, []string, error) {
	tmpDirs := make([]string, 0, 1)
	if cfg.KVClient.Etcd != nil && cfg.KVClient.Etcd.CacheDir == "*" {
		dir, err := ioutil.TempDir("", "m3agg-*")
		if err != nil {
			return cfg, tmpDirs, err
		}
		tmpDirs = append(tmpDirs, dir)
		cfg.KVClient.Etcd.CacheDir = dir
	}

	return cfg, tmpDirs, nil
}
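For orientation, here is a minimal, hypothetical usage sketch of the new helper. The YAML keys are guessed from the config struct fields (HTTP.ListenAddress, KVClient.Etcd.CacheDir) and are not verified against the m3aggregator config schema or taken from this commit:

package inprocess_test

import (
	"log"

	"github.com/m3db/m3/src/integration/resources/inprocess"
)

func ExampleNewAggregator() {
	// A ":0" listen address and a "*" cache dir are placeholders that
	// NewAggregator replaces with an open port and a temp directory.
	// NOTE: these YAML keys are assumptions, not taken from this commit.
	agg, err := inprocess.NewAggregator(`
http:
  listenAddress: "127.0.0.1:0"
kvClient:
  etcd:
    cacheDir: "*"
`, inprocess.AggregatorOptions{})
	if err != nil {
		log.Fatal(err)
	}
	defer agg.Close() // best-effort graceful shutdown
}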