Skip to content

Commit

Permalink
processor/otel: record array resource attributes (#5704)
Browse files Browse the repository at this point in the history
* processor/otel: record array resource attributes

If an unhandled array attribute is found in resource
attributes, store that as a label like we do with span
attributes.

* processor/otel: handle array attrs for spans

(cherry picked from commit 4264a2a)
  • Loading branch information
axw authored and mergify-bot committed Jul 15, 2021
1 parent 34bb408 commit 8831d73
Show file tree
Hide file tree
Showing 6 changed files with 63 additions and 18 deletions.
19 changes: 3 additions & 16 deletions processor/otel/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,22 +261,7 @@ func translateTransaction(
k := replaceDots(kDots)
switch v.Type() {
case pdata.AttributeValueTypeArray:
array := v.ArrayVal()
values := make([]interface{}, array.Len())
for i := range values {
value := array.At(i)
switch value.Type() {
case pdata.AttributeValueTypeBool:
values[i] = value.BoolVal()
case pdata.AttributeValueTypeDouble:
values[i] = value.DoubleVal()
case pdata.AttributeValueTypeInt:
values[i] = value.IntVal()
case pdata.AttributeValueTypeString:
values[i] = truncate(value.StringVal())
}
}
labels[k] = values
labels[k] = ifaceAnyValueArray(v.ArrayVal())
case pdata.AttributeValueTypeBool:
labels[k] = v.BoolVal()
case pdata.AttributeValueTypeDouble:
Expand Down Expand Up @@ -489,6 +474,8 @@ func translateSpan(span pdata.Span, metadata model.Metadata, event *model.Span)

k := replaceDots(kDots)
switch v.Type() {
case pdata.AttributeValueTypeArray:
labels[k] = ifaceAnyValueArray(v.ArrayVal())
case pdata.AttributeValueTypeBool:
labels[k] = v.BoolVal()
case pdata.AttributeValueTypeDouble:
Expand Down
9 changes: 9 additions & 0 deletions processor/otel/consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -577,6 +577,15 @@ func TestArrayLabels(t *testing.T) {
"bool_array": []interface{}{false, true},
"string_array": []interface{}{"string1", "string2"},
}, tx.Labels)

span := transformSpanWithAttributes(t, map[string]pdata.AttributeValue{
"string_array": stringArray,
"bool_array": boolArray,
})
assert.Equal(t, common.MapStr{
"bool_array": []interface{}{false, true},
"string_array": []interface{}{"string1", "string2"},
}, span.Labels)
}

func TestConsumeTracesExportTimestamp(t *testing.T) {
Expand Down
10 changes: 10 additions & 0 deletions processor/otel/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,16 @@ func ifaceAttributeValue(v pdata.AttributeValue) interface{} {
return v.DoubleVal()
case pdata.AttributeValueTypeBool:
return v.BoolVal()
case pdata.AttributeValueTypeArray:
return ifaceAnyValueArray(v.ArrayVal())
}
return nil
}

func ifaceAnyValueArray(array pdata.AnyValueArray) []interface{} {
values := make([]interface{}, array.Len())
for i := range values {
values[i] = ifaceAttributeValue(array.At(i))
}
return values
}
20 changes: 20 additions & 0 deletions processor/otel/metadata_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"go.opentelemetry.io/collector/consumer/pdata"

"github.com/elastic/apm-server/model"
"github.com/elastic/beats/v7/libbeat/common"
)

func TestResourceConventions(t *testing.T) {
Expand Down Expand Up @@ -224,6 +225,25 @@ func TestResourceConventions(t *testing.T) {
}
}

func TestResourceLabels(t *testing.T) {
stringArray := pdata.NewAttributeValueArray()
stringArray.ArrayVal().Append(pdata.NewAttributeValueString("abc"))
stringArray.ArrayVal().Append(pdata.NewAttributeValueString("def"))

intArray := pdata.NewAttributeValueArray()
intArray.ArrayVal().Append(pdata.NewAttributeValueInt(123))
intArray.ArrayVal().Append(pdata.NewAttributeValueInt(456))

metadata := transformResourceMetadata(t, map[string]pdata.AttributeValue{
"string_array": stringArray,
"int_array": intArray,
})
assert.Equal(t, common.MapStr{
"string_array": []interface{}{"abc", "def"},
"int_array": []interface{}{int64(123), int64(456)},
}, metadata.Labels)
}

func transformResourceMetadata(t *testing.T, resourceAttrs map[string]pdata.AttributeValue) model.Metadata {
traces, spans := newTracesSpans()
traces.ResourceSpans().At(0).Resource().Attributes().InitFromMap(resourceAttrs)
Expand Down
11 changes: 11 additions & 0 deletions systemtest/approvals/TestOTLPGRPCTraces.approved.json
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,17 @@
"ingested": "dynamic",
"outcome": "success"
},
"labels": {
"resource_attribute_array": [
"a",
"b"
],
"span_attribute_array": [
"a",
"b",
"c"
]
},
"observer": {
"ephemeral_id": "dynamic",
"hostname": "dynamic",
Expand Down
12 changes: 10 additions & 2 deletions systemtest/otlp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
controller "go.opentelemetry.io/otel/sdk/metric/controller/basic"
processor "go.opentelemetry.io/otel/sdk/metric/processor/basic"
"go.opentelemetry.io/otel/sdk/metric/selector/simple"
"go.opentelemetry.io/otel/sdk/resource"
sdkresource "go.opentelemetry.io/otel/sdk/resource"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
"go.opentelemetry.io/otel/trace"
Expand Down Expand Up @@ -67,7 +68,12 @@ func TestOTLPGRPCTraces(t *testing.T) {

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
err := sendOTLPTrace(ctx, newOTLPTracerProvider(newOTLPExporter(t, srv)))

err := sendOTLPTrace(ctx, newOTLPTracerProvider(newOTLPExporter(t, srv), sdktrace.WithResource(
resource.Merge(resource.Default(), sdkresource.NewWithAttributes(
attribute.Array("resource_attribute_array", []string{"a", "b"}),
)),
)))
require.NoError(t, err)

result := systemtest.Elasticsearch.ExpectDocs(t, "apm-*", estest.BoolQuery{Filter: []interface{}{
Expand Down Expand Up @@ -203,7 +209,9 @@ func sendOTLPTrace(ctx context.Context, tracerProvider *sdktrace.TracerProvider)
tracer := tracerProvider.Tracer("systemtest")
startTime := time.Unix(123, 456)
endTime := startTime.Add(time.Second)
_, span := tracer.Start(ctx, "operation_name", trace.WithTimestamp(startTime))
_, span := tracer.Start(ctx, "operation_name", trace.WithTimestamp(startTime), trace.WithAttributes(
attribute.Array("span_attribute_array", []string{"a", "b", "c"}),
))
span.End(trace.WithTimestamp(endTime))
if err := tracerProvider.ForceFlush(ctx); err != nil {
return err
Expand Down

0 comments on commit 8831d73

Please sign in to comment.