Skip to content

Commit

Permalink
Allow to catch errors that occur in the apply function (#8401)
Browse files Browse the repository at this point in the history
  • Loading branch information
essobedo authored Nov 13, 2020
1 parent 8f0070b commit ca04106
Show file tree
Hide file tree
Showing 4 changed files with 75 additions and 0 deletions.
21 changes: 21 additions & 0 deletions plugins/processors/starlark/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,27 @@ def apply(metric):
Telegraf freezes the global scope, which prevents it from being modified.
Attempting to modify the global scope will fail with an error.

**How to manage errors that occur in the apply function?**

In case you need to call some code that may return an error, you can delegate the call
to the built-in function `catch` which takes as argument a `Callable` and returns the error
that occured if any, `None` otherwise.

So for example:

```python
load("json.star", "json")

def apply(metric):
error = catch(lambda: failing(metric))
if error != None:
# Some code to execute in case of an error
metric.fields["error"] = error
return metric

def failing(metric):
json.decode("non-json-content")
```

### Examples

Expand Down
13 changes: 13 additions & 0 deletions plugins/processors/starlark/builtins.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,19 @@ func deepcopy(thread *starlark.Thread, _ *starlark.Builtin, args starlark.Tuple,
return &Metric{metric: dup}, nil
}

// catch(f) evaluates f() and returns its evaluation error message
// if it failed or None if it succeeded.
func catch(thread *starlark.Thread, _ *starlark.Builtin, args starlark.Tuple, kwargs []starlark.Tuple) (starlark.Value, error) {
var fn starlark.Callable
if err := starlark.UnpackArgs("catch", args, kwargs, "fn", &fn); err != nil {
return nil, err
}
if _, err := starlark.Call(thread, fn, nil, nil); err != nil {
return starlark.String(err.Error()), nil
}
return starlark.None, nil
}

type builtinMethod func(b *starlark.Builtin, args starlark.Tuple, kwargs []starlark.Tuple) (starlark.Value, error)

func builtinAttr(recv starlark.Value, name string, methods map[string]builtinMethod) (starlark.Value, error) {
Expand Down
1 change: 1 addition & 0 deletions plugins/processors/starlark/starlark.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ func (s *Starlark) Init() error {
builtins := starlark.StringDict{}
builtins["Metric"] = starlark.NewBuiltin("Metric", newMetric)
builtins["deepcopy"] = starlark.NewBuiltin("deepcopy", deepcopy)
builtins["catch"] = starlark.NewBuiltin("catch", catch)

program, err := s.sourceProgram(builtins)
if err != nil {
Expand Down
40 changes: 40 additions & 0 deletions plugins/processors/starlark/starlark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2378,6 +2378,46 @@ def apply(metric):
),
},
},
{
name: "support errors",
source: `
load("json.star", "json")
def apply(metric):
msg = catch(lambda: process(metric))
if msg != None:
metric.fields["error"] = msg
metric.fields["value"] = "default"
return metric
def process(metric):
metric.fields["field1"] = "value1"
metric.tags["tags1"] = "value2"
# Throw an error
json.decode(metric.fields.get('value'))
# Should never be called
metric.fields["msg"] = "value4"
`,
input: []telegraf.Metric{
testutil.MustMetric("cpu",
map[string]string{},
map[string]interface{}{"value": "non-json-content", "msg": "value3"},
time.Unix(0, 0),
),
},
expected: []telegraf.Metric{
testutil.MustMetric("cpu",
map[string]string{"tags1": "value2"},
map[string]interface{}{
"value": "default",
"field1": "value1",
"msg": "value3",
"error": "json.decode: at offset 0, unexpected character 'n'",
},
time.Unix(0, 0),
),
},
},
}

for _, tt := range tests {
Expand Down

0 comments on commit ca04106

Please sign in to comment.