Skip to content

Commit

Permalink
[connector/datadog] Use the native OTel ingest API in APM stats (open…
Browse files Browse the repository at this point in the history
…-telemetry#33297)

**Description:**
Add a feature gate `connector.datadogconnector.NativeIngest` that
enables datadog connector to use the new native OTel API in APM stats
computation. It is disabled by default.

Follow-up of DataDog/datadog-agent#23503.

---------

Co-authored-by: Pablo Baeyens <[email protected]>
  • Loading branch information
songy23 and mx-psi authored Jun 26, 2024
1 parent fc97472 commit 4623e7e
Show file tree
Hide file tree
Showing 7 changed files with 354 additions and 6 deletions.
27 changes: 27 additions & 0 deletions .chloggen/datadog-connector-native-ingest.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: datadogconnector

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: "Add a feature gate `connector.datadogconnector.NativeIngest` that enables datadog connector to use the new native OTel API in APM stats computation."

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [33297]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext: "The feature gate `connector.datadogconnector.NativeIngest` is disabled by default."

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
154 changes: 154 additions & 0 deletions connector/datadogconnector/connector_native.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package datadogconnector // import "github.com/open-telemetry/opentelemetry-collector-contrib/connector/datadogconnector"

import (
"context"
"fmt"
"time"

pb "github.com/DataDog/datadog-agent/pkg/proto/pbgo/trace"
"github.com/DataDog/datadog-agent/pkg/trace/config"
"github.com/DataDog/datadog-agent/pkg/trace/stats"
"github.com/DataDog/datadog-go/v5/statsd"
"github.com/DataDog/opentelemetry-mapping-go/pkg/otlp/attributes"
"github.com/DataDog/opentelemetry-mapping-go/pkg/otlp/metrics"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/pdata/ptrace"
"go.opentelemetry.io/otel/metric/noop"
"go.uber.org/zap"
)

// traceToMetricConnectorNative is the schema for connector
type traceToMetricConnectorNative struct {
metricsConsumer consumer.Metrics // the next component in the pipeline to ingest metrics after connector
logger *zap.Logger

// concentrator ingests spans and produces APM stats
concentrator *stats.Concentrator

// tcfg is the trace agent config
tcfg *config.AgentConfig

// ctagKeys are container tag keys
ctagKeys []string

// translator specifies the translator used to transform APM Stats Payloads
// from the agent to OTLP Metrics.
translator *metrics.Translator

// statsout specifies the channel through which the agent will output Stats Payloads
// resulting from ingested traces.
statsout chan *pb.StatsPayload

// exit specifies the exit channel, which will be closed upon shutdown.
exit chan struct{}

// isStarted tracks whether Start() has been called.
isStarted bool
}

var _ component.Component = (*traceToMetricConnectorNative)(nil) // testing that the connectorImp properly implements the type Component interface

// newTraceToMetricConnectorNative creates a new connector with native OTel span ingestion
func newTraceToMetricConnectorNative(set component.TelemetrySettings, cfg component.Config, metricsConsumer consumer.Metrics, metricsClient statsd.ClientInterface) (*traceToMetricConnectorNative, error) {
set.Logger.Info("Building datadog connector for traces to metrics")
statsout := make(chan *pb.StatsPayload, 100)
set.MeterProvider = noop.NewMeterProvider() // disable metrics for the connector
attributesTranslator, err := attributes.NewTranslator(set)
if err != nil {
return nil, fmt.Errorf("failed to create attributes translator: %w", err)
}
trans, err := metrics.NewTranslator(set, attributesTranslator)
if err != nil {
return nil, fmt.Errorf("failed to create metrics translator: %w", err)
}

tcfg := getTraceAgentCfg(set.Logger, cfg.(*Config).Traces, attributesTranslator)
return &traceToMetricConnectorNative{
logger: set.Logger,
translator: trans,
tcfg: tcfg,
ctagKeys: cfg.(*Config).Traces.ResourceAttributesAsContainerTags,
concentrator: stats.NewConcentrator(tcfg, statsout, time.Now(), metricsClient),
statsout: statsout,
metricsConsumer: metricsConsumer,
exit: make(chan struct{}),
}, nil
}

// Start implements the component.Component interface.
func (c *traceToMetricConnectorNative) Start(_ context.Context, _ component.Host) error {
c.logger.Info("Starting datadogconnector")
c.concentrator.Start()
go c.run()
c.isStarted = true
return nil
}

// Shutdown implements the component.Component interface.
func (c *traceToMetricConnectorNative) Shutdown(context.Context) error {
if !c.isStarted {
// Note: it is not necessary to manually close c.exit, c.in and c.concentrator.exit channels as these are unused.
c.logger.Info("Requested shutdown, but not started, ignoring.")
return nil
}
c.logger.Info("Shutting down datadog connector")
c.logger.Info("Stopping concentrator")
// stop the concentrator and wait for the run loop to exit
c.concentrator.Stop()
c.exit <- struct{}{} // signal exit
<-c.exit // wait for close
return nil
}

// Capabilities implements the consumer interface.
// tells use whether the component(connector) will mutate the data passed into it. if set to true the connector does modify the data
func (c *traceToMetricConnectorNative) Capabilities() consumer.Capabilities {
return consumer.Capabilities{MutatesData: false}
}

func (c *traceToMetricConnectorNative) ConsumeTraces(_ context.Context, traces ptrace.Traces) error {
inputs := stats.OTLPTracesToConcentratorInputs(traces, c.tcfg, c.ctagKeys)
for _, input := range inputs {
c.concentrator.Add(input)
}
return nil
}

// run awaits incoming stats resulting from the agent's ingestion, converts them
// to metrics and flushes them using the configured metrics exporter.
func (c *traceToMetricConnectorNative) run() {
defer close(c.exit)
for {
select {
case stats := <-c.statsout:
if len(stats.Stats) == 0 {
continue
}
var mx pmetric.Metrics
var err error

c.logger.Debug("Received stats payload", zap.Any("stats", stats))

mx, err = c.translator.StatsToMetrics(stats)
if err != nil {
c.logger.Error("Failed to convert stats to metrics", zap.Error(err))
continue
}
// APM stats as metrics
ctx := context.TODO()

// send metrics to the consumer or next component in pipeline
if err := c.metricsConsumer.ConsumeMetrics(ctx, mx); err != nil {
c.logger.Error("Failed ConsumeMetrics", zap.Error(err))
return
}
case <-c.exit:
return
}
}
}
133 changes: 133 additions & 0 deletions connector/datadogconnector/connector_native_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package datadogconnector

import (
"context"
"testing"
"time"

pb "github.com/DataDog/datadog-agent/pkg/proto/pbgo/trace"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/connector/connectortest"
"go.opentelemetry.io/collector/consumer/consumertest"
"go.opentelemetry.io/collector/featuregate"
semconv "go.opentelemetry.io/collector/semconv/v1.5.0"
"go.uber.org/zap"
"google.golang.org/protobuf/proto"
)

var _ component.Component = (*traceToMetricConnectorNative)(nil) // testing that the connectorImp properly implements the type Component interface

// create test to create a connector, check that basic code compiles
func TestNewConnectorNative(t *testing.T) {
err := featuregate.GlobalRegistry().Set(NativeIngestFeatureGate.ID(), true)
assert.NoError(t, err)
defer func() {
_ = featuregate.GlobalRegistry().Set(NativeIngestFeatureGate.ID(), false)
}()

factory := NewFactory()

creationParams := connectortest.NewNopSettings()
cfg := factory.CreateDefaultConfig().(*Config)

tconn, err := factory.CreateTracesToMetrics(context.Background(), creationParams, cfg, consumertest.NewNop())
assert.NoError(t, err)

_, ok := tconn.(*traceToMetricConnectorNative)
assert.True(t, ok) // checks if the created connector implements the connectorImp struct
}

func TestTraceToTraceConnectorNative(t *testing.T) {
err := featuregate.GlobalRegistry().Set(NativeIngestFeatureGate.ID(), true)
assert.NoError(t, err)
defer func() {
_ = featuregate.GlobalRegistry().Set(NativeIngestFeatureGate.ID(), false)
}()

factory := NewFactory()

creationParams := connectortest.NewNopSettings()
cfg := factory.CreateDefaultConfig().(*Config)

tconn, err := factory.CreateTracesToTraces(context.Background(), creationParams, cfg, consumertest.NewNop())
assert.NoError(t, err)

_, ok := tconn.(*traceToTraceConnector)
assert.True(t, ok) // checks if the created connector implements the connectorImp struct
}

func creteConnectorNative(t *testing.T) (*traceToMetricConnectorNative, *consumertest.MetricsSink) {
err := featuregate.GlobalRegistry().Set(NativeIngestFeatureGate.ID(), true)
assert.NoError(t, err)
defer func() {
_ = featuregate.GlobalRegistry().Set(NativeIngestFeatureGate.ID(), false)
}()

factory := NewFactory()

creationParams := connectortest.NewNopSettings()
cfg := factory.CreateDefaultConfig().(*Config)
cfg.Traces.ResourceAttributesAsContainerTags = []string{semconv.AttributeCloudAvailabilityZone, semconv.AttributeCloudRegion, "az"}

metricsSink := &consumertest.MetricsSink{}

tconn, err := factory.CreateTracesToMetrics(context.Background(), creationParams, cfg, metricsSink)
assert.NoError(t, err)

connector, ok := tconn.(*traceToMetricConnectorNative)
require.True(t, ok)
return connector, metricsSink
}

func TestContainerTagsNative(t *testing.T) {
connector, metricsSink := creteConnectorNative(t)
err := connector.Start(context.Background(), componenttest.NewNopHost())
if err != nil {
t.Errorf("Error starting connector: %v", err)
return
}
defer func() {
_ = connector.Shutdown(context.Background())
}()

trace1 := generateTrace()

err = connector.ConsumeTraces(context.Background(), trace1)
assert.NoError(t, err)

// Send two traces to ensure unique container tags are added to the cache
trace2 := generateTrace()
err = connector.ConsumeTraces(context.Background(), trace2)
assert.NoError(t, err)

for {
if len(metricsSink.AllMetrics()) > 0 {
break
}
time.Sleep(100 * time.Millisecond)
}

// check if the container tags are added to the metrics
metrics := metricsSink.AllMetrics()
assert.Equal(t, 1, len(metrics))

ch := make(chan []byte, 100)
tr := newTranslatorWithStatsChannel(t, zap.NewNop(), ch)
_, err = tr.MapMetrics(context.Background(), metrics[0], nil)
require.NoError(t, err)
msg := <-ch
sp := &pb.StatsPayload{}

err = proto.Unmarshal(msg, sp)
require.NoError(t, err)

tags := sp.Stats[0].Tags
assert.Equal(t, 3, len(tags))
assert.ElementsMatch(t, []string{"region:my-region", "zone:my-zone", "az:my-az"}, tags)
}
23 changes: 20 additions & 3 deletions connector/datadogconnector/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,22 @@ import (
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/connector"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/featuregate"
"go.uber.org/zap"

"github.com/open-telemetry/opentelemetry-collector-contrib/connector/datadogconnector/internal/metadata"
)

const nativeIngestFeatureGateName = "connector.datadogconnector.NativeIngest"

// NativeIngestFeatureGate is the feature gate that controls native OTel spans ingestion in Datadog APM stats
var NativeIngestFeatureGate = featuregate.GlobalRegistry().MustRegister(
nativeIngestFeatureGateName,
featuregate.StageAlpha,
featuregate.WithRegisterDescription("When enabled, datadogconnector uses the native OTel API to ingest OTel spans and produce APM stats."),
featuregate.WithRegisterFromVersion("v0.104.0"),
)

// NewFactory creates a factory for tailtracer connector.
func NewFactory() connector.Factory {
// OTel connector factory to make a factory for connectors
Expand All @@ -38,10 +50,15 @@ func createDefaultConfig() component.Config {

// defines the consumer type of the connector
// we want to consume traces and export metrics therefore define nextConsumer as metrics, consumer is the next component in the pipeline
func createTracesToMetricsConnector(_ context.Context, params connector.Settings, cfg component.Config, nextConsumer consumer.Metrics) (connector.Traces, error) {
func createTracesToMetricsConnector(_ context.Context, params connector.Settings, cfg component.Config, nextConsumer consumer.Metrics) (c connector.Traces, err error) {
metricsClient := metricsclient.InitializeMetricClient(params.MeterProvider, metricsclient.ConnectorSourceTag)
timingReporter := timing.New(metricsClient)
c, err := newTraceToMetricConnector(params.TelemetrySettings, cfg, nextConsumer, metricsClient, timingReporter)
if NativeIngestFeatureGate.IsEnabled() {
params.Logger.Info("Datadog connector using the native OTel API to ingest OTel spans and produce APM stats")
c, err = newTraceToMetricConnectorNative(params.TelemetrySettings, cfg, nextConsumer, metricsClient)
} else {
params.Logger.Info("Datadog connector using the old processing pipelines to ingest OTel spans and produce APM stats. To opt in the new native OTel APM stats API, enable the feature gate", zap.String("feature gate", nativeIngestFeatureGateName))
c, err = newTraceToMetricConnector(params.TelemetrySettings, cfg, nextConsumer, metricsClient, timing.New(metricsClient))
}
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion connector/datadogconnector/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ require (
go.opentelemetry.io/collector/consumer v0.103.0
go.opentelemetry.io/collector/exporter v0.103.0
go.opentelemetry.io/collector/exporter/debugexporter v0.103.0
go.opentelemetry.io/collector/featuregate v1.10.0
go.opentelemetry.io/collector/otelcol v0.103.0
go.opentelemetry.io/collector/pdata v1.10.0
go.opentelemetry.io/collector/processor v0.103.0
Expand Down Expand Up @@ -226,7 +227,6 @@ require (
go.opentelemetry.io/collector/confmap/provider/yamlprovider v0.103.0 // indirect
go.opentelemetry.io/collector/extension v0.103.0 // indirect
go.opentelemetry.io/collector/extension/auth v0.103.0 // indirect
go.opentelemetry.io/collector/featuregate v1.10.0 // indirect
go.opentelemetry.io/collector/pdata/testdata v0.103.0 // indirect
go.opentelemetry.io/collector/service v0.103.0 // indirect
go.opentelemetry.io/contrib/config v0.7.0 // indirect
Expand Down
2 changes: 1 addition & 1 deletion exporter/datadogexporter/integrationtest/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ require (
go.opentelemetry.io/collector/connector v0.103.0
go.opentelemetry.io/collector/exporter v0.103.0
go.opentelemetry.io/collector/exporter/debugexporter v0.103.0
go.opentelemetry.io/collector/featuregate v1.10.0
go.opentelemetry.io/collector/otelcol v0.103.0
go.opentelemetry.io/collector/processor v0.103.0
go.opentelemetry.io/collector/processor/batchprocessor v0.103.0
Expand Down Expand Up @@ -225,7 +226,6 @@ require (
go.opentelemetry.io/collector/consumer v0.103.0 // indirect
go.opentelemetry.io/collector/extension v0.103.0 // indirect
go.opentelemetry.io/collector/extension/auth v0.103.0 // indirect
go.opentelemetry.io/collector/featuregate v1.10.0 // indirect
go.opentelemetry.io/collector/pdata v1.10.0 // indirect
go.opentelemetry.io/collector/pdata/testdata v0.103.0 // indirect
go.opentelemetry.io/collector/semconv v0.103.0 // indirect
Expand Down
Loading

0 comments on commit 4623e7e

Please sign in to comment.