[pkg/ottl] Add tmp path (#16994)
We've had discussions recently about how to handle functions and converters (previously called factory functions) that share the same functionality (see #16571).

This PR suggests a different approach that should reduce the need to chain converters/functions.

Previously, we chained converters together because OTTL had no place to store data except the telemetry payload itself. While attributes could be used for staging, that approach requires cleanup afterwards and can cause unwanted transformations depending on how conditions resolve. This PR introduces a temporary value (named `cache` in the implementation) on the log context (and on future contexts, if we like the solution) that statements can use as a "staging" location for complex transformations.

Previously, to handle situations like the one in #14938, we had to chain functions together. Assuming we had a KeepKeys converter, the statement would look like:

`merge_maps(attributes, KeepKeys(ParseJSON(body), ["log_type"]), "upsert")`

These statements are tricky to write and can be difficult to comprehend, especially for new users. Each Converter we chain on increases the burden. Adding even a single extra function, like Flatten, makes a real difference: `merge_maps(attributes, Flatten(KeepKeys(ParseJSON(body), ["log_type"]), "."), "upsert")`
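With a staging location, the same transformation can be broken into flat, single-purpose statements. A sketch of what this could look like in a transform processor config (illustrative only; it assumes `set`, `keep_keys`, and `merge_maps` are available as statement functions in the log context):

```yaml
processors:
  transform:
    log_statements:
      - context: log
        statements:
          # Stage the parsed body in the cache instead of nesting converters
          - set(cache, ParseJSON(body))
          - keep_keys(cache, ["log_type"])
          - merge_maps(attributes, cache, "upsert")
```

Each statement does one thing, and nothing is written to attributes until the staged value is in its final shape.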

Co-authored-by: Evan Bradley <[email protected]>
TylerHelmuth and evan-bradley authored Jan 18, 2023
1 parent 8e5fbc5 commit fc167d9
Showing 22 changed files with 813 additions and 238 deletions.
16 changes: 16 additions & 0 deletions .chloggen/ottl-introduce-temp-storage.yaml
@@ -0,0 +1,16 @@
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: pkg/ottl

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Add new `cache` path to all contexts which can be used as temporary cache during complex transformations

# One or more tracking issues related to the change
issues: [16994]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:
56 changes: 29 additions & 27 deletions pkg/ottl/contexts/ottldatapoint/README.md
@@ -7,33 +7,35 @@ In general, the DataPoint Context supports accessing pdata using the field names

The following fields are the exception.

| path | field accessed | type |
|------------------------------------------------|---------------------------------------------------------------------------------------------------------------|-------------------------------------------------------------------------|
| resource | resource of the data point being processed | pcommon.Resource |
| resource.attributes | resource attributes of the data point being processed | pcommon.Map |
| resource.attributes\[""\] | the value of the resource attribute of the data point being processed | string, bool, int64, float64, pcommon.Map, pcommon.Slice, []byte or nil |
| resource.dropped_attributes_count | number of dropped attributes of the resource of the data point being processed | int64 |
| instrumentation_scope | instrumentation scope of the data point being processed | pcommon.InstrumentationScope |
| instrumentation_scope.name | name of the instrumentation scope of the data point being processed | string |
| instrumentation_scope.version | version of the instrumentation scope of the data point being processed | string |
| instrumentation_scope.dropped_attributes_count | number of dropped attributes of the instrumentation scope of the data point being processed | int64 |
| instrumentation_scope.attributes | instrumentation scope attributes of the data point being processed | pcommon.Map |
| instrumentation_scope.attributes\[""\] | the value of the instrumentation scope attribute of the data point being processed | string, bool, int64, float64, pcommon.Map, pcommon.Slice, []byte or nil |
| attributes | attributes of the data point being processed | pcommon.Map |
| attributes\[""\] | the value of the attribute of the data point being processed | string, bool, int64, float64, pcommon.Map, pcommon.Slice, []byte or nil |
| metric | the metric to which the data point being processed belongs | pmetric.Metric |
| metric.name | the name of the metric to which the data point being processed belongs | string |
| metric.description | the description of the metric to which the data point being processed belongs | string |
| metric.unit | the unit of the metric to which the data point being processed belongs | string |
| metric.type | the type of the metric to which the data point being processed belongs. See enums below for integer mapping. | int64 |
| metric.aggregation_temporality | the aggregation temporality of the metric to which the data point being processed belongs | int64 |
| metric.is_monotonic | the monotonicity of the metric to which the data point being processed belongs | bool |
| positive | the positive buckets of the data point being processed | pmetric.ExponentialHistogramDataPoint |
| positive.offset | the offset of the positive buckets of the data point being processed | int64 |
| positive.bucket_counts | the bucket_counts of the positive buckets of the data point being processed | uint64 |
| negative | the negative buckets of the data point being processed | pmetric.ExponentialHistogramDataPoint |
| negative.offset | the offset of the negative buckets of the data point being processed | int64 |
| negative.bucket_counts | the bucket_counts of the negative buckets of the data point being processed | uint64 |
| path | field accessed | type |
|------------------------------------------------|----------------------------------------------------------------------------------------------------------------------------------------------------|-------------------------------------------------------------------------|
| cache | the value of the current transform context's temporary cache. cache can be used as a temporary placeholder for data during complex transformations | pcommon.Map |
| cache\[""\] | the value of an item in cache | string, bool, int64, float64, pcommon.Map, pcommon.Slice, []byte or nil |
| resource | resource of the data point being processed | pcommon.Resource |
| resource.attributes | resource attributes of the data point being processed | pcommon.Map |
| resource.attributes\[""\] | the value of the resource attribute of the data point being processed | string, bool, int64, float64, pcommon.Map, pcommon.Slice, []byte or nil |
| resource.dropped_attributes_count | number of dropped attributes of the resource of the data point being processed | int64 |
| instrumentation_scope | instrumentation scope of the data point being processed | pcommon.InstrumentationScope |
| instrumentation_scope.name | name of the instrumentation scope of the data point being processed | string |
| instrumentation_scope.version | version of the instrumentation scope of the data point being processed | string |
| instrumentation_scope.dropped_attributes_count | number of dropped attributes of the instrumentation scope of the data point being processed | int64 |
| instrumentation_scope.attributes | instrumentation scope attributes of the data point being processed | pcommon.Map |
| instrumentation_scope.attributes\[""\] | the value of the instrumentation scope attribute of the data point being processed | string, bool, int64, float64, pcommon.Map, pcommon.Slice, []byte or nil |
| attributes | attributes of the data point being processed | pcommon.Map |
| attributes\[""\] | the value of the attribute of the data point being processed | string, bool, int64, float64, pcommon.Map, pcommon.Slice, []byte or nil |
| metric | the metric to which the data point being processed belongs | pmetric.Metric |
| metric.name | the name of the metric to which the data point being processed belongs | string |
| metric.description | the description of the metric to which the data point being processed belongs | string |
| metric.unit | the unit of the metric to which the data point being processed belongs | string |
| metric.type | the type of the metric to which the data point being processed belongs. See enums below for integer mapping. | int64 |
| metric.aggregation_temporality | the aggregation temporality of the metric to which the data point being processed belongs | int64 |
| metric.is_monotonic | the monotonicity of the metric to which the data point being processed belongs | bool |
| positive | the positive buckets of the data point being processed | pmetric.ExponentialHistogramDataPoint |
| positive.offset | the offset of the positive buckets of the data point being processed | int64 |
| positive.bucket_counts | the bucket_counts of the positive buckets of the data point being processed | uint64 |
| negative | the negative buckets of the data point being processed | pmetric.ExponentialHistogramDataPoint |
| negative.offset | the offset of the negative buckets of the data point being processed | int64 |
| negative.bucket_counts | the bucket_counts of the negative buckets of the data point being processed | uint64 |
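For example, `cache` can hold an intermediate value across a sequence of statements (illustrative sketch using only `set` and the paths from the table above; the attribute key is hypothetical):

```
set(cache["original_name"], metric.name)
set(attributes["original_name"], cache["original_name"])
```

Because `cache` lives only in the transform context, nothing needs to be deleted from the telemetry afterwards.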

## Enums

38 changes: 38 additions & 0 deletions pkg/ottl/contexts/ottldatapoint/datapoint.go
@@ -36,6 +36,7 @@ type TransformContext struct {
metrics pmetric.MetricSlice
instrumentationScope pcommon.InstrumentationScope
resource pcommon.Resource
cache pcommon.Map
}

func NewTransformContext(dataPoint interface{}, metric pmetric.Metric, metrics pmetric.MetricSlice, instrumentationScope pcommon.InstrumentationScope, resource pcommon.Resource) TransformContext {
@@ -45,6 +46,7 @@ func NewTransformContext(dataPoint interface{}, metric pmetric.Metric, metrics p
metrics: metrics,
instrumentationScope: instrumentationScope,
resource: resource,
cache: pcommon.NewMap(),
}
}

@@ -68,6 +70,10 @@ func (tCtx TransformContext) GetMetrics() pmetric.MetricSlice {
return tCtx.metrics
}

func (tCtx TransformContext) getCache() pcommon.Map {
return tCtx.cache
}

func NewParser(functions map[string]interface{}, telemetrySettings component.TelemetrySettings) ottl.Parser[TransformContext] {
return ottl.NewParser[TransformContext](functions, parsePath, parseEnum, telemetrySettings)
}
@@ -102,6 +108,12 @@ func parsePath(val *ottl.Path) (ottl.GetSetter[TransformContext], error) {

func newPathGetSetter(path []ottl.Field) (ottl.GetSetter[TransformContext], error) {
switch path[0].Name {
case "cache":
mapKey := path[0].MapKey
if mapKey == nil {
return accessCache(), nil
}
return accessCacheKey(mapKey), nil
case "resource":
return ottlcommon.ResourcePathGetSetter[TransformContext](path[1:])
case "instrumentation_scope":
@@ -164,6 +176,32 @@ func newPathGetSetter(path []ottl.Field) (ottl.GetSetter[TransformContext], erro
return nil, fmt.Errorf("invalid path expression %v", path)
}

func accessCache() ottl.StandardGetSetter[TransformContext] {
return ottl.StandardGetSetter[TransformContext]{
Getter: func(ctx context.Context, tCtx TransformContext) (interface{}, error) {
return tCtx.getCache(), nil
},
Setter: func(ctx context.Context, tCtx TransformContext, val interface{}) error {
if m, ok := val.(pcommon.Map); ok {
m.CopyTo(tCtx.getCache())
}
return nil
},
}
}

func accessCacheKey(mapKey *string) ottl.StandardGetSetter[TransformContext] {
return ottl.StandardGetSetter[TransformContext]{
Getter: func(ctx context.Context, tCtx TransformContext) (interface{}, error) {
return ottlcommon.GetMapValue(tCtx.getCache(), *mapKey), nil
},
Setter: func(ctx context.Context, tCtx TransformContext, val interface{}) error {
ottlcommon.SetMapValue(tCtx.getCache(), *mapKey, val)
return nil
},
}
}

func accessAttributes() ottl.StandardGetSetter[TransformContext] {
return ottl.StandardGetSetter[TransformContext]{
Getter: func(ctx context.Context, tCtx TransformContext) (interface{}, error) {
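The `accessCacheKey` pattern above binds a getter/setter pair to one entry of the shared cache map. A minimal stand-alone sketch of the same closure pattern, using a plain `map[string]any` instead of `pcommon.Map` (names here are illustrative, not part of the OTTL API):

```go
package main

import "fmt"

// getSetter is a minimal stand-in for ottl.StandardGetSetter: a pair of
// closures bound to shared state.
type getSetter struct {
	get func() any
	set func(val any)
}

// accessKey returns a getter/setter bound to a single key of the cache map,
// analogous to accessCacheKey in the diff above. Both closures capture the
// same map, so writes through set are visible through get.
func accessKey(cache map[string]any, key string) getSetter {
	return getSetter{
		get: func() any { return cache[key] },
		set: func(val any) { cache[key] = val },
	}
}

func main() {
	cache := map[string]any{}
	gs := accessKey(cache, "temp")
	gs.set("value")
	fmt.Println(gs.get()) // prints value
	fmt.Println(cache)    // the underlying map was mutated in place
}
```

Because maps in Go are reference types, the closures mutate the caller's cache directly, which is exactly why every statement in a sequence sees the values staged by earlier statements.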
66 changes: 66 additions & 0 deletions pkg/ottl/contexts/ottldatapoint/datapoint_test.go
@@ -27,6 +27,71 @@ import (
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/ottltest"
)

func Test_newPathGetSetter_Cache(t *testing.T) {
newCache := pcommon.NewMap()
newCache.PutStr("temp", "value")

tests := []struct {
name string
path []ottl.Field
orig interface{}
newVal interface{}
modified func(cache pcommon.Map)
valueType pmetric.NumberDataPointValueType
}{

{
name: "cache",
path: []ottl.Field{
{
Name: "cache",
},
},
orig: pcommon.NewMap(),
newVal: newCache,
modified: func(cache pcommon.Map) {
newCache.CopyTo(cache)
},
},
{
name: "cache access",
path: []ottl.Field{
{
Name: "cache",
MapKey: ottltest.Strp("temp"),
},
},
orig: nil,
newVal: "new value",
modified: func(cache pcommon.Map) {
cache.PutStr("temp", "new value")
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
accessor, err := newPathGetSetter(tt.path)
assert.NoError(t, err)

numberDataPoint := createNumberDataPointTelemetry(tt.valueType)

ctx := NewTransformContext(numberDataPoint, pmetric.NewMetric(), pmetric.NewMetricSlice(), pcommon.NewInstrumentationScope(), pcommon.NewResource())

got, err := accessor.Get(context.Background(), ctx)
assert.Nil(t, err)
assert.Equal(t, tt.orig, got)

err = accessor.Set(context.Background(), ctx, tt.newVal)
assert.Nil(t, err)

exCache := pcommon.NewMap()
tt.modified(exCache)

assert.Equal(t, exCache, ctx.getCache())
})
}
}

func Test_newPathGetSetter_NumberDataPoint(t *testing.T) {
refNumberDataPoint := createNumberDataPointTelemetry(pmetric.NumberDataPointValueTypeInt)

@@ -1571,6 +1636,7 @@ func createSummaryDataPointTelemetry() pmetric.SummaryDataPoint {

return summaryDataPoint
}

func createAttributeTelemetry(attributes pcommon.Map) {
attributes.PutStr("str", "val")
attributes.PutBool("bool", true)
