Skip to content

Commit

Permalink
[exporter/datadog] Compatibility with ECS Fargate semantic conventions (
Browse files Browse the repository at this point in the history
#6670)

* [exporter/datadogexporter] Make hostname empty on AWS ECS Fargate

* Revert "[exporter/datadog] Always add current hostname (#5967)"

This reverts commit 247420a.

We can't always add the current hostname *and* not expose the hostname in Fargate.

* [exporter/datadogexporter] Add `task_arn` instead of a hostname for running metrics

We do this because of two reasons:
1. we don't want to expose the Fargate hostname and
2. users care about identifying where their Collector is running (in this case which task).

* Address review comment

* Add changelog note
  • Loading branch information
mx-psi authored Dec 21, 2021
1 parent ed5c1e8 commit 290975f
Show file tree
Hide file tree
Showing 11 changed files with 181 additions and 7 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
## 💡 Enhancements 💡

- `prometheusremotewriteexporter`: Handling Staleness flag from OTLP (#6679)
- `datadogexporter`: Add compatibility with ECS Fargate semantic conventions (#6670)

## 🛑 Breaking changes 🛑

Expand Down
17 changes: 17 additions & 0 deletions exporter/datadogexporter/internal/attributes/attributes.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,10 @@ var (
conventions.AttributeAWSECSContainerARN,
}

runningTagsAttributes = []string{
conventions.AttributeAWSECSTaskARN,
}

// Kubernetes mappings defines the mapping between Kubernetes conventions (both general and Datadog specific)
// and Datadog Agent conventions. The Datadog Agent conventions can be found at
// https://github.com/DataDog/datadog-agent/blob/e081bed/pkg/tagger/collectors/const.go and
Expand Down Expand Up @@ -159,6 +163,19 @@ func TagsFromAttributes(attrs pdata.AttributeMap) []string {
return tags
}

// RunningTagsFromAttributes gets tags used for running metrics from attributes.
func RunningTagsFromAttributes(attrs pdata.AttributeMap) []string {
tags := make([]string, 0, 1)
for _, key := range runningTagsAttributes {
if val, ok := attrs.Get(key); ok {
if ddKey, found := conventionsMapping[key]; found && val.StringVal() != "" {
tags = append(tags, fmt.Sprintf("%s:%s", ddKey, val.StringVal()))
}
}
}
return tags
}

// ContainerTagFromAttributes extracts the value of _dd.tags.container from the given
// set of attributes.
func ContainerTagFromAttributes(attr map[string]string) string {
Expand Down
5 changes: 5 additions & 0 deletions exporter/datadogexporter/internal/attributes/hostname.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,11 @@ func unsanitizedHostnameFromAttributes(attrs pdata.AttributeMap) (string, bool)
return customHostname.StringVal(), true
}

if launchType, ok := attrs.Get(conventions.AttributeAWSECSLaunchtype); ok && launchType.StringVal() == conventions.AttributeAWSECSLaunchtypeFargate {
// If on AWS ECS Fargate, return a valid but empty hostname
return "", true
}

// Kubernetes: node-cluster if cluster name is available, else node
if k8sNodeName, ok := attrs.Get(AttributeK8sNodeName); ok {
if k8sClusterName, ok := getClusterName(attrs); ok {
Expand Down
13 changes: 13 additions & 0 deletions exporter/datadogexporter/internal/attributes/hostname_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,19 @@ func TestHostnameFromAttributes(t *testing.T) {
assert.True(t, ok)
assert.Equal(t, hostname, testHostName)

// AWS cloud provider means relying on the EC2 function
attrs = testutils.NewAttributeMap(map[string]string{
conventions.AttributeCloudProvider: conventions.AttributeCloudProviderAWS,
conventions.AttributeCloudPlatform: conventions.AttributeCloudPlatformAWSECS,
conventions.AttributeAWSECSTaskARN: "example-task-ARN",
conventions.AttributeAWSECSTaskFamily: "example-task-family",
conventions.AttributeAWSECSTaskRevision: "example-task-revision",
conventions.AttributeAWSECSLaunchtype: conventions.AttributeAWSECSLaunchtypeFargate,
})
hostname, ok = HostnameFromAttributes(attrs)
assert.True(t, ok)
assert.Empty(t, hostname)

// GCP cloud provider means relying on the GCP function
attrs = testutils.NewAttributeMap(map[string]string{
conventions.AttributeCloudProvider: conventions.AttributeCloudProviderGCP,
Expand Down
16 changes: 16 additions & 0 deletions exporter/datadogexporter/internal/metrics/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,18 +27,21 @@ import (

var _ translator.Consumer = (*Consumer)(nil)
var _ translator.HostConsumer = (*Consumer)(nil)
var _ translator.TagsConsumer = (*Consumer)(nil)

// Consumer is the metrics Consumer.
type Consumer struct {
ms []datadog.Metric
sl sketches.SketchSeriesList
seenHosts map[string]struct{}
seenTags map[string]struct{}
}

// NewConsumer creates a new zorkian consumer.
func NewConsumer() *Consumer {
return &Consumer{
seenHosts: make(map[string]struct{}),
seenTags: make(map[string]struct{}),
}
}

Expand All @@ -64,6 +67,14 @@ func (c *Consumer) runningMetrics(timestamp uint64, buildInfo component.BuildInf
series = append(series, runningMetric...)
}

for tag := range c.seenTags {
runningMetrics := DefaultMetrics("metrics", "", timestamp, buildInfo)
for i := range runningMetrics {
runningMetrics[i].Tags = append(runningMetrics[i].Tags, tag)
}
series = append(series, runningMetrics...)
}

return
}

Expand Down Expand Up @@ -115,3 +126,8 @@ func (c *Consumer) ConsumeSketch(
func (c *Consumer) ConsumeHost(host string) {
c.seenHosts[host] = struct{}{}
}

// ConsumeTag implements the translator.TagsConsumer interface.
func (c *Consumer) ConsumeTag(tag string) {
c.seenTags[tag] = struct{}{}
}
47 changes: 47 additions & 0 deletions exporter/datadogexporter/internal/metrics/consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,11 @@ import (
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/model/pdata"
conventions "go.opentelemetry.io/collector/model/semconv/v1.5.0"
"go.uber.org/zap"

"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/datadogexporter/internal/attributes"
"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/datadogexporter/internal/testutils"
"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/datadogexporter/internal/translator"
)

Expand Down Expand Up @@ -82,3 +84,48 @@ func TestRunningMetrics(t *testing.T) {
)

}

func TestTagsMetrics(t *testing.T) {
ms := pdata.NewMetrics()
rms := ms.ResourceMetrics()

rm := rms.AppendEmpty()
baseAttrs := testutils.NewAttributeMap(map[string]string{
conventions.AttributeCloudProvider: conventions.AttributeCloudProviderAWS,
conventions.AttributeCloudPlatform: conventions.AttributeCloudPlatformAWSECS,
conventions.AttributeAWSECSTaskFamily: "example-task-family",
conventions.AttributeAWSECSTaskRevision: "example-task-revision",
conventions.AttributeAWSECSLaunchtype: conventions.AttributeAWSECSLaunchtypeFargate,
})
baseAttrs.CopyTo(rm.Resource().Attributes())
rm.Resource().Attributes().InsertString(conventions.AttributeAWSECSTaskARN, "task-arn-1")

rm = rms.AppendEmpty()
baseAttrs.CopyTo(rm.Resource().Attributes())
rm.Resource().Attributes().InsertString(conventions.AttributeAWSECSTaskARN, "task-arn-2")

rm = rms.AppendEmpty()
baseAttrs.CopyTo(rm.Resource().Attributes())
rm.Resource().Attributes().InsertString(conventions.AttributeAWSECSTaskARN, "task-arn-3")

logger, _ := zap.NewProduction()
tr := newTranslator(t, logger)

ctx := context.Background()
consumer := NewConsumer()
tr.MapMetrics(ctx, ms, consumer)

runningMetrics := consumer.runningMetrics(0, component.BuildInfo{})
runningTags := []string{}
runningHostnames := []string{}
for _, metric := range runningMetrics {
runningTags = append(runningTags, metric.Tags...)
if metric.Host != nil {
runningHostnames = append(runningHostnames, *metric.Host)
}
}

assert.ElementsMatch(t, runningHostnames, []string{"", "", ""})
assert.Len(t, runningMetrics, 3)
assert.ElementsMatch(t, runningTags, []string{"task_arn:task-arn-1", "task_arn:task-arn-2", "task_arn:task-arn-3"})
}
9 changes: 9 additions & 0 deletions exporter/datadogexporter/internal/translator/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,3 +69,12 @@ type HostConsumer interface {
// ConsumeHost consumes a hostname.
ConsumeHost(host string)
}

// TagsConsumer is a tags consumer.
// It is an optional interface that can be implemented by a Consumer.
// Consumed tags are used for running metrics, and should represent
// some resource running a Collector (e.g. Fargate task).
type TagsConsumer interface {
// ConsumeTag consumes a tag
ConsumeTag(tag string)
}
16 changes: 13 additions & 3 deletions exporter/datadogexporter/internal/translator/metrics_translator.go
Original file line number Diff line number Diff line change
Expand Up @@ -386,9 +386,19 @@ func (t *Translator) MapMetrics(ctx context.Context, md pdata.Metrics, consumer
}
}

// Track hosts if the consumer is a HostConsumer.
if c, ok := consumer.(HostConsumer); ok {
c.ConsumeHost(host)
if host != "" {
// Track hosts if the consumer is a HostConsumer.
if c, ok := consumer.(HostConsumer); ok {
c.ConsumeHost(host)
}
} else {
// Track task ARN if the consumer is a TagsConsumer.
if c, ok := consumer.(TagsConsumer); ok {
tags := attributes.RunningTagsFromAttributes(rm.Resource().Attributes())
for _, tag := range tags {
c.ConsumeTag(tag)
}
}
}

ilms := rm.InstrumentationLibraryMetrics()
Expand Down
1 change: 0 additions & 1 deletion exporter/datadogexporter/metrics_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,6 @@ func (exp *metricsExporter) PushMetricsData(ctx context.Context, md pdata.Metric
}

consumer := metrics.NewConsumer()
consumer.ConsumeHost(metadata.GetHost(exp.params.Logger, exp.cfg))
pushTime := uint64(time.Now().UTC().UnixNano())
err := exp.tr.MapMetrics(ctx, md, consumer)
if err != nil {
Expand Down
19 changes: 17 additions & 2 deletions exporter/datadogexporter/translate_traces.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func convertToDatadogTd(td pdata.Traces, fallbackHost string, cfg *config.Config
var traces []*pb.TracePayload

seenHosts := make(map[string]struct{})
seenHosts[fallbackHost] = struct{}{}
seenTags := make(map[string]struct{})
var series []datadog.Metric
pushTime := pdata.NewTimestampFromTime(time.Now())

Expand All @@ -93,7 +93,14 @@ func convertToDatadogTd(td pdata.Traces, fallbackHost string, cfg *config.Config
host = fallbackHost
}

seenHosts[host] = struct{}{}
if host != "" {
seenHosts[host] = struct{}{}
} else {
tags := attributes.RunningTagsFromAttributes(rs.Resource().Attributes())
for _, tag := range tags {
seenTags[tag] = struct{}{}
}
}
payload := resourceSpansToDatadogSpans(rs, host, cfg, blk, spanNameMap)

traces = append(traces, &payload)
Expand All @@ -105,6 +112,14 @@ func convertToDatadogTd(td pdata.Traces, fallbackHost string, cfg *config.Config
series = append(series, runningMetric...)
}

for tag := range seenTags {
runningMetrics := metrics.DefaultMetrics("traces", "", uint64(pushTime), buildInfo)
for i := range runningMetrics {
runningMetrics[i].Tags = append(runningMetrics[i].Tags, tag)
}
series = append(series, runningMetrics...)
}

return traces, series
}

Expand Down
44 changes: 43 additions & 1 deletion exporter/datadogexporter/translate_traces_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (

"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/datadogexporter/config"
"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/datadogexporter/internal/attributes"
"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/datadogexporter/internal/testutils"
"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/datadogexporter/internal/utils"
)

Expand Down Expand Up @@ -148,7 +149,7 @@ func TestConvertToDatadogTdNoResourceSpans(t *testing.T) {
outputTraces, runningMetrics := convertToDatadogTd(traces, "test-host", &config.Config{}, denylister, buildInfo)

assert.Equal(t, 0, len(outputTraces))
assert.Equal(t, 1, len(runningMetrics))
assert.Equal(t, 0, len(runningMetrics))
}

func TestRunningTraces(t *testing.T) {
Expand Down Expand Up @@ -189,6 +190,47 @@ func TestRunningTraces(t *testing.T) {
)
}

func TestRunningTracesARN(t *testing.T) {
td := pdata.NewTraces()
rts := td.ResourceSpans()

rm := rts.AppendEmpty()
baseAttrs := testutils.NewAttributeMap(map[string]string{
conventions.AttributeCloudProvider: conventions.AttributeCloudProviderAWS,
conventions.AttributeCloudPlatform: conventions.AttributeCloudPlatformAWSECS,
conventions.AttributeAWSECSTaskFamily: "example-task-family",
conventions.AttributeAWSECSTaskRevision: "example-task-revision",
conventions.AttributeAWSECSLaunchtype: conventions.AttributeAWSECSLaunchtypeFargate,
})
baseAttrs.CopyTo(rm.Resource().Attributes())
rm.Resource().Attributes().InsertString(conventions.AttributeAWSECSTaskARN, "task-arn-1")

rm = rts.AppendEmpty()
baseAttrs.CopyTo(rm.Resource().Attributes())
rm.Resource().Attributes().InsertString(conventions.AttributeAWSECSTaskARN, "task-arn-2")

rm = rts.AppendEmpty()
baseAttrs.CopyTo(rm.Resource().Attributes())
rm.Resource().Attributes().InsertString(conventions.AttributeAWSECSTaskARN, "task-arn-3")

buildInfo := component.BuildInfo{}

_, runningMetrics := convertToDatadogTd(td, "fallbackHost", &config.Config{}, newDenylister([]string{}), buildInfo)

runningHostnames := []string{}
runningTags := []string{}
for _, metric := range runningMetrics {
require.Equal(t, *metric.Metric, "otel.datadog_exporter.traces.running")
require.NotNil(t, metric.Host)
runningHostnames = append(runningHostnames, *metric.Host)
runningTags = append(runningTags, metric.Tags...)
}

assert.ElementsMatch(t, runningHostnames, []string{"", "", ""})
assert.Len(t, runningMetrics, 3)
assert.ElementsMatch(t, runningTags, []string{"task_arn:task-arn-1", "task_arn:task-arn-2", "task_arn:task-arn-3"})
}

func TestObfuscation(t *testing.T) {

denylister := newDenylister([]string{})
Expand Down

0 comments on commit 290975f

Please sign in to comment.