Skip to content

Commit

Permalink
[OTEL-1720] Add a new API to Concentrator that directly computes APM …
Browse files Browse the repository at this point in the history
…stats for OTLP traces
  • Loading branch information
songy23 committed Apr 4, 2024
1 parent 30bd2c2 commit 3535ca1
Show file tree
Hide file tree
Showing 8 changed files with 547 additions and 43 deletions.
1 change: 1 addition & 0 deletions LICENSE-3rdparty.csv
Original file line number Diff line number Diff line change
Expand Up @@ -1809,6 +1809,7 @@ core,go.opentelemetry.io/collector/receiver/receiverhelper,Apache-2.0,Copyright
core,go.opentelemetry.io/collector/receiver/receivertest,Apache-2.0,Copyright The OpenTelemetry Authors
core,go.opentelemetry.io/collector/semconv/v1.17.0,Apache-2.0,Copyright The OpenTelemetry Authors
core,go.opentelemetry.io/collector/semconv/v1.18.0,Apache-2.0,Copyright The OpenTelemetry Authors
core,go.opentelemetry.io/collector/semconv/v1.5.0,Apache-2.0,Copyright The OpenTelemetry Authors
core,go.opentelemetry.io/collector/semconv/v1.6.1,Apache-2.0,Copyright The OpenTelemetry Authors
core,go.opentelemetry.io/collector/service,Apache-2.0,Copyright The OpenTelemetry Authors
core,go.opentelemetry.io/collector/service/extensions,Apache-2.0,Copyright The OpenTelemetry Authors
Expand Down
75 changes: 69 additions & 6 deletions pkg/trace/stats/aggregation.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,11 @@ import (
"strings"

pb "github.com/DataDog/datadog-agent/pkg/proto/pbgo/trace"
"github.com/DataDog/datadog-agent/pkg/trace/config"
"github.com/DataDog/datadog-agent/pkg/trace/log"
"github.com/DataDog/datadog-agent/pkg/trace/traceutil"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/ptrace"
)

const (
Expand Down Expand Up @@ -44,12 +47,13 @@ type BucketsAggregationKey struct {

// PayloadAggregationKey specifies the key by which a payload is aggregated.
type PayloadAggregationKey struct {
Env string
Hostname string
Version string
ContainerID string
GitCommitSha string
ImageTag string
Env string
Hostname string
Version string
ContainerID string
GitCommitSha string
ImageTag string
ContainerTags string
}

func getStatusCode(s *pb.Span) uint32 {
Expand Down Expand Up @@ -105,6 +109,39 @@ func NewAggregationFromSpan(s *pb.Span, origin string, aggKey PayloadAggregation
return agg, peerTags
}

// NewAggregationFromOTLPSpan creates a new aggregation from the provided OTLP span
func NewAggregationFromOTLPSpan(s ptrace.Span, res pcommon.Resource, lib pcommon.InstrumentationScope, conf *config.AgentConfig, aggKey PayloadAggregationKey, enablePeerTagsAgg bool, peerTagKeys []string) (Aggregation, []string) {
resName := traceutil.GetOTelResource(s, res)
svc := traceutil.GetOTelService(s, res, true)
opName := traceutil.GetOTelOperationName(s, res, lib, conf.OTLPReceiver.SpanNameAsResourceName, conf.OTLPReceiver.SpanNameRemappings, true)
spanType := traceutil.GetOTelSpanType(s, res)
code := traceutil.GetOTelStatusCode(s)
var isTraceRoot pb.TraceRootFlag
if s.ParentSpanID().IsEmpty() {
isTraceRoot = pb.TraceRootFlag_TRUE
} else {
isTraceRoot = pb.TraceRootFlag_FALSE
}
agg := Aggregation{
PayloadAggregationKey: aggKey,
BucketsAggregationKey: BucketsAggregationKey{
Resource: resName,
Service: svc,
Name: opName,
SpanKind: strings.ToLower(s.Kind().String()),
Type: spanType,
StatusCode: code,
IsTraceRoot: isTraceRoot,
},
}
var peerTags []string
if clientOrProducer(agg.SpanKind) && enablePeerTagsAgg {
peerTags = matchingOTelPeerTags(s, res, peerTagKeys)
agg.PeerTagsHash = peerTagsHash(peerTags)
}
return agg, peerTags
}

func matchingPeerTags(s *pb.Span, peerTagKeys []string) []string {
if len(peerTagKeys) == 0 {
return nil
Expand All @@ -118,6 +155,32 @@ func matchingPeerTags(s *pb.Span, peerTagKeys []string) []string {
return pt
}

// matchingOTelPeerTags returns a list of matched peer tags in OTel span and resource attributes.
// Peer tags are always normalized.
func matchingOTelPeerTags(span ptrace.Span, res pcommon.Resource, peerTagKeys []string) []string {
if len(peerTagKeys) == 0 {
return nil
}
var pts []string
for _, t := range peerTagKeys {
if v := traceutil.GetOTelAttrValInResAndSpanAttrs(span, res, false, t); v != "" {
ps, err := traceutil.NormalizePeerService(v)
switch err {
case traceutil.ErrTooLong:
log.Debugf("Fixing malformed trace. peer.service is too long (reason:peer_service_truncate), truncating peer.service to length=%d: %s", traceutil.MaxServiceLen, ps)
case traceutil.ErrInvalid:
log.Debugf("Fixing malformed trace. peer.service is invalid (reason:peer_service_invalid), replacing invalid peer.service=%s with empty string", v)
default:
if err != nil {
log.Debugf("Unexpected error in peer.service normalization from original value (%s) to new value (%s): %s", v, ps, err)
}
}
pts = append(pts, t+":"+ps)
}
}
return pts
}

func peerTagsHash(tags []string) uint64 {
if len(tags) == 0 {
return 0
Expand Down
117 changes: 117 additions & 0 deletions pkg/trace/stats/aggregation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,10 @@ import (
"testing"

pb "github.com/DataDog/datadog-agent/pkg/proto/pbgo/trace"
"github.com/DataDog/datadog-agent/pkg/trace/config"
"github.com/stretchr/testify/assert"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/ptrace"
)

func TestGetStatusCode(t *testing.T) {
Expand Down Expand Up @@ -157,6 +160,120 @@ func TestNewAggregation(t *testing.T) {
}
}

func TestNewAggregationFromSpan(t *testing.T) {
peerTags := []string{"db.instance", "db.system", "peer.service"}
peerSvcOnlyHash := uint64(3430395298086625290)
peerTagsHash := uint64(9894752672193411515)
for _, tt := range []struct {
name string
rattrs map[string]string
spanKind ptrace.SpanKind
enablePeerTagsAgg bool
resAgg Aggregation
resPeerTags []string
}{
{
name: "nil case, peer tag aggregation disabled",
enablePeerTagsAgg: false,
resAgg: Aggregation{BucketsAggregationKey: BucketsAggregationKey{Service: "otlpresourcenoservicename", SpanKind: "unspecified"}},
resPeerTags: nil,
},
{
name: "nil case, peer tag aggregation enabled",
enablePeerTagsAgg: true,
resAgg: Aggregation{BucketsAggregationKey: BucketsAggregationKey{Service: "otlpresourcenoservicename", SpanKind: "unspecified"}},
resPeerTags: nil,
},
{
name: "peer tag aggregation disabled even though peer.service is present",
rattrs: map[string]string{
"service.name": "a",
"peer.service": "remote-service",
},
spanKind: ptrace.SpanKindClient,
enablePeerTagsAgg: false,
resAgg: Aggregation{BucketsAggregationKey: BucketsAggregationKey{Service: "a", SpanKind: "client"}},
resPeerTags: nil,
},
{
name: "peer tags aggregation enabled, but span.kind != (client, producer)",
rattrs: map[string]string{
"service.name": "a",
"peer.service": "remote-service",
},
enablePeerTagsAgg: true,
spanKind: ptrace.SpanKindInternal,
resAgg: Aggregation{BucketsAggregationKey: BucketsAggregationKey{Service: "a", SpanKind: "internal"}},
resPeerTags: nil,
},
{
name: "peer tags aggregation enabled, span.kind == client",
rattrs: map[string]string{
"service.name": "a",
"peer.service": "remote-service",
},
spanKind: ptrace.SpanKindClient,
enablePeerTagsAgg: true,
resAgg: Aggregation{BucketsAggregationKey: BucketsAggregationKey{Service: "a", SpanKind: "client", PeerTagsHash: peerSvcOnlyHash}},
resPeerTags: []string{"peer.service:remote-service"},
},
{
name: "peer tags aggregation enabled, span.kind == producer",
rattrs: map[string]string{
"service.name": "a",
"peer.service": "remote-service",
},
spanKind: ptrace.SpanKindProducer,
enablePeerTagsAgg: true,
resAgg: Aggregation{BucketsAggregationKey: BucketsAggregationKey{Service: "a", SpanKind: "producer", PeerTagsHash: peerSvcOnlyHash}},
resPeerTags: []string{"peer.service:remote-service"},
},
{
name: "peer tags aggregation enabled and multiple peer tags match",
rattrs: map[string]string{
"service.name": "a",
"field1": "val1",
"peer.service": "remote-service",
"db.instance": "i-1234",
"db.system": "postgres",
},
spanKind: ptrace.SpanKindClient,
enablePeerTagsAgg: true,
resAgg: Aggregation{BucketsAggregationKey: BucketsAggregationKey{Service: "a", SpanKind: "client", PeerTagsHash: peerTagsHash}},
resPeerTags: []string{"db.instance:i-1234", "db.system:postgres", "peer.service:remote-service"},
},
{
name: "peer tags aggregation enabled but all peer tags are empty",
rattrs: map[string]string{"service.name": "a", "field1": "val1", "peer.service": "", "db.instance": "", "db.system": ""},
spanKind: ptrace.SpanKindClient,
enablePeerTagsAgg: true,
resAgg: Aggregation{BucketsAggregationKey: BucketsAggregationKey{Service: "a", SpanKind: "client", PeerTagsHash: 0}},
resPeerTags: nil,
},
{
name: "peer tags aggregation enabled but some peer tags are empty",
rattrs: map[string]string{"service.name": "a", "field1": "val1", "peer.service": "remote-service", "db.instance": "", "db.system": ""},
spanKind: ptrace.SpanKindClient,
enablePeerTagsAgg: true,
resAgg: Aggregation{BucketsAggregationKey: BucketsAggregationKey{Service: "a", SpanKind: "client", PeerTagsHash: peerSvcOnlyHash}},
resPeerTags: []string{"peer.service:remote-service"},
},
} {
res := pcommon.NewResource()
for k, v := range tt.rattrs {
res.Attributes().PutStr(k, v)
}
span := ptrace.NewSpan()
span.SetKind(tt.spanKind)
conf := config.New()
agg, et := NewAggregationFromOTLPSpan(span, res, pcommon.NewInstrumentationScope(), conf, PayloadAggregationKey{}, tt.enablePeerTagsAgg, peerTags)
assert.Equal(t, tt.resAgg.Service, agg.Service, tt.name)
assert.Equal(t, tt.resAgg.SpanKind, agg.SpanKind, tt.name)
assert.Equal(t, tt.resAgg.PeerTagsHash, agg.PeerTagsHash, tt.name)
assert.Equal(t, tt.resPeerTags, et, tt.name)
}
}

func TestSpanKindIsConsumerOrProducer(t *testing.T) {
type testCase struct {
input string
Expand Down
Loading

0 comments on commit 3535ca1

Please sign in to comment.