Skip to content

Commit

Permalink
x-pack/filebeat/input/cel: fix eval state return on error (#33996) (#…
Browse files Browse the repository at this point in the history
…33997)

Previously, when an error in evaluation or result deserialisation
occurred, the entire input activation container was returned instead of
a valid state. The result of this was that the state would nest on each
iteration of the eval event loop. In cases where errors can happen
frequently, this can result in unbounded memory use.

Also improve the robustness of the want_more flag; the old code would
continue with the loop if state["want_more"] was unset since any(nil)
does not equal false.

Relates: #33992
(cherry picked from commit f08eed8)

Co-authored-by: Dan Kortschak <[email protected]>
  • Loading branch information
mergify[bot] and efd6 authored Dec 8, 2022
1 parent b020fd5 commit eb9e939
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 16 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ https://github.com/elastic/beats/compare/v8.2.0\...main[Check the HEAD diff]
- Fix PANW handling of messages with event.original already set. {issue}33829[33829] {pull}33830[33830]
- Rename identity as identity_name when the value is a string in Azure Platform Logs. {pull}33654[33654]
- Fix input cancellation handling when HTTP client does not support contexts. {issue}33962[33962] {pull}33968[33968]
- Update mito CEL extension library to v0.0.0-20221207004749-2f0f2875e464 {pull}33974[33974]
- Fix CEL result deserialisation when evaluation fails. {issue}33992[33992] {pull}33996[33996]

*Heartbeat*
- Fix broken zip URL monitors. NOTE: Zip URL Monitors will be removed in version 8.7 and replaced with project monitors. {pull}33723[33723]
Expand Down
28 changes: 12 additions & 16 deletions x-pack/filebeat/input/cel/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,9 +196,7 @@ func (input) run(env v2.Context, src *source, cursor map[string]interface{}, pub
log.Debugw("request state", logp.Namespace("cel"), "state", state)
metrics.executions.Add(1)
start := time.Now()
state, err = evalWith(ctx, prg, map[string]interface{}{
root: state,
})
state, err = evalWith(ctx, prg, state)
log.Debugw("response state", logp.Namespace("cel"), "state", state)
if err != nil {
switch {
Expand Down Expand Up @@ -425,9 +423,7 @@ func (input) run(env v2.Context, src *source, cursor map[string]interface{}, pub
// Replace the last known good cursor.
state["cursor"] = goodCursor

// Avoid explicit type assertion. This is safe as long as the value is
// Go-comparable.
if state["want_more"] == false {
if more, _ := state["want_more"].(bool); !more {
return nil
}
}
Expand Down Expand Up @@ -805,31 +801,31 @@ func newProgram(src, root string, client *http.Client, limiter *rate.Limiter, pa
return prg, nil
}

func evalWith(ctx context.Context, prg cel.Program, input map[string]interface{}) (map[string]interface{}, error) {
out, _, err := prg.ContextEval(ctx, input)
func evalWith(ctx context.Context, prg cel.Program, state map[string]interface{}) (map[string]interface{}, error) {
out, _, err := prg.ContextEval(ctx, map[string]interface{}{root: state})
if e := ctx.Err(); e != nil {
err = e
}
if err != nil {
input["events"] = map[string]interface{}{"error.message": fmt.Sprintf("failed eval: %v", err)}
return input, fmt.Errorf("failed eval: %w", err)
state["events"] = map[string]interface{}{"error.message": fmt.Sprintf("failed eval: %v", err)}
return state, fmt.Errorf("failed eval: %w", err)
}

v, err := out.ConvertToNative(reflect.TypeOf(&structpb.Value{}))
if err != nil {
input["events"] = map[string]interface{}{"error.message": fmt.Sprintf("failed proto conversion: %v", err)}
return input, fmt.Errorf("failed proto conversion: %w", err)
state["events"] = map[string]interface{}{"error.message": fmt.Sprintf("failed proto conversion: %v", err)}
return state, fmt.Errorf("failed proto conversion: %w", err)
}
b, err := protojson.MarshalOptions{Indent: ""}.Marshal(v.(proto.Message))
if err != nil {
input["events"] = map[string]interface{}{"error.message": fmt.Sprintf("failed native conversion: %v", err)}
return input, fmt.Errorf("failed native conversion: %w", err)
state["events"] = map[string]interface{}{"error.message": fmt.Sprintf("failed native conversion: %v", err)}
return state, fmt.Errorf("failed native conversion: %w", err)
}
var res map[string]interface{}
err = json.Unmarshal(b, &res)
if err != nil {
input["events"] = map[string]interface{}{"error.message": fmt.Sprintf("failed json conversion: %v", err)}
return input, fmt.Errorf("failed json conversion: %w", err)
state["events"] = map[string]interface{}{"error.message": fmt.Sprintf("failed json conversion: %v", err)}
return state, fmt.Errorf("failed json conversion: %w", err)
}
return res, nil
}
Expand Down

0 comments on commit eb9e939

Please sign in to comment.