Skip to content

Commit

Permalink
pkg/trace/api: OTLP: ensure attribute keys get standardised to Datado…
Browse files Browse the repository at this point in the history
…g tags

This change ensures that attribute keys get translated to Datadog
specific tags (such as container tags) or that "operation.name",
"service.name", "span.type" and "resource.name" correctly get translated
to span name, service, type or resource.
  • Loading branch information
gbbr committed Apr 4, 2022
1 parent af25c37 commit fe7f377
Show file tree
Hide file tree
Showing 3 changed files with 213 additions and 30 deletions.
148 changes: 129 additions & 19 deletions pkg/trace/api/otlp.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/DataDog/datadog-agent/pkg/trace/metrics/timing"
"github.com/DataDog/datadog-agent/pkg/trace/pb"
"github.com/DataDog/datadog-agent/pkg/trace/sampler"
"github.com/DataDog/datadog-agent/pkg/trace/traceutil"

"go.opentelemetry.io/collector/model/otlpgrpc"
"go.opentelemetry.io/collector/model/pdata"
Expand Down Expand Up @@ -253,8 +254,10 @@ func (o *OTLPReceiver) processRequest(protocol string, header http.Header, in ot
Spans: spans,
})
}
env := rattr[string(semconv.AttributeDeploymentEnvironment)]
p.TracerPayload = &pb.TracerPayload{
Chunks: traceChunks,
Env: traceutil.NormalizeTag(env),
ContainerID: fastHeaderGet(header, headerContainerID),
LanguageName: tagstats.Lang,
LanguageVersion: tagstats.LangVersion,
Expand Down Expand Up @@ -331,26 +334,17 @@ func marshalEvents(events pdata.SpanEventSlice) string {
// convertSpan converts the span in to a Datadog span, and uses the rattr resource tags and the lib instrumentation
// library attributes to further augment it.
func convertSpan(rattr map[string]string, lib pdata.InstrumentationLibrary, in pdata.Span) *pb.Span {
name := spanKindName(in.Kind())
if lib.Name() != "" {
name = lib.Name() + "." + name
} else {
name = "opentelemetry." + name
}
traceID := in.TraceID().Bytes()
meta := make(map[string]string, len(rattr))
for k, v := range rattr {
meta[k] = v
}
span := &pb.Span{
Name: name,
TraceID: traceIDToUint64(traceID),
SpanID: spanIDToUint64(in.SpanID().Bytes()),
ParentID: spanIDToUint64(in.ParentSpanID().Bytes()),
Start: int64(in.StartTimestamp()),
Duration: int64(in.EndTimestamp()) - int64(in.StartTimestamp()),
Service: rattr[string(semconv.AttributeServiceName)],
Resource: in.Name(),
Meta: meta,
Metrics: map[string]float64{},
}
Expand All @@ -363,39 +357,85 @@ func convertSpan(rattr map[string]string, lib pdata.InstrumentationLibrary, in p
if in.Events().Len() > 0 {
span.Meta["events"] = marshalEvents(in.Events())
}
var ctags strings.Builder // collect container tags from attributes
in.Attributes().Range(func(k string, v pdata.AttributeValue) bool {
switch v.Type() {
case pdata.AttributeValueTypeDouble:
span.Metrics[k] = v.DoubleVal()
case pdata.AttributeValueTypeInt:
span.Metrics[k] = float64(v.IntVal())
default:
span.Meta[k] = v.AsString()
switch k {
case "operation.name":
span.Name = v.AsString()
case "service.name":
span.Service = v.AsString()
case "resource.name":
span.Resource = v.AsString()
case "span.type":
span.Type = v.AsString()
default:
span.Meta[k] = v.AsString()
}
}
if containerTagsAttributes[k] {
// attribute matching container tag found
if ctags.Len() > 0 {
ctags.WriteByte(',')
}
ctags.WriteString(conventionsMapping[k])
ctags.WriteByte(':')
ctags.WriteString(v.AsString())
}
return true
})
if ctags.Len() > 0 {
span.Meta[tagContainersTags] = ctags.String()
}
if _, ok := span.Meta["env"]; !ok {
if env := span.Meta[string(semconv.AttributeDeploymentEnvironment)]; env != "" {
span.Meta["env"] = env
span.Meta["env"] = traceutil.NormalizeTag(env)
}
}
if in.TraceState() != pdata.TraceStateEmpty {
span.Meta["trace_state"] = string(in.TraceState())
}
if lib.Name() != "" {
span.Meta["instrumentation_library.name"] = lib.Name()
span.Meta["otel.library.name"] = lib.Name()
}
if lib.Version() != "" {
span.Meta["instrumentation_library.version"] = lib.Version()
span.Meta["otel.library.version"] = lib.Version()
}
if svc := span.Meta[string(semconv.AttributePeerService)]; svc != "" {
span.Service = svc
span.Meta["otel.status_code"] = in.Status().Code().String()
status2Error(in.Status(), in.Events(), span)
if span.Name == "" {
name := spanKindName(in.Kind())
if lib.Name() != "" {
name = lib.Name() + "." + name
} else {
name = "opentelemetry." + name
}
span.Name = name
}
if r := resourceFromTags(span.Meta); r != "" {
span.Resource = r
if span.Service == "" {
if svc := span.Meta[string(semconv.AttributePeerService)]; svc != "" {
span.Service = svc
} else if svc := rattr[string(semconv.AttributeServiceName)]; svc != "" {
span.Service = svc
} else {
span.Service = "OTLPResourceNoServiceName"
}
}
if span.Resource == "" {
if r := resourceFromTags(span.Meta); r != "" {
span.Resource = r
} else {
span.Resource = in.Name()
}
}
if span.Type == "" {
span.Type = spanKind2Type(in.Kind(), span)
}
span.Type = spanKind2Type(in.Kind(), span)
status2Error(in.Status(), in.Events(), span)
return span
}

Expand Down Expand Up @@ -499,3 +539,73 @@ func spanKindName(k pdata.SpanKind) string {
}
return name
}

// conventionsMappings defines the mapping between OpenTelemetry semantic conventions
// and Datadog Agent conventions
var conventionsMapping = map[string]string{
// Datadog conventions
// https://docs.datadoghq.com/getting_started/tagging/unified_service_tagging/
semconv.AttributeDeploymentEnvironment: "env",
semconv.AttributeServiceName: "service",
semconv.AttributeServiceVersion: "version",

// Containers
semconv.AttributeContainerID: "container_id",
semconv.AttributeContainerName: "container_name",
semconv.AttributeContainerImageName: "image_name",
semconv.AttributeContainerImageTag: "image_tag",

// Cloud conventions
// https://www.datadoghq.com/blog/tagging-best-practices/
semconv.AttributeCloudProvider: "cloud_provider",
semconv.AttributeCloudRegion: "region",
semconv.AttributeCloudAvailabilityZone: "zone",

// ECS conventions
// https://github.com/DataDog/datadog-agent/blob/e081bed/pkg/tagger/collectors/ecs_extract.go
semconv.AttributeAWSECSTaskFamily: "task_family",
semconv.AttributeAWSECSTaskARN: "task_arn",
semconv.AttributeAWSECSClusterARN: "ecs_cluster_name",
semconv.AttributeAWSECSTaskRevision: "task_version",
semconv.AttributeAWSECSContainerARN: "ecs_container_name",

// Kubernetes resource name (via semantic conventions)
// https://github.com/DataDog/datadog-agent/blob/e081bed/pkg/util/kubernetes/const.go
semconv.AttributeK8SContainerName: "kube_container_name",
semconv.AttributeK8SClusterName: "kube_cluster_name",
semconv.AttributeK8SDeploymentName: "kube_deployment",
semconv.AttributeK8SReplicaSetName: "kube_replica_set",
semconv.AttributeK8SStatefulSetName: "kube_stateful_set",
semconv.AttributeK8SDaemonSetName: "kube_daemon_set",
semconv.AttributeK8SJobName: "kube_job",
semconv.AttributeK8SCronJobName: "kube_cronjob",
semconv.AttributeK8SNamespaceName: "kube_namespace",
semconv.AttributeK8SPodName: "pod_name",
}

// containerTagsAttributes lists attribute names that can be converted to Datadog tags
// using the conventionsMapping map.
var containerTagsAttributes = map[string]bool{
semconv.AttributeContainerID: true,
semconv.AttributeContainerName: true,
semconv.AttributeContainerImageName: true,
semconv.AttributeContainerImageTag: true,
semconv.AttributeK8SContainerName: true,
semconv.AttributeK8SClusterName: true,
semconv.AttributeK8SDeploymentName: true,
semconv.AttributeK8SReplicaSetName: true,
semconv.AttributeK8SStatefulSetName: true,
semconv.AttributeK8SDaemonSetName: true,
semconv.AttributeK8SJobName: true,
semconv.AttributeK8SCronJobName: true,
semconv.AttributeK8SNamespaceName: true,
semconv.AttributeK8SPodName: true,
semconv.AttributeCloudProvider: true,
semconv.AttributeCloudRegion: true,
semconv.AttributeCloudAvailabilityZone: true,
semconv.AttributeAWSECSTaskFamily: true,
semconv.AttributeAWSECSTaskARN: true,
semconv.AttributeAWSECSClusterARN: true,
semconv.AttributeAWSECSTaskRevision: true,
semconv.AttributeAWSECSContainerARN: true,
}
84 changes: 73 additions & 11 deletions pkg/trace/api/otlp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (

"github.com/stretchr/testify/assert"
"go.opentelemetry.io/collector/model/pdata"
semconv "go.opentelemetry.io/collector/model/semconv/v1.6.1"
)

var otlpTestSpanConfig = &testutil.OTLPSpan{
Expand Down Expand Up @@ -590,6 +591,57 @@ func TestOTLPConvertSpan(t *testing.T) {
},
Type: "web",
},
}, {
rattr: map[string]string{
"env": "staging",
},
libname: "ddtracer",
libver: "v2",
in: testutil.NewOTLPSpan(&testutil.OTLPSpan{
Name: "/path",
Start: now,
End: now + 200000000,
Attributes: map[string]interface{}{
"service.name": "mongo",
"operation.name": "READ",
"resource.name": "/path",
"span.type": "db",
"name": "john",
semconv.AttributeContainerID: "cid",
semconv.AttributeK8SContainerName: "k8s-container",
"http.method": "GET",
"http.route": "/path",
"approx": 1.2,
"count": 2,
},
}),
out: &pb.Span{
Service: "mongo",
Name: "READ",
Resource: "/path",
TraceID: 2594128270069917171,
SpanID: 2594128270069917171,
ParentID: 0,
Start: int64(now),
Duration: 200000000,
Meta: map[string]string{
"env": "staging",
"_dd.tags.container": "container_id:cid,kube_container_name:k8s-container",
semconv.AttributeContainerID: "cid",
semconv.AttributeK8SContainerName: "k8s-container",
"http.method": "GET",
"http.route": "/path",
"instrumentation_library.name": "ddtracer",
"instrumentation_library.version": "v2",
"name": "john",
"otel.trace_id": "72df520af2bde7a5240031ead750e5f3",
},
Metrics: map[string]float64{
"approx": 1.2,
"count": 2,
},
Type: "db",
},
},
} {
lib := pdata.NewInstrumentationLibrary()
Expand All @@ -599,21 +651,31 @@ func TestOTLPConvertSpan(t *testing.T) {
want := tt.out
got := convertSpan(tt.rattr, lib, tt.in)
if len(want.Meta) != len(got.Meta) {
t.Fatalf("(%d) Meta count mismatch", i)
t.Fatalf("(%d) Meta count mismatch:\n%#v", i, got.Meta)
}
for k, v := range want.Meta {
if k != "events" {
switch k {
case "events":
// events contain maps with no guaranteed order of
// traversal; best to unpack to compare
var gote, wante []testutil.OTLPSpanEvent
if err := json.Unmarshal([]byte(v), &wante); err != nil {
t.Fatalf("(%d) Error unmarshalling: %v", i, err)
}
if err := json.Unmarshal([]byte(got.Meta[k]), &gote); err != nil {
t.Fatalf("(%d) Error unmarshalling: %v", i, err)
}
assert.Equal(wante, gote)
case "_dd.container_tags":
// order not guaranteed, so we need to unpack and sort to compare
gott := strings.Split(got.Meta[tagContainersTags], ",")
wantt := strings.Split(want.Meta[tagContainersTags], ",")
sort.Strings(gott)
sort.Strings(wantt)
assert.Equal(wantt, gott)
default:
assert.Equal(v, got.Meta[k], fmt.Sprintf("(%d) Meta %v:%v", i, k, v))
continue
}
var gote, wante []testutil.OTLPSpanEvent
if err := json.Unmarshal([]byte(v), &wante); err != nil {
t.Fatalf("(%d) Error unmarshalling: %v", i, err)
}
if err := json.Unmarshal([]byte(got.Meta[k]), &gote); err != nil {
t.Fatalf("(%d) Error unmarshalling: %v", i, err)
}
assert.Equal(wante, gote)
}
if len(want.Metrics) != len(got.Metrics) {
t.Fatalf("(%d) Metrics count mismatch:\n\n%v\n\n%v", i, want.Metrics, got.Metrics)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
# Each section from every release note are combined when the
# CHANGELOG.rst is rendered. So the text needs to be worded so that
# it does not depend on any information only available in another
# section. This may mean repeating some details, but each section
# must be readable independently of the other.
#
# Each section note must be formatted as reStructuredText.
---
fixes:
- |
APM: OTLP: this change ensures that the ingest now standardizes certain attribute keys to their correct Datadog tag counter parts, such as: container tags, "operation.name", "service.name", etc.

0 comments on commit fe7f377

Please sign in to comment.