Skip to content

Commit

Permalink
[WIP][OTEL-1720] Add a new API to Concentrator that directly computes…
Browse files Browse the repository at this point in the history
… APM stats for OTLP traces
  • Loading branch information
songy23 committed Mar 27, 2024
1 parent eef61a7 commit b729613
Show file tree
Hide file tree
Showing 15 changed files with 2,100 additions and 28 deletions.
1 change: 1 addition & 0 deletions LICENSE-3rdparty.csv
Original file line number Diff line number Diff line change
Expand Up @@ -2260,6 +2260,7 @@ core,go.opentelemetry.io/collector/receiver/receiverhelper,Apache-2.0,Copyright
core,go.opentelemetry.io/collector/receiver/receivertest,Apache-2.0,Copyright The OpenTelemetry Authors
core,go.opentelemetry.io/collector/semconv/v1.17.0,Apache-2.0,Copyright The OpenTelemetry Authors
core,go.opentelemetry.io/collector/semconv/v1.18.0,Apache-2.0,Copyright The OpenTelemetry Authors
core,go.opentelemetry.io/collector/semconv/v1.5.0,Apache-2.0,Copyright The OpenTelemetry Authors
core,go.opentelemetry.io/collector/semconv/v1.6.1,Apache-2.0,Copyright The OpenTelemetry Authors
core,go.opentelemetry.io/collector/service,Apache-2.0,Copyright The OpenTelemetry Authors
core,go.opentelemetry.io/collector/service/extensions,Apache-2.0,Copyright The OpenTelemetry Authors
Expand Down
3 changes: 2 additions & 1 deletion pkg/trace/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ require (
github.com/google/go-cmp v0.6.0
github.com/google/gofuzz v1.2.0
github.com/google/uuid v1.3.1
github.com/hashicorp/golang-lru/v2 v2.0.7
github.com/shirou/gopsutil/v3 v3.24.1
github.com/stretchr/testify v1.9.0
github.com/tinylib/msgp v1.1.8
Expand All @@ -36,6 +37,7 @@ require (
go.opentelemetry.io/collector/pdata v1.0.1
go.opentelemetry.io/collector/semconv v0.93.0
go.opentelemetry.io/otel v1.22.0
go.opentelemetry.io/otel/metric v1.22.0
go.uber.org/atomic v1.11.0
golang.org/x/sys v0.16.0
golang.org/x/time v0.3.0
Expand Down Expand Up @@ -97,7 +99,6 @@ require (
go.opentelemetry.io/collector/confmap v0.93.0 // indirect
go.opentelemetry.io/collector/featuregate v1.0.1 // indirect
go.opentelemetry.io/otel/exporters/prometheus v0.45.0 // indirect
go.opentelemetry.io/otel/metric v1.22.0 // indirect
go.opentelemetry.io/otel/sdk v1.22.0 // indirect
go.opentelemetry.io/otel/sdk/metric v1.22.0 // indirect
go.opentelemetry.io/otel/trace v1.22.0 // indirect
Expand Down
2 changes: 2 additions & 0 deletions pkg/trace/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

62 changes: 62 additions & 0 deletions pkg/trace/stats/aggregation.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,11 @@ import (
"strings"

pb "github.com/DataDog/datadog-agent/pkg/proto/pbgo/trace"
"github.com/DataDog/datadog-agent/pkg/trace/config"
"github.com/DataDog/datadog-agent/pkg/trace/log"
"github.com/DataDog/datadog-agent/pkg/trace/traceutil"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/ptrace"
)

const (
Expand Down Expand Up @@ -105,6 +108,39 @@ func NewAggregationFromSpan(s *pb.Span, origin string, aggKey PayloadAggregation
return agg, peerTags
}

// NewAggregationFromOTLPSpan builds the Aggregation key for an OTLP span.
//
// The bucket-level key is filled from the span, its resource and its
// instrumentation scope: resource name, service, operation name, lower-cased
// span kind, span type, status code and whether the span is a trace root
// (i.e. it has an empty parent span ID). aggKey is used as-is for the
// payload-level part of the key.
//
// When enablePeerTagsAgg is true and the span kind satisfies clientOrProducer,
// the peer tags matching peerTagKeys are also returned and their hash is stored
// in the aggregation; otherwise the returned tag slice is nil.
func NewAggregationFromOTLPSpan(s ptrace.Span, res pcommon.Resource, lib pcommon.InstrumentationScope, conf *config.AgentConfig, aggKey PayloadAggregationKey, enablePeerTagsAgg bool, peerTagKeys []string) (Aggregation, []string) {
	bucketKey := BucketsAggregationKey{
		Resource: GetOTelResource(s, res),
		Service:  GetOTelService(s, res, true),
		Name:     GetOTelOperationName(s, res, lib, conf, true),
	}
	bucketKey.Type = GetOTelSpanType(s, res)
	bucketKey.StatusCode = GetOTelStatusCode(s)

	// A span without a parent is the root of its trace.
	bucketKey.IsTraceRoot = pb.TraceRootFlag_FALSE
	if s.ParentSpanID().IsEmpty() {
		bucketKey.IsTraceRoot = pb.TraceRootFlag_TRUE
	}
	bucketKey.SpanKind = strings.ToLower(s.Kind().String())

	agg := Aggregation{
		PayloadAggregationKey: aggKey,
		BucketsAggregationKey: bucketKey,
	}

	// Peer tags only take part in the key when the feature is enabled and the
	// span kind qualifies.
	if !clientOrProducer(agg.SpanKind) || !enablePeerTagsAgg {
		return agg, nil
	}
	matched := matchingOTelPeerTags(s, res, peerTagKeys)
	agg.PeerTagsHash = peerTagsHash(matched)
	return agg, matched
}

func matchingPeerTags(s *pb.Span, peerTagKeys []string) []string {
if len(peerTagKeys) == 0 {
return nil
Expand All @@ -118,6 +154,32 @@ func matchingPeerTags(s *pb.Span, peerTagKeys []string) []string {
return pt
}

// matchingOTelPeerTags looks up every key of peerTagKeys in the span and
// resource attributes and returns the non-empty matches as "key:value" strings.
//
// Values are fetched un-normalized and then passed through
// traceutil.NormalizePeerService; normalization problems are only logged at
// debug level, and the value returned by the normalizer is what gets emitted.
// The result is nil when peerTagKeys is empty or nothing matched.
func matchingOTelPeerTags(span ptrace.Span, res pcommon.Resource, peerTagKeys []string) []string {
	if len(peerTagKeys) == 0 {
		return nil
	}
	var matched []string
	for _, key := range peerTagKeys {
		raw := GetOTelAttrValInResAndSpanAttrs(span, res, doNotNormalize, key)
		if raw == "" {
			// Attribute absent (or empty) on both span and resource.
			continue
		}
		normalized, err := traceutil.NormalizePeerService(raw)
		switch {
		case err == traceutil.ErrTooLong:
			log.Debugf("Fixing malformed trace. peer.service is too long (reason:peer_service_truncate), truncating peer.service to length=%d: %s", traceutil.MaxServiceLen, normalized)
		case err == traceutil.ErrInvalid:
			log.Debugf("Fixing malformed trace. peer.service is invalid (reason:peer_service_invalid), replacing invalid peer.service=%s with empty string", raw)
		case err != nil:
			log.Debugf("Unexpected error in peer.service normalization from original value (%s) to new value (%s): %s", raw, normalized, err)
		}
		matched = append(matched, key+":"+normalized)
	}
	return matched
}

func peerTagsHash(tags []string) uint64 {
if len(tags) == 0 {
return 0
Expand Down
111 changes: 91 additions & 20 deletions pkg/trace/stats/concentrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,13 @@ import (
"github.com/DataDog/datadog-agent/pkg/trace/config"
"github.com/DataDog/datadog-agent/pkg/trace/log"
"github.com/DataDog/datadog-agent/pkg/trace/traceutil"
"github.com/DataDog/datadog-agent/pkg/trace/version"
"github.com/DataDog/datadog-agent/pkg/trace/watchdog"

lru "github.com/hashicorp/golang-lru/v2"
"go.opentelemetry.io/collector/pdata/ptrace"
semconv "go.opentelemetry.io/collector/semconv/v1.17.0"

"github.com/DataDog/datadog-go/v5/statsd"
)

Expand Down Expand Up @@ -53,6 +58,7 @@ type Concentrator struct {
computeStatsBySpanKind bool // flag to enable computation of stats through checking the span.kind field
peerTagKeys []string // keys for supplementary tags that describe peer.service entities
statsd statsd.ClientInterface
containerTagsByID *lru.Cache[string, []string] // map from container id to container tags, used to fill out container tags for OTel stats
}

var defaultPeerTags = []string{
Expand Down Expand Up @@ -106,6 +112,7 @@ func preparePeerTags(tags ...string) []string {
// NewConcentrator initializes a new concentrator ready to be started
func NewConcentrator(conf *config.AgentConfig, out chan *pb.StatsPayload, now time.Time, statsd statsd.ClientInterface) *Concentrator {
bsize := conf.BucketInterval.Nanoseconds()
cache, _ := lru.New[string, []string](128)
c := Concentrator{
bsize: bsize,
buckets: make(map[int64]*RawBucket),
Expand All @@ -123,6 +130,7 @@ func NewConcentrator(conf *config.AgentConfig, out chan *pb.StatsPayload, now ti
peerTagsAggregation: conf.PeerServiceAggregation || conf.PeerTagsAggregation,
computeStatsBySpanKind: conf.ComputeStatsBySpanKind,
statsd: statsd,
containerTagsByID: cache,
}
// NOTE: maintain backwards-compatibility with old peer service flag that will eventually be deprecated.
if conf.PeerServiceAggregation || conf.PeerTagsAggregation {
Expand Down Expand Up @@ -174,8 +182,8 @@ func (c *Concentrator) Stop() {
}

// computeStatsForSpanKind returns true if the span.kind value makes the span eligible for stats computation.
func computeStatsForSpanKind(s *pb.Span) bool {
k := strings.ToLower(s.Meta["span.kind"])
func computeStatsForSpanKind(kind string) bool {
k := strings.ToLower(kind)
switch k {
case "server", "consumer", "client", "producer":
return true
Expand All @@ -196,17 +204,22 @@ func NewStatsInput(numChunks int, containerID string, clientComputedStats bool,
return Input{}
}
in := Input{Traces: make([]traceutil.ProcessedTrace, 0, numChunks)}
_, enabledCIDStats := conf.Features["enable_cid_stats"]
_, disabledCIDStats := conf.Features["disable_cid_stats"]
enableContainers := enabledCIDStats || (conf.FargateOrchestrator != config.OrchestratorUnknown)
if enableContainers && !disabledCIDStats {
// only allow the ContainerID stats dimension if we're in a Fargate instance or it's
// been explicitly enabled and it's not prohibited by the disable_cid_stats feature flag.
if shouldIncludeCIDDim(conf) {
in.ContainerID = containerID
}
return in
}

// shouldIncludeCIDDim reports whether the container ID should be used as a
// stats dimension.
//
// The "disable_cid_stats" feature flag always wins and turns the dimension off.
// Otherwise the dimension is on when the "enable_cid_stats" feature flag is set
// or when the agent runs under a known Fargate orchestrator.
func shouldIncludeCIDDim(conf *config.AgentConfig) bool {
	if _, disabled := conf.Features["disable_cid_stats"]; disabled {
		return false
	}
	if _, enabled := conf.Features["enable_cid_stats"]; enabled {
		return true
	}
	return conf.FargateOrchestrator != config.OrchestratorUnknown
}

// Add applies the given input to the concentrator.
func (c *Concentrator) Add(t Input) {
c.mu.Lock()
Expand All @@ -216,6 +229,55 @@ func (c *Concentrator) Add(t Input) {
c.mu.Unlock()
}

// ProcessOTLPTraces applies APM stats calculation on the OTLP traces in the concentrator. [BETA]

Check failure on line 232 in pkg/trace/stats/concentrator.go

View workflow job for this annotation

GitHub Actions / windows-lint

exported: comment on exported method Concentrator.ProcessOTLPTraces should be of the form "ProcessOTLPTraces ..." (revive)

Check failure on line 232 in pkg/trace/stats/concentrator.go

View workflow job for this annotation

GitHub Actions / windows-lint

exported: comment on exported method Concentrator.ProcessOTLPTraces should be of the form "ProcessOTLPTraces ..." (revive)
// ProcessOTLPTraces computes APM stats for the given OTLP traces and records them in the concentrator's buckets. [BETA]
// This function is NOT called in Concentrator.Run(), so you need to manually call it to get APM stats on the given OTLP traces.
//
// For every span in traces:
//   - spans whose resource name is listed verbatim in conf.Ignore["resource"] are skipped;
//   - the payload aggregation key (env, hostname, version and, when the container ID
//     dimension is enabled, container ID, git commit SHA and image tag) is derived from
//     the span and resource attributes, falling back to the agent env when the span
//     carries no deployment environment;
//   - only top-level spans, or spans with a stats-eligible span kind, are counted;
//   - the span is added to the time bucket that contains its end timestamp.
//
// The concentrator mutex is held for the whole call.
func (c *Concentrator) ProcessOTLPTraces(traces ptrace.Traces, conf *config.AgentConfig) {
	c.mu.Lock()
	defer c.mu.Unlock()
	includeCID := shouldIncludeCIDDim(conf)
	spanByID, resByID, scopeByID := IndexOTelSpans(traces)
	topLevelByKind := !conf.HasFeature("disable_otlp_compute_top_level_by_span_kind")
	topLevelSpans := GetTopLevelOTelSpans(spanByID, resByID, topLevelByKind)
	// Build a set of ignored resource names once so the per-span check is a single lookup.
	ignoreResNames := make(map[string]struct{}, len(conf.Ignore["resource"]))
	for _, resName := range conf.Ignore["resource"] {
		ignoreResNames[resName] = struct{}{}
	}
	for spanID, span := range spanByID {
		res := resByID[spanID]
		resName := GetOTelResource(span, res)
		if _, exists := ignoreResNames[resName]; exists {
			continue
		}
		env := GetOTelAttrValInResAndSpanAttrs(span, res, doNormalize, semconv.AttributeDeploymentEnvironment)
		if env == "" {
			env = c.agentEnv
		}
		aggKey := PayloadAggregationKey{
			Env:      env,
			Hostname: GetOTelHostname(span, res, conf),
			Version:  GetOTelAttrValInResAndSpanAttrs(span, res, doNormalize, semconv.AttributeServiceVersion),
		}
		if includeCID {
			if cid := GetOTelAttrValInResAndSpanAttrs(span, res, doNormalize, semconv.AttributeContainerID, semconv.AttributeK8SPodUID); cid != "" {
				aggKey.ContainerID = cid
				// Best effort: when version data cannot be resolved the key simply
				// keeps empty GitCommitSha/ImageTag.
				if gitCommitSha, imageTag, err := version.GetVersionDataFromContainerTags(cid, conf); err == nil {
					aggKey.GitCommitSha = gitCommitSha
					aggKey.ImageTag = imageTag
				}
				// Remember the container tags so they can be attached to the payload at flush time.
				c.containerTagsByID.Add(cid, GetOTelContainerTags(res.Attributes()))
			}
		}
		_, isTop := topLevelSpans[spanID]
		eligibleSpanKind := (topLevelByKind || c.computeStatsBySpanKind) && computeStatsForSpanKind(span.Kind().String())
		if !(isTop || eligibleSpanKind) {
			continue
		}
		// Bucket by the span's end time in nanoseconds since the Unix epoch.
		// time.Time.Nanosecond() would only yield the sub-second offset
		// (0..999999999), which always lands in the oldest bucket.
		end := span.EndTimestamp().AsTime().UnixNano()
		b := c.getBucket(end)
		b.HandleOTLPSpan(span, res, scopeByID[span.SpanID()], conf, isTop, aggKey, c.peerTagsAggregation, c.peerTagKeys)
	}
}

// addNow adds the given input into the concentrator.
// Callers must guard!
func (c *Concentrator) addNow(pt *traceutil.ProcessedTrace, containerID string) {
Expand All @@ -238,28 +300,33 @@ func (c *Concentrator) addNow(pt *traceutil.ProcessedTrace, containerID string)
}
for _, s := range pt.TraceChunk.Spans {
isTop := traceutil.HasTopLevel(s)
eligibleSpanKind := c.computeStatsBySpanKind && computeStatsForSpanKind(s)
eligibleSpanKind := c.computeStatsBySpanKind && computeStatsForSpanKind(s.Meta["span.kind"])
if !(isTop || traceutil.IsMeasured(s) || eligibleSpanKind) {
continue
}
if traceutil.IsPartialSnapshot(s) {
continue
}
end := s.Start + s.Duration
btime := end - end%c.bsize
b := c.getBucket(end)
b.HandleSpan(s, weight, isTop, pt.TraceChunk.Origin, aggKey, c.peerTagsAggregation, c.peerTagKeys)
}
}

// If too far in the past, count in the oldest-allowed time bucket instead.
if btime < c.oldestTs {
btime = c.oldestTs
}
func (c *Concentrator) getBucket(end int64) *RawBucket {
btime := end - end%c.bsize

b, ok := c.buckets[btime]
if !ok {
b = NewRawBucket(uint64(btime), uint64(c.bsize))
c.buckets[btime] = b
}
b.HandleSpan(s, weight, isTop, pt.TraceChunk.Origin, aggKey, c.peerTagsAggregation, c.peerTagKeys)
// If too far in the past, count in the oldest-allowed time bucket instead.
if btime < c.oldestTs {
btime = c.oldestTs
}

b, ok := c.buckets[btime]
if !ok {
b = NewRawBucket(uint64(btime), uint64(c.bsize))
c.buckets[btime] = b
}
return b
}

// Flush deletes and returns complete statistic buckets.
Expand Down Expand Up @@ -309,8 +376,12 @@ func (c *Concentrator) flushNow(now int64, force bool) *pb.StatsPayload {
ImageTag: k.ImageTag,
Stats: s,
}
if containerTags, ok := c.containerTagsByID.Get(k.ContainerID); ok {
p.Tags = append(p.Tags, containerTags...)
}
sb = append(sb, p)
}

return &pb.StatsPayload{Stats: sb, AgentHostname: c.agentHostname, AgentEnv: c.agentEnv, AgentVersion: c.agentVersion}
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/trace/stats/concentrator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -992,7 +992,7 @@ func TestComputeStatsForSpanKind(t *testing.T) {
false,
},
} {
assert.Equal(tc.res, computeStatsForSpanKind(tc.s))
assert.Equal(tc.res, computeStatsForSpanKind(tc.s.Meta["span.kind"]))
}
}

Expand Down
Loading

0 comments on commit b729613

Please sign in to comment.