Skip to content

Commit

Permalink
Refactor host pipelines.
Browse files Browse the repository at this point in the history
  • Loading branch information
jefchien committed Oct 9, 2024
1 parent 95780e9 commit 3663e57
Show file tree
Hide file tree
Showing 20 changed files with 265 additions and 233 deletions.
4 changes: 2 additions & 2 deletions internal/util/collections/collections.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,12 +61,12 @@ func NewPair[K any, V any](key K, value V) *Pair[K, V] {

// Set is a map with a comparable K key and no
// meaningful value.
type Set[K comparable] map[K]any
type Set[K comparable] map[K]struct{}

// Add keys to the Set.
func (s Set[K]) Add(keys ...K) {
for _, key := range keys {
s[key] = nil
s[key] = struct{}{}
}
}

Expand Down
2 changes: 1 addition & 1 deletion translator/tocwconfig/sampleConfig/amp_config_linux.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ service:
- agenthealth/metrics
- sigv4auth
pipelines:
metrics/host:
metrics/host/cloudwatch:
exporters:
- awscloudwatch
processors:
Expand Down
10 changes: 5 additions & 5 deletions translator/tocwconfig/sampleConfig/complete_linux_config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ processors:
send_batch_max_size: 0
send_batch_size: 8192
timeout: 200ms
cumulativetodelta/hostDeltaMetrics:
cumulativetodelta/hostDeltaMetrics/cloudwatch:
exclude:
match_type: strict
metrics:
Expand Down Expand Up @@ -362,7 +362,7 @@ service:
- batch/emf_logs
receivers:
- udplog/emf_logs
metrics/host:
metrics/host/cloudwatch:
exporters:
- awscloudwatch
processors:
Expand All @@ -378,11 +378,11 @@ service:
- telegraf_netstat
- telegraf_processes
- telegraf_mem
metrics/hostDeltaMetrics:
metrics/hostDeltaMetrics/cloudwatch:
exporters:
- awscloudwatch
processors:
- cumulativetodelta/hostDeltaMetrics
- cumulativetodelta/hostDeltaMetrics/cloudwatch
- ec2tagger
- transform
receivers:
Expand All @@ -394,9 +394,9 @@ service:
processors:
- filter/jmx/0
- resource/jmx
- cumulativetodelta/jmx
- transform/jmx/0
- ec2tagger
- cumulativetodelta/jmx
receivers:
- jmx/0
metrics/jmx/cloudwatch/1:
Expand Down
4 changes: 2 additions & 2 deletions translator/tocwconfig/sampleConfig/jmx_config_linux.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ service:
- agenthealth/metrics
- sigv4auth
pipelines:
metrics/host:
metrics/host/cloudwatch:
exporters:
- awscloudwatch
processors:
Expand All @@ -184,8 +184,8 @@ service:
processors:
- filter/jmx
- resource/jmx
- batch/jmx/amp
- transform/jmx
- batch/jmx/amp
receivers:
- jmx
metrics/jmx/cloudwatch:
Expand Down
28 changes: 28 additions & 0 deletions translator/translate/otel/common/destination.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: MIT

package common

import "go.opentelemetry.io/collector/confmap"

const (
DefaultDestination = ""
)

var (
metricsDestinationsKey = ConfigKey(MetricsKey, MetricsDestinationsKey)
)

func GetMetricsDestinations(conf *confmap.Conf) []string {
var destinations []string
if conf.IsSet(ConfigKey(metricsDestinationsKey, CloudWatchKey)) {
destinations = append(destinations, CloudWatchKey)
}
if conf.IsSet(ConfigKey(metricsDestinationsKey, AMPKey)) {
destinations = append(destinations, AMPKey)
}
if conf.IsSet(MetricsKey) && len(destinations) == 0 {
destinations = append(destinations, DefaultDestination)
}
return destinations
}
68 changes: 68 additions & 0 deletions translator/translate/otel/common/destination_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: MIT

package common

import (
"testing"

"github.com/stretchr/testify/assert"
"go.opentelemetry.io/collector/confmap"
)

func TestGetMetricsDestinations(t *testing.T) {
testCases := map[string]struct {
input map[string]any
want []string
}{
"WithNoMetrics": {
input: map[string]any{
"logs": map[string]any{},
},
},
"WithMetrics/Default": {
input: map[string]any{
"metrics": map[string]any{},
},
want: []string{DefaultDestination},
},
"WithMetrics/AMP": {
input: map[string]any{
"metrics": map[string]any{
"metrics_destinations": map[string]any{
"amp": map[string]any{},
},
},
},
want: []string{AMPKey},
},
"WithMetrics/CloudWatch": {
input: map[string]any{
"metrics": map[string]any{
"metrics_destinations": map[string]any{
"cloudwatch": map[string]any{},
},
},
},
want: []string{CloudWatchKey},
},
"WithMetrics/CloudWatch&AMP": {
input: map[string]any{
"metrics": map[string]any{
"metrics_destinations": map[string]any{
"cloudwatch": map[string]any{},
"amp": map[string]any{},
},
},
},
want: []string{CloudWatchKey, AMPKey},
},
}
for name, testCase := range testCases {
t.Run(name, func(t *testing.T) {
conf := confmap.NewFromStringMap(testCase.input)
got := GetMetricsDestinations(conf)
assert.Equal(t, testCase.want, got)
})
}
}
24 changes: 24 additions & 0 deletions translator/translate/otel/common/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,3 +52,27 @@ func (p *IndexProvider) Index() int {
func (p *IndexProvider) SetIndex(index int) {
p.index = index
}

type DestinationSetter interface {
SetDestination(string)
}

func WithDestination(name string) TranslatorOption {
return func(target any) {
if setter, ok := target.(DestinationSetter); ok {
setter.SetDestination(name)
}
}
}

type DestinationProvider struct {
destination string
}

func (p *DestinationProvider) Destination() string {
return p.destination
}

func (p *DestinationProvider) SetDestination(destination string) {
p.destination = destination
}
7 changes: 7 additions & 0 deletions translator/translate/otel/common/options_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,3 +22,10 @@ func TestWithIndex(t *testing.T) {
opt(p)
assert.Equal(t, 1, p.Index())
}

func TestWithDestination(t *testing.T) {
p := &DestinationProvider{destination: "a"}
opt := WithDestination("b")
opt(p)
assert.Equal(t, "b", p.Destination())
}
83 changes: 44 additions & 39 deletions translator/translate/otel/pipeline/host/translator.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@ import (
"go.opentelemetry.io/collector/confmap"

"github.com/aws/amazon-cloudwatch-agent/translator/translate/otel/common"
"github.com/aws/amazon-cloudwatch-agent/translator/translate/otel/exporter/awscloudwatch"
"github.com/aws/amazon-cloudwatch-agent/translator/translate/otel/exporter/awsemf"
"github.com/aws/amazon-cloudwatch-agent/translator/translate/otel/exporter/prometheusremotewrite"
"github.com/aws/amazon-cloudwatch-agent/translator/translate/otel/extension/agenthealth"
"github.com/aws/amazon-cloudwatch-agent/translator/translate/otel/extension/sigv4auth"
"github.com/aws/amazon-cloudwatch-agent/translator/translate/otel/processor/batchprocessor"
Expand All @@ -22,10 +25,9 @@ import (
)

type translator struct {
name string
receivers common.TranslatorMap[component.Config]
exporters common.TranslatorMap[component.Config]
extensions common.TranslatorMap[component.Config]
name string
common.DestinationProvider
receivers common.TranslatorMap[component.Config]
}

var _ common.Translator[*common.ComponentTranslators] = (*translator)(nil)
Expand All @@ -36,10 +38,16 @@ var _ common.Translator[*common.ComponentTranslators] = (*translator)(nil)
func NewTranslator(
name string,
receivers common.TranslatorMap[component.Config],
exporters common.TranslatorMap[component.Config],
extensions common.TranslatorMap[component.Config],
opts ...common.TranslatorOption,
) common.Translator[*common.ComponentTranslators] {
return &translator{name, receivers, exporters, extensions}
t := &translator{name: name, receivers: receivers}
for _, opt := range opts {
opt(t)
}
if t.Destination() != "" {
t.name += "/" + t.Destination()
}
return t
}

func (t translator) ID() component.ID {
Expand All @@ -48,21 +56,15 @@ func (t translator) ID() component.ID {

// Translate creates a pipeline if metrics section exists.
func (t translator) Translate(conf *confmap.Conf) (*common.ComponentTranslators, error) {
if conf == nil || (!conf.IsSet(common.MetricsKey) && !conf.IsSet(common.ConfigKey(common.LogsKey, common.MetricsCollectedKey))) {
return nil, &common.MissingKeyError{
ID: t.ID(),
JsonKey: fmt.Sprint(common.MetricsKey, " or ", common.ConfigKey(common.LogsKey, common.MetricsCollectedKey)),
}
} else if t.receivers.Len() == 0 {
log.Printf("D! pipeline %s has no receivers", t.name)
return nil, nil
if conf == nil || t.receivers.Len() == 0 {
return nil, fmt.Errorf("no receivers configured in pipeline %s", t.name)
}

translators := common.ComponentTranslators{
Receivers: t.receivers,
Processors: common.NewTranslatorMap[component.Config](),
Exporters: t.exporters,
Extensions: t.extensions,
Exporters: common.NewTranslatorMap[component.Config](),
Extensions: common.NewTranslatorMap[component.Config](),
}

// we need to add delta processor because (only) diskio and net input plugins report delta metric
Expand All @@ -71,33 +73,36 @@ func (t translator) Translate(conf *confmap.Conf) (*common.ComponentTranslators,
translators.Processors.Set(cumulativetodeltaprocessor.NewTranslator(common.WithName(t.name), cumulativetodeltaprocessor.WithDefaultKeys()))
}

if conf.IsSet(common.ConfigKey(common.MetricsKey, common.AppendDimensionsKey)) {
log.Printf("D! ec2tagger processor required because append_dimensions is set")
translators.Processors.Set(ec2taggerprocessor.NewTranslator())
}
if t.Destination() != common.Emf {
if conf.IsSet(common.ConfigKey(common.MetricsKey, common.AppendDimensionsKey)) {
log.Printf("D! ec2tagger processor required because append_dimensions is set")
translators.Processors.Set(ec2taggerprocessor.NewTranslator())
}

mdt := metricsdecorator.NewTranslator(metricsdecorator.WithIgnorePlugins(common.JmxKey))
if mdt.IsSet(conf) {
log.Printf("D! metric decorator required because measurement fields are set")
translators.Processors.Set(mdt)
mdt := metricsdecorator.NewTranslator(metricsdecorator.WithIgnorePlugins(common.JmxKey))
if mdt.IsSet(conf) {
log.Printf("D! metric decorator required because measurement fields are set")
translators.Processors.Set(mdt)
}
}

_, ok1 := t.exporters.Get(component.NewID(component.MustNewType("prometheusremotewrite")))
_, ok2 := t.exporters.Get(component.MustNewIDWithName("prometheusremotewrite", "amp"))

if ok1 || ok2 {
switch t.Destination() {
case common.DefaultDestination, common.CloudWatchKey:
translators.Exporters.Set(awscloudwatch.NewTranslator())
translators.Extensions.Set(agenthealth.NewTranslator(component.DataTypeMetrics, []string{agenthealth.OperationPutMetricData}))
case common.AMPKey:
if conf.IsSet(common.MetricsAggregationDimensionsKey) {
translators.Processors.Set(rollupprocessor.NewTranslator())
}
translators.Processors.Set(batchprocessor.NewTranslatorWithNameAndSection(t.name, common.MetricsKey))
translators.Exporters.Set(prometheusremotewrite.NewTranslatorWithName(common.AMPKey))
translators.Extensions.Set(sigv4auth.NewTranslator())
case common.Emf:
translators.Exporters.Set(awsemf.NewTranslator())
translators.Extensions.Set(agenthealth.NewTranslator(component.DataTypeLogs, []string{agenthealth.OperationPutLogEvents}))
default:
return nil, fmt.Errorf("pipeline (%s) does not support destination (%s) in configuration", t.name, t.Destination())
}

if (ok1 || ok2) && conf.IsSet(common.MetricsAggregationDimensionsKey) {
translators.Processors.Set(rollupprocessor.NewTranslator())
}

if _, ok := t.exporters.Get(component.NewID(component.MustNewType("awscloudwatch"))); !ok {
translators.Processors.Set(batchprocessor.NewTranslatorWithNameAndSection(t.name, common.MetricsKey))
} else {
// only add agenthealth for the cloudwatch exporter
translators.Extensions.Set(agenthealth.NewTranslator(component.DataTypeMetrics, []string{agenthealth.OperationPutMetricData}))
}
return &translators, nil
}
Loading

0 comments on commit 3663e57

Please sign in to comment.