Skip to content

Commit

Permalink
fix(serializers.json): allow stateful transformations (#12735)
Browse files Browse the repository at this point in the history
  • Loading branch information
srebhan authored Feb 24, 2023
1 parent e51b381 commit a90b6eb
Show file tree
Hide file tree
Showing 2 changed files with 71 additions and 12 deletions.
22 changes: 10 additions & 12 deletions plugins/serializers/json/json.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,22 +25,15 @@ type Serializer struct {
TimestampUnits time.Duration
TimestampFormat string

transformation *jsonata.Expr
transformation string
nestedfields filter.Filter
}

func NewSerializer(cfg FormatConfig) (*Serializer, error) {
s := &Serializer{
TimestampUnits: truncateDuration(cfg.TimestampUnits),
TimestampFormat: cfg.TimestampFormat,
}

if cfg.Transformation != "" {
e, err := jsonata.Compile(cfg.Transformation)
if err != nil {
return nil, err
}
s.transformation = e
transformation: cfg.Transformation,
}

if len(cfg.NestedFieldsInclude) > 0 || len(cfg.NestedFieldsExclude) > 0 {
Expand All @@ -58,7 +51,7 @@ func (s *Serializer) Serialize(metric telegraf.Metric) ([]byte, error) {
var obj interface{}
obj = s.createObject(metric)

if s.transformation != nil {
if s.transformation != "" {
var err error
if obj, err = s.transform(obj); err != nil {
if errors.Is(err, jsonata.ErrUndefined) {
Expand Down Expand Up @@ -89,7 +82,7 @@ func (s *Serializer) SerializeBatch(metrics []telegraf.Metric) ([]byte, error) {
"metrics": objects,
}

if s.transformation != nil {
if s.transformation != "" {
var err error
if obj, err = s.transform(obj); err != nil {
if errors.Is(err, jsonata.ErrUndefined) {
Expand Down Expand Up @@ -150,7 +143,12 @@ func (s *Serializer) createObject(metric telegraf.Metric) map[string]interface{}
}

func (s *Serializer) transform(obj interface{}) (interface{}, error) {
return s.transformation.Eval(obj)
transformation, err := jsonata.Compile(s.transformation)
if err != nil {
return nil, err
}

return transformation.Eval(obj)
}

func truncateDuration(units time.Duration) time.Duration {
Expand Down
61 changes: 61 additions & 0 deletions plugins/serializers/json/json_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -335,6 +335,67 @@ func TestSerializeTransformationBatch(t *testing.T) {
}
}

func TestSerializeTransformationIssue12734(t *testing.T) {
input := []telegraf.Metric{
metric.New(
"data",
map[string]string{"key": "a"},
map[string]interface{}{"value": 10.1},
time.Unix(0, 1676285135457000000),
),
metric.New(
"data",
map[string]string{"key": "b"},
map[string]interface{}{"value": 20.2},
time.Unix(0, 1676285135457000000),
),
metric.New(
"data",
map[string]string{"key": "c"},
map[string]interface{}{"value": 30.3},
time.Unix(0, 1676285135457000000),
),
}

transformation := `
{
"valueRows": metrics{$string(timestamp): fields.value[]} ~> $each(function($v, $k) {
{
"timestamp": $number($k),
"values": $v
}
})
}
`

expected := map[string]interface{}{
"valueRows": map[string]interface{}{
"timestamp": 1.676285135e+9,
"values": []interface{}{10.1, 20.2, 30.3},
},
}

// Setup serializer
serializer, err := NewSerializer(
FormatConfig{
Transformation: transformation,
},
)
require.NoError(t, err)

// Check multiple serializations as issue #12734 shows that the
// transformation breaks after the first iteration
for i := 1; i <= 3; i++ {
buf, err := serializer.SerializeBatch(input)
require.NoErrorf(t, err, "broke in iteration %d", i)

// Compare
var actual interface{}
require.NoError(t, json.Unmarshal(buf, &actual))
require.EqualValuesf(t, expected, actual, "broke in iteration %d", i)
}
}

func TestSerializeNesting(t *testing.T) {
var tests = []struct {
name string
Expand Down

0 comments on commit a90b6eb

Please sign in to comment.