Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[exporter/elasticsearch] Workaround TSDB array dimension limitation for metrics OTel mode #35009

46 changes: 33 additions & 13 deletions exporter/elasticsearchexporter/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,9 +148,9 @@ func (m *encodeModel) encodeLogOTelMode(resource pcommon.Resource, resourceSchem
document.AddInt("severity_number", int64(record.SeverityNumber()))
document.AddInt("dropped_attributes_count", int64(record.DroppedAttributesCount()))

m.encodeAttributesOTelMode(&document, record.Attributes())
m.encodeResourceOTelMode(&document, resource, resourceSchemaURL)
m.encodeScopeOTelMode(&document, scope, scopeSchemaURL)
m.encodeAttributesOTelMode(&document, record.Attributes(), false)
m.encodeResourceOTelMode(&document, resource, resourceSchemaURL, false)
m.encodeScopeOTelMode(&document, scope, scopeSchemaURL, false)

// Body
setOTelLogBody(&document, record.Body())
Expand Down Expand Up @@ -284,9 +284,9 @@ func (m *encodeModel) upsertMetricDataPointValueOTelMode(documents map[uint32]ob
}
document.AddString("unit", metric.Unit())

m.encodeAttributesOTelMode(&document, dp.Attributes())
m.encodeResourceOTelMode(&document, resource, resourceSchemaURL)
m.encodeScopeOTelMode(&document, scope, scopeSchemaURL)
m.encodeAttributesOTelMode(&document, dp.Attributes(), true)
m.encodeResourceOTelMode(&document, resource, resourceSchemaURL, true)
m.encodeScopeOTelMode(&document, scope, scopeSchemaURL, true)
}

switch value.Type() {
Expand Down Expand Up @@ -417,7 +417,7 @@ func numberToValue(dp pmetric.NumberDataPoint) (pcommon.Value, error) {
return pcommon.Value{}, errInvalidNumberDataPoint
}

func (m *encodeModel) encodeResourceOTelMode(document *objmodel.Document, resource pcommon.Resource, resourceSchemaURL string) {
func (m *encodeModel) encodeResourceOTelMode(document *objmodel.Document, resource pcommon.Resource, resourceSchemaURL string, stringifyArrayValues bool) {
resourceMapVal := pcommon.NewValueMap()
resourceMap := resourceMapVal.Map()
if resourceSchemaURL != "" {
Expand All @@ -433,11 +433,13 @@ func (m *encodeModel) encodeResourceOTelMode(document *objmodel.Document, resour
}
return false
})

if stringifyArrayValues {
mapStringifyArrayValues(resourceAttrMap)
}
document.Add("resource", objmodel.ValueFromAttribute(resourceMapVal))
}

func (m *encodeModel) encodeScopeOTelMode(document *objmodel.Document, scope pcommon.InstrumentationScope, scopeSchemaURL string) {
func (m *encodeModel) encodeScopeOTelMode(document *objmodel.Document, scope pcommon.InstrumentationScope, scopeSchemaURL string, stringifyArrayValues bool) {
scopeMapVal := pcommon.NewValueMap()
scopeMap := scopeMapVal.Map()
if scope.Name() != "" {
Expand All @@ -459,10 +461,16 @@ func (m *encodeModel) encodeScopeOTelMode(document *objmodel.Document, scope pco
}
return false
})
if stringifyArrayValues {
mapStringifyArrayValues(scopeAttrMap)
}
document.Add("scope", objmodel.ValueFromAttribute(scopeMapVal))
}

func (m *encodeModel) encodeAttributesOTelMode(document *objmodel.Document, attributeMap pcommon.Map) {
func (m *encodeModel) encodeAttributesOTelMode(document *objmodel.Document, from pcommon.Map, stringifyArrayValues bool) {
attributeMapVal := pcommon.NewValueMap()
attributeMap := attributeMapVal.Map()
from.CopyTo(attributeMap)
attributeMap.RemoveIf(func(key string, val pcommon.Value) bool {
switch key {
case dataStreamType, dataStreamDataset, dataStreamNamespace:
Expand All @@ -474,9 +482,21 @@ func (m *encodeModel) encodeAttributesOTelMode(document *objmodel.Document, attr
}
return false
})
if stringifyArrayValues {
mapStringifyArrayValues(attributeMap)
}
document.AddAttributes("attributes", attributeMap)
}

func mapStringifyArrayValues(m pcommon.Map) {
m.Range(func(k string, v pcommon.Value) bool {
if v.Type() == pcommon.ValueTypeSlice {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This not only affects array values but also other complex attribute values, like ValueTypeMap. Not quite sure how to deal with ValueTypeBytes tbh. How do we serialize it normally? Is that compatible with a field type that supports dimensions?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

according to https://opentelemetry.io/docs/specs/otel/common/

The attribute value is either:

A primitive type: string, boolean, double precision floating point (IEEE 754-1985) or signed 64 bit integer.
An array of primitive type values. The array MUST be homogeneous, i.e., it MUST NOT contain values of different types.

I don't think ValueTypeMap and ValueTypeBytes fall into these valid value types.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How are we dealing with cases where we get invalid attribute values? Are we ignoring them?

Copy link
Contributor Author

@carsonip carsonip Sep 5, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We had a similar issue in apm-server where otlp events with non-compliant attributes cause panics: elastic/apm-server#13703

In es exporter, by looking at the code, ValueTypeMap will be successfully encoded as objects, ValueTypeBytes will be ignored, non-homogeneous arrays will be successfully encoded. Ideally they should be dropped explicitly with a warning, but as these are non-compliant, user should expect undefined behavior, which includes successful or unsuccessful indexing.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As long as we don't serialize non-compliant values and there's no panic, this sounds fine. Ideally, we would increment the dropped attributes count.

v.SetStr(v.AsString())
}
return true
})
}

func (m *encodeModel) encodeSpan(resource pcommon.Resource, resourceSchemaURL string, span ptrace.Span, scope pcommon.InstrumentationScope, scopeSchemaURL string) ([]byte, error) {
var document objmodel.Document
switch m.mode {
Expand All @@ -502,7 +522,7 @@ func (m *encodeModel) encodeSpanOTelMode(resource pcommon.Resource, resourceSche
document.AddString("kind", span.Kind().String())
document.AddInt("duration", int64(span.EndTimestamp()-span.StartTimestamp()))

m.encodeAttributesOTelMode(&document, span.Attributes())
m.encodeAttributesOTelMode(&document, span.Attributes(), false)

document.AddInt("dropped_attributes_count", int64(span.DroppedAttributesCount()))
document.AddInt("dropped_events_count", int64(span.DroppedEventsCount()))
Expand All @@ -526,8 +546,8 @@ func (m *encodeModel) encodeSpanOTelMode(resource pcommon.Resource, resourceSche
document.AddString("status.message", span.Status().Message())
document.AddString("status.code", span.Status().Code().String())

m.encodeResourceOTelMode(&document, resource, resourceSchemaURL)
m.encodeScopeOTelMode(&document, scope, scopeSchemaURL)
m.encodeResourceOTelMode(&document, resource, resourceSchemaURL, false)
m.encodeScopeOTelMode(&document, scope, scopeSchemaURL, false)

// TODO: add span events to log data streams

Expand Down
Loading