From 8831d73fc490d644a9907cf2e82f5a9491a2a276 Mon Sep 17 00:00:00 2001 From: Andrew Wilkins Date: Thu, 15 Jul 2021 10:19:35 +0800 Subject: [PATCH] processor/otel: record array resource attributes (#5704) * processor/otel: record array resource attributes If an unhandled array attribute is found in resource attributes, store that as a label like we do with span attributes. * processor/otel: handle array attrs for spans (cherry picked from commit 4264a2a3c2e600044d1379e351346ace2c605b79) --- processor/otel/consumer.go | 19 +++--------------- processor/otel/consumer_test.go | 9 +++++++++ processor/otel/metadata.go | 10 ++++++++++ processor/otel/metadata_test.go | 20 +++++++++++++++++++ .../TestOTLPGRPCTraces.approved.json | 11 ++++++++++ systemtest/otlp_test.go | 12 +++++++++-- 6 files changed, 63 insertions(+), 18 deletions(-) diff --git a/processor/otel/consumer.go b/processor/otel/consumer.go index 2232d29aa6d..dc99bb67693 100644 --- a/processor/otel/consumer.go +++ b/processor/otel/consumer.go @@ -261,22 +261,7 @@ func translateTransaction( k := replaceDots(kDots) switch v.Type() { case pdata.AttributeValueTypeArray: - array := v.ArrayVal() - values := make([]interface{}, array.Len()) - for i := range values { - value := array.At(i) - switch value.Type() { - case pdata.AttributeValueTypeBool: - values[i] = value.BoolVal() - case pdata.AttributeValueTypeDouble: - values[i] = value.DoubleVal() - case pdata.AttributeValueTypeInt: - values[i] = value.IntVal() - case pdata.AttributeValueTypeString: - values[i] = truncate(value.StringVal()) - } - } - labels[k] = values + labels[k] = ifaceAnyValueArray(v.ArrayVal()) case pdata.AttributeValueTypeBool: labels[k] = v.BoolVal() case pdata.AttributeValueTypeDouble: @@ -489,6 +474,8 @@ func translateSpan(span pdata.Span, metadata model.Metadata, event *model.Span) k := replaceDots(kDots) switch v.Type() { + case pdata.AttributeValueTypeArray: + labels[k] = ifaceAnyValueArray(v.ArrayVal()) case pdata.AttributeValueTypeBool: labels[k] = v.BoolVal() case pdata.AttributeValueTypeDouble: diff --git a/processor/otel/consumer_test.go b/processor/otel/consumer_test.go index ff7b8dd3025..533941660db 100644 --- a/processor/otel/consumer_test.go +++ b/processor/otel/consumer_test.go @@ -577,6 +577,15 @@ func TestArrayLabels(t *testing.T) { "bool_array": []interface{}{false, true}, "string_array": []interface{}{"string1", "string2"}, }, tx.Labels) + + span := transformSpanWithAttributes(t, map[string]pdata.AttributeValue{ + "string_array": stringArray, + "bool_array": boolArray, + }) + assert.Equal(t, common.MapStr{ + "bool_array": []interface{}{false, true}, + "string_array": []interface{}{"string1", "string2"}, + }, span.Labels) } func TestConsumeTracesExportTimestamp(t *testing.T) { diff --git a/processor/otel/metadata.go b/processor/otel/metadata.go index 305f10a71a5..427ef5f9b33 100644 --- a/processor/otel/metadata.go +++ b/processor/otel/metadata.go @@ -227,6 +227,16 @@ func ifaceAttributeValue(v pdata.AttributeValue) interface{} { return v.DoubleVal() case pdata.AttributeValueTypeBool: return v.BoolVal() + case pdata.AttributeValueTypeArray: + return ifaceAnyValueArray(v.ArrayVal()) } return nil } + +func ifaceAnyValueArray(array pdata.AnyValueArray) []interface{} { + values := make([]interface{}, array.Len()) + for i := range values { + values[i] = ifaceAttributeValue(array.At(i)) + } + return values +} diff --git a/processor/otel/metadata_test.go b/processor/otel/metadata_test.go index 647123e9eff..cc3d04848f5 100644 --- a/processor/otel/metadata_test.go +++ b/processor/otel/metadata_test.go @@ -24,6 +24,7 @@ import ( "go.opentelemetry.io/collector/consumer/pdata" "github.com/elastic/apm-server/model" + "github.com/elastic/beats/v7/libbeat/common" ) func TestResourceConventions(t *testing.T) { @@ -224,6 +225,25 @@ func TestResourceConventions(t *testing.T) { } } +func TestResourceLabels(t *testing.T) { + stringArray := pdata.NewAttributeValueArray() + stringArray.ArrayVal().Append(pdata.NewAttributeValueString("abc")) + stringArray.ArrayVal().Append(pdata.NewAttributeValueString("def")) + + intArray := pdata.NewAttributeValueArray() + intArray.ArrayVal().Append(pdata.NewAttributeValueInt(123)) + intArray.ArrayVal().Append(pdata.NewAttributeValueInt(456)) + + metadata := transformResourceMetadata(t, map[string]pdata.AttributeValue{ + "string_array": stringArray, + "int_array": intArray, + }) + assert.Equal(t, common.MapStr{ + "string_array": []interface{}{"abc", "def"}, + "int_array": []interface{}{int64(123), int64(456)}, + }, metadata.Labels) +} + func transformResourceMetadata(t *testing.T, resourceAttrs map[string]pdata.AttributeValue) model.Metadata { traces, spans := newTracesSpans() traces.ResourceSpans().At(0).Resource().Attributes().InitFromMap(resourceAttrs) diff --git a/systemtest/approvals/TestOTLPGRPCTraces.approved.json b/systemtest/approvals/TestOTLPGRPCTraces.approved.json index 4308a49f805..60428878a88 100644 --- a/systemtest/approvals/TestOTLPGRPCTraces.approved.json +++ b/systemtest/approvals/TestOTLPGRPCTraces.approved.json @@ -13,6 +13,17 @@ "ingested": "dynamic", "outcome": "success" }, + "labels": { + "resource_attribute_array": [ + "a", + "b" + ], + "span_attribute_array": [ + "a", + "b", + "c" + ] + }, "observer": { "ephemeral_id": "dynamic", "hostname": "dynamic", diff --git a/systemtest/otlp_test.go b/systemtest/otlp_test.go index 20256505620..beb26ff74c4 100644 --- a/systemtest/otlp_test.go +++ b/systemtest/otlp_test.go @@ -35,6 +35,7 @@ import ( controller "go.opentelemetry.io/otel/sdk/metric/controller/basic" processor "go.opentelemetry.io/otel/sdk/metric/processor/basic" "go.opentelemetry.io/otel/sdk/metric/selector/simple" + "go.opentelemetry.io/otel/sdk/resource" sdkresource "go.opentelemetry.io/otel/sdk/resource" sdktrace "go.opentelemetry.io/otel/sdk/trace" "go.opentelemetry.io/otel/trace" @@ -67,7 +68,12 @@ func TestOTLPGRPCTraces(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - err := sendOTLPTrace(ctx, newOTLPTracerProvider(newOTLPExporter(t, srv))) + + err := sendOTLPTrace(ctx, newOTLPTracerProvider(newOTLPExporter(t, srv), sdktrace.WithResource( + resource.Merge(resource.Default(), sdkresource.NewWithAttributes( + attribute.Array("resource_attribute_array", []string{"a", "b"}), + )), + ))) require.NoError(t, err) result := systemtest.Elasticsearch.ExpectDocs(t, "apm-*", estest.BoolQuery{Filter: []interface{}{ @@ -203,7 +209,9 @@ func sendOTLPTrace(ctx context.Context, tracerProvider *sdktrace.TracerProvider) tracer := tracerProvider.Tracer("systemtest") startTime := time.Unix(123, 456) endTime := startTime.Add(time.Second) - _, span := tracer.Start(ctx, "operation_name", trace.WithTimestamp(startTime)) + _, span := tracer.Start(ctx, "operation_name", trace.WithTimestamp(startTime), trace.WithAttributes( + attribute.Array("span_attribute_array", []string{"a", "b", "c"}), + )) span.End(trace.WithTimestamp(endTime)) if err := tracerProvider.ForceFlush(ctx); err != nil { return err