Skip to content

Commit

Permalink
[processor/datadog]: add datadogprocessor (#16853)
Browse files Browse the repository at this point in the history
* processor: add datadogprocessor

This change adds the processor described in #15689. It is the initial PR
containing the structure and implementation.

* Address PR comments

* go mod tidy

* Ensure Shutdown can be called even if Start fails

* Use component.ID

* Address PR comments

* Update linting errors

* make generate-gh-issue-templates
  • Loading branch information
gbbr authored Dec 13, 2022
1 parent 644396f commit a0e93c0
Show file tree
Hide file tree
Showing 16 changed files with 1,592 additions and 0 deletions.
1 change: 1 addition & 0 deletions .github/CODEOWNERS
Validating CODEOWNERS rules …
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ pkg/experimentalmetricmetadata/ @open-telemetry/collector-c

processor/attributesprocessor/ @open-telemetry/collector-contrib-approvers @boostchicken
processor/cumulativetodeltaprocessor/ @open-telemetry/collector-contrib-approvers @TylerHelmuth
processor/datadogprocessor/ @open-telemetry/collector-contrib-approvers @mx-psi @gbbr @dineshg13
processor/deltatorateprocessor/ @open-telemetry/collector-contrib-approvers @Aneurysm9
processor/filterprocessor/ @open-telemetry/collector-contrib-approvers @TylerHelmuth @boostchicken
processor/groupbyattrsprocessor/ @open-telemetry/collector-contrib-approvers
Expand Down
1 change: 1 addition & 0 deletions .github/ISSUE_TEMPLATE/bug_report.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ body:
- pkg/winperfcounters
- processor/attributes
- processor/cumulativetodelta
- processor/datadog
- processor/deltatorate
- processor/filter
- processor/groupbyattrs
Expand Down
1 change: 1 addition & 0 deletions .github/ISSUE_TEMPLATE/feature_request.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ body:
- pkg/winperfcounters
- processor/attributes
- processor/cumulativetodelta
- processor/datadog
- processor/deltatorate
- processor/filter
- processor/groupbyattrs
Expand Down
1 change: 1 addition & 0 deletions .github/ISSUE_TEMPLATE/other.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ body:
- pkg/winperfcounters
- processor/attributes
- processor/cumulativetodelta
- processor/datadog
- processor/deltatorate
- processor/filter
- processor/groupbyattrs
Expand Down
1 change: 1 addition & 0 deletions processor/datadogprocessor/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
include ../../Makefile.Common
32 changes: 32 additions & 0 deletions processor/datadogprocessor/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
# Datadog Processor

| Status | |
|--------------------------|---------------|
| Stability | [development] |
| Supported pipeline types | traces |
| Distributions | [contrib] |

## Description

The Datadog Processor can be used to compute Datadog APM Stats pre-sampling. For example, when using the [tailsamplingprocessor](https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/processor/tailsamplingprocessor#tail-sampling-processor) or [probabilisticsamplerprocessor](https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/processor/probabilisticsamplerprocessor) components, the `datadogprocessor` can be prepended into the pipeline to ensure that Datadog APM Stats are accurate and include the dropped traces.

## Configuration

By default, when used in conjunction with the Datadog Exporter, the processor should detect its presence (as long as it is configured within a pipeline), and use it to export the Datadog APM Stats.

If using within a gateway deployment or alongside the Datadog Agent, where the Datadog Exporter is not present, and, for example, an OTLP exporter might be, you need to specify the metrics exporter to the processor:

```yaml
processors:
datadog:
metrics_exporter: otlp
```
The default value for `metrics_exporter` is `datadog`. Any configured metrics exporter must exist as part of a metrics pipeline.

When using in conjunction with the Datadog Agent's OTLP Ingest, the minimum required Datadog Agent version that supports this processor is 7.42.0.

If not using the Datadog backend, the processor will still create valid RED metrics, but in that situation you may prefer to use the [spanmetricsprocessor](https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/processor/spanmetricsprocessor) instead.

[development]: https://github.com/open-telemetry/opentelemetry-collector#development
[contrib]:https://github.com/open-telemetry/opentelemetry-collector-releases/tree/main/distributions/otelcol-contrib
184 changes: 184 additions & 0 deletions processor/datadogprocessor/agent.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,184 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package datadogprocessor // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/datadogprocessor"

import (
"context"
"net/http"
"runtime"
"sync"
"time"

"github.com/DataDog/datadog-agent/pkg/otlp/model/translator"
"github.com/DataDog/datadog-agent/pkg/trace/agent"
"github.com/DataDog/datadog-agent/pkg/trace/api"
traceconfig "github.com/DataDog/datadog-agent/pkg/trace/config"
"github.com/DataDog/datadog-agent/pkg/trace/pb"
"github.com/DataDog/datadog-agent/pkg/trace/stats"
"go.opentelemetry.io/collector/pdata/ptrace"
)

// traceagent specifies a minimal trace agent instance that is able to process traces and output stats.
type traceagent struct {
*agent.Agent

// pchan specifies the channel that will be used to output Datadog Trace Agent API Payloads
// resulting from ingested OpenTelemetry spans.
pchan chan *api.Payload

// wg waits for all goroutines to exit.
wg sync.WaitGroup

// exit signals the agent to shut down.
exit chan struct{}
}

// newAgent creates a new unstarted traceagent using the given context. Call Start to start the traceagent.
// The out channel will receive outoing stats payloads resulting from spans ingested using the Ingest method.
func newAgent(ctx context.Context, out chan pb.StatsPayload) *traceagent {
return newAgentWithConfig(ctx, traceconfig.New(), out)
}

// newAgentWithConfig creates a new traceagent with the given config cfg. Used in tests; use newAgent instead.
func newAgentWithConfig(ctx context.Context, cfg *traceconfig.AgentConfig, out chan pb.StatsPayload) *traceagent {
// disable the HTTP receiver
cfg.ReceiverPort = 0
// set the API key to succeed startup; it is never used nor needed
cfg.Endpoints[0].APIKey = "skip_check"
// set the default hostname to the translator's placeholder; in the case where no hostname
// can be deduced from incoming traces, we don't know the default hostname (because it is set
// in the exporter). In order to avoid duplicating the hostname setting in the processor and
// exporter, we use a placeholder and fill it in later (in the Datadog Exporter or Agent OTLP
// Ingest). This gives a better user experience.
cfg.Hostname = translator.UnsetHostnamePlaceholder
pchan := make(chan *api.Payload, 1000)
a := agent.NewAgent(ctx, cfg)
// replace the Concentrator (the component which computes and flushes APM Stats from incoming
// traces) with our own, which uses the 'out' channel.
a.Concentrator = stats.NewConcentrator(cfg, out, time.Now())
// ...and the same for the ClientStatsAggregator; we don't use it here, but it is also a source
// of stats which should be available to us.
a.ClientStatsAggregator = stats.NewClientStatsAggregator(cfg, out)
// lastly, start the OTLP receiver, which will be used to introduce ResourceSpans into the traceagent,
// so that we can transform them to Datadog spans and receive stats.
a.OTLPReceiver = api.NewOTLPReceiver(pchan, cfg)
return &traceagent{
Agent: a,
exit: make(chan struct{}),
pchan: pchan,
}
}

// Start starts the traceagent, making it ready to ingest spans.
func (p *traceagent) Start() {
// we don't need to start the full agent, so we only start a set of minimal
// components needed to compute stats:
for _, starter := range []interface{ Start() }{
p.Concentrator,
p.ClientStatsAggregator,
// we don't need the samplers' nor the processor's functionalities;
// but they are used by the agent nevertheless, so they need to be
// active and functioning.
p.PrioritySampler,
p.ErrorsSampler,
p.NoPrioritySampler,
p.EventProcessor,
} {
starter.Start()
}

p.goDrain()
p.goProcess()
}

// Stop stops the traceagent, making it unable to ingest spans. Do not call Ingest after Stop.
func (p *traceagent) Stop() {
for _, stopper := range []interface{ Stop() }{
p.Concentrator,
p.ClientStatsAggregator,
p.PrioritySampler,
p.ErrorsSampler,
p.NoPrioritySampler,
p.EventProcessor,
} {
stopper.Stop()
}
close(p.exit)
p.wg.Wait()
}

// goDrain drains the TraceWriter channel, ensuring it won't block. We don't need the traces,
// nor do we have a running TraceWrite. We just want the outgoing stats.
func (p *traceagent) goDrain() {
p.wg.Add(1)
go func() {
defer p.wg.Done()
for {
select {
case <-p.TraceWriter.In:
// we don't write these traces anywhere; drain the channel
case <-p.exit:
return
}
}
}()
}

// Ingest processes the given spans within the traceagent and outputs stats through the output channel
// provided to newAgent. Do not call Ingest on an unstarted or stopped traceagent.
func (p *traceagent) Ingest(ctx context.Context, traces ptrace.Traces) {
rspanss := traces.ResourceSpans()
for i := 0; i < rspanss.Len(); i++ {
rspans := rspanss.At(i)
p.OTLPReceiver.ReceiveResourceSpans(ctx, rspans, http.Header{}, "datadogprocessor")
// ...the call transforms the OTLP Spans into a Datadog payload and sends the result
// down the p.pchan channel
}
}

// goProcesses runs the main loop which takes incoming payloads, processes them and generates stats.
// It then picks up those stats and converts them to metrics.
func (p *traceagent) goProcess() {
for i := 0; i < runtime.NumCPU(); i++ {
p.wg.Add(1)
go func() {
defer p.wg.Done()
for {
select {
case payload := <-p.pchan:
p.Process(payload)
// ...the call processes the payload and outputs stats via the 'out' channel
// provided to newAgent
case <-p.exit:
return
}
}
}()
}
}

var _ ingester = (*traceagent)(nil)

// An ingester is able to ingest traces. Implemented by traceagent.
type ingester interface {
// Start starts the ingester.
Start()

// Ingest ingests the set of traces.
Ingest(ctx context.Context, traces ptrace.Traces)

// Stop stops the ingester.
Stop()
}
92 changes: 92 additions & 0 deletions processor/datadogprocessor/agent_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package datadogprocessor // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/datadogprocessor"

import (
"context"
"testing"
"time"

"github.com/DataDog/datadog-agent/pkg/otlp/model/translator"
traceconfig "github.com/DataDog/datadog-agent/pkg/trace/config"
"github.com/DataDog/datadog-agent/pkg/trace/pb"
"github.com/DataDog/datadog-agent/pkg/trace/testutil"
"github.com/stretchr/testify/require"
)

func TestTraceAgentConfig(t *testing.T) {
cfg := traceconfig.New()
require.NotZero(t, cfg.ReceiverPort)

out := make(chan pb.StatsPayload)
agnt := newAgentWithConfig(context.Background(), cfg, out)
require.Zero(t, cfg.ReceiverPort)
require.NotEmpty(t, cfg.Endpoints[0].APIKey)
require.Equal(t, translator.UnsetHostnamePlaceholder, cfg.Hostname)
require.Equal(t, out, agnt.Concentrator.Out)
}

func TestTraceAgent(t *testing.T) {
cfg := traceconfig.New()
cfg.BucketInterval = 50 * time.Millisecond
out := make(chan pb.StatsPayload, 10)
ctx := context.Background()
a := newAgentWithConfig(ctx, cfg, out)
a.Start()
defer a.Stop()

rspanss := testutil.NewOTLPTracesRequest([]testutil.OTLPResourceSpan{
{
LibName: "libname",
LibVersion: "1.2",
Attributes: map[string]interface{}{},
Spans: []*testutil.OTLPSpan{
{Name: "1"},
{Name: "2"},
{Name: "3"},
},
},
{
LibName: "other-libname",
LibVersion: "2.1",
Attributes: map[string]interface{}{},
Spans: []*testutil.OTLPSpan{
{Name: "4", TraceID: [16]byte{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1}},
{Name: "5", TraceID: [16]byte{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 2}},
},
},
}).Traces()

a.Ingest(ctx, rspanss)
var stats pb.StatsPayload
timeout := time.After(500 * time.Millisecond)
loop:
for {
select {
case stats = <-out:
if len(stats.Stats) != 0 {
break loop
}
case <-timeout:
t.Fatal("timed out")
}
}
require.Len(t, stats.Stats, 1)
require.Len(t, stats.Stats[0].Stats, 1)
// considering all spans in rspans have distinct aggregations, we should have an equal amount
// of groups
require.Len(t, stats.Stats[0].Stats[0].Stats, rspanss.SpanCount())
require.Len(t, a.TraceWriter.In, 0) // the trace writer channel should've been drained
}
35 changes: 35 additions & 0 deletions processor/datadogprocessor/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package datadogprocessor // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/datadogprocessor"

import (
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config"
)

// Config defines the configuration options for datadogprocessor.
type Config struct {
config.ProcessorSettings `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct

// MetricsExporter specifies the name of the metrics exporter to be used when
// exporting stats metrics.
MetricsExporter component.ID `mapstructure:"metrics_exporter"`
}

func createDefaultConfig() component.Config {
return &Config{
MetricsExporter: component.NewID(component.Type("datadog")),
}
}
Loading

0 comments on commit a0e93c0

Please sign in to comment.