Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add container tags #31642

Merged
6 changes: 6 additions & 0 deletions connector/datadogconnector/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,12 @@ type TracesConfig struct {
// TraceBuffer specifies the number of Datadog Agent TracerPayloads to buffer before dropping.
// The default value is 1000.
TraceBuffer int `mapstructure:"trace_buffer"`

// EnableContainerStats specifies whether to enable container stats collection.
EnableContainerStats bool `mapstructure:"enable_container_stats"`

// ContainerTagAttributes specifies the list of container attributes to be added to the span tags.
ContainerTagAttributes []string `mapstructure:"container_tag_attributes"`
}

// Validate the configuration for errors. This is required by component.Config.
Expand Down
64 changes: 58 additions & 6 deletions connector/datadogconnector/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,12 @@ import (
"github.com/DataDog/datadog-go/v5/statsd"
"github.com/DataDog/opentelemetry-mapping-go/pkg/otlp/attributes"
"github.com/DataDog/opentelemetry-mapping-go/pkg/otlp/metrics"
"github.com/patrickmn/go-cache"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/pdata/ptrace"
semconv "go.opentelemetry.io/collector/semconv/v1.17.0"
"go.opentelemetry.io/otel/metric/noop"
"go.uber.org/zap"

Expand All @@ -36,6 +38,10 @@ type traceToMetricConnector struct {
// from the agent to OTLP Metrics.
translator *metrics.Translator

enableContainerStats bool
containerTagAttrs map[string]string
containerTagCache *cache.Cache

// in specifies the channel through which the agent will output Stats Payloads
// resulting from ingested traces.
in chan *pb.StatsPayload
Expand All @@ -60,14 +66,23 @@ func newTraceToMetricConnector(set component.TelemetrySettings, cfg component.Co
return nil, fmt.Errorf("failed to create metrics translator: %w", err)
}

ctags := make(map[string]string, len(cfg.(*Config).Traces.ContainerTagAttributes))
for _, val := range cfg.(*Config).Traces.ContainerTagAttributes {
ctags[val] = ""
}
ddtags := attributes.ContainerTagFromAttributes(ctags)

ctx := context.Background()
return &traceToMetricConnector{
logger: set.Logger,
agent: datadog.NewAgentWithConfig(ctx, getTraceAgentCfg(cfg.(*Config).Traces, attributesTranslator), in, metricsClient, timingReporter),
translator: trans,
in: in,
metricsConsumer: metricsConsumer,
exit: make(chan struct{}),
logger: set.Logger,
agent: datadog.NewAgentWithConfig(ctx, getTraceAgentCfg(cfg.(*Config).Traces, attributesTranslator), in, metricsClient, timingReporter),
translator: trans,
in: in,
metricsConsumer: metricsConsumer,
enableContainerStats: cfg.(*Config).Traces.EnableContainerStats,
containerTagAttrs: ddtags,
containerTagCache: cache.New(cache.DefaultExpiration, cache.DefaultExpiration),
exit: make(chan struct{}),
}, nil
}

Expand All @@ -80,6 +95,9 @@ func getTraceAgentCfg(cfg TracesConfig, attributesTranslator *attributes.Transla
acfg.ComputeStatsBySpanKind = cfg.ComputeStatsBySpanKind
acfg.PeerTagsAggregation = cfg.PeerTagsAggregation
acfg.PeerTags = cfg.PeerTags
if cfg.EnableContainerStats {
acfg.Features["enable_cid_stats"] = struct{}{}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

songy23 marked this conversation as resolved.
Show resolved Hide resolved
}
if v := cfg.TraceBuffer; v > 0 {
acfg.TraceBuffer = v
}
Expand Down Expand Up @@ -112,6 +130,27 @@ func (c *traceToMetricConnector) Capabilities() consumer.Capabilities {
}

func (c *traceToMetricConnector) ConsumeTraces(ctx context.Context, traces ptrace.Traces) error {
if c.enableContainerStats && len(c.containerTagAttrs) > 0 {
dineshg13 marked this conversation as resolved.
Show resolved Hide resolved
for i := 0; i < traces.ResourceSpans().Len(); i++ {
rs := traces.ResourceSpans().At(i)
attrs := rs.Resource().Attributes()
containerID, ok := attrs.Get(semconv.AttributeContainerID)
if !ok {
continue
}
ddContainerTags := attributes.ContainerTagsFromResourceAttributes(attrs)
for attr := range c.containerTagAttrs {
if val, ok := ddContainerTags[attr]; ok {
var cacheVal []string
if val, ok := c.containerTagCache.Get(containerID.AsString()); ok {
cacheVal = val.([]string)
dineshg13 marked this conversation as resolved.
Show resolved Hide resolved
}
cacheVal = append(cacheVal, val)
c.containerTagCache.Set(containerID.AsString(), cacheVal, cache.DefaultExpiration)
}
}
}
}
c.agent.Ingest(ctx, traces)
return nil
}
Expand All @@ -128,7 +167,20 @@ func (c *traceToMetricConnector) run() {
}
var mx pmetric.Metrics
var err error
// Enrich the stats with container tags
dineshg13 marked this conversation as resolved.
Show resolved Hide resolved
if c.enableContainerStats {
for _, stat := range stats.Stats {
if stat.ContainerID != "" {
if tags, ok := c.containerTagCache.Get(stat.ContainerID); ok {
tagList := tags.([]string)
stat.Tags = append(stat.Tags, tagList...)
}
}
}
}

c.logger.Debug("Received stats payload", zap.Any("stats", stats))

mx, err = c.translator.StatsToMetrics(stats)
if err != nil {
c.logger.Error("Failed to convert stats to metrics", zap.Error(err))
Expand Down
Loading