From cd81650db737dd92c5c0c5f65f86fd1a5e736510 Mon Sep 17 00:00:00 2001 From: Dan Kortschak <90160302+efd6@users.noreply.github.com> Date: Fri, 9 Dec 2022 01:54:03 +1030 Subject: [PATCH] x-pack/filebeat/input/cel: fix eval state return on error (#33996) Previously, when an error in evaluation or result deserialisation occurred, the entire input activation container was returned instead of a valid state. The result of this was that the state would nest on each iteration of the eval event loop. In cases where errors can happen frequently, this can result in unbounded memory use. Also improve the robustness of the want_more flag; the old code would continue with the loop if state["want_more"] was unset since any(nil) does not equal false. Relates: #33992 (cherry picked from commit f08eed88d58d33810e6e923c03fad9a3c87bd575) --- CHANGELOG.next.asciidoc | 2 ++ x-pack/filebeat/input/cel/input.go | 28 ++++++++++++---------------- 2 files changed, 14 insertions(+), 16 deletions(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index bc483a83a726..876dbf24bfb3 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -65,6 +65,8 @@ https://github.com/elastic/beats/compare/v8.2.0\...main[Check the HEAD diff] - Fix PANW handling of messages with event.original already set. {issue}33829[33829] {pull}33830[33830] - Rename identity as identity_name when the value is a string in Azure Platform Logs. {pull}33654[33654] - Fix input cancellation handling when HTTP client does not support contexts. {issue}33962[33962] {pull}33968[33968] +- Update mito CEL extension library to v0.0.0-20221207004749-2f0f2875e464 {pull}33974[33974] +- Fix CEL result deserialisation when evaluation fails. {issue}33992[33992] {pull}33996[33996] *Heartbeat* - Fix broken zip URL monitors. NOTE: Zip URL Monitors will be removed in version 8.7 and replaced with project monitors. {pull}33723[33723] diff --git a/x-pack/filebeat/input/cel/input.go b/x-pack/filebeat/input/cel/input.go index 51060a2aeb46..40b8a85897c6 100644 --- a/x-pack/filebeat/input/cel/input.go +++ b/x-pack/filebeat/input/cel/input.go @@ -196,9 +196,7 @@ func (input) run(env v2.Context, src *source, cursor map[string]interface{}, pub log.Debugw("request state", logp.Namespace("cel"), "state", state) metrics.executions.Add(1) start := time.Now() - state, err = evalWith(ctx, prg, map[string]interface{}{ - root: state, - }) + state, err = evalWith(ctx, prg, state) log.Debugw("response state", logp.Namespace("cel"), "state", state) if err != nil { switch { @@ -425,9 +423,7 @@ func (input) run(env v2.Context, src *source, cursor map[string]interface{}, pub // Replace the last known good cursor. state["cursor"] = goodCursor - // Avoid explicit type assertion. This is safe as long as the value is - // Go-comparable. - if state["want_more"] == false { + if more, _ := state["want_more"].(bool); !more { return nil } } @@ -805,31 +801,31 @@ func newProgram(src, root string, client *http.Client, limiter *rate.Limiter, pa return prg, nil } -func evalWith(ctx context.Context, prg cel.Program, input map[string]interface{}) (map[string]interface{}, error) { - out, _, err := prg.ContextEval(ctx, input) +func evalWith(ctx context.Context, prg cel.Program, state map[string]interface{}) (map[string]interface{}, error) { + out, _, err := prg.ContextEval(ctx, map[string]interface{}{root: state}) if e := ctx.Err(); e != nil { err = e } if err != nil { - input["events"] = map[string]interface{}{"error.message": fmt.Sprintf("failed eval: %v", err)} - return input, fmt.Errorf("failed eval: %w", err) + state["events"] = map[string]interface{}{"error.message": fmt.Sprintf("failed eval: %v", err)} + return state, fmt.Errorf("failed eval: %w", err) } v, err := out.ConvertToNative(reflect.TypeOf(&structpb.Value{})) if err != nil { - input["events"] = map[string]interface{}{"error.message": fmt.Sprintf("failed proto conversion: %v", err)} - return input, fmt.Errorf("failed proto conversion: %w", err) + state["events"] = map[string]interface{}{"error.message": fmt.Sprintf("failed proto conversion: %v", err)} + return state, fmt.Errorf("failed proto conversion: %w", err) } b, err := protojson.MarshalOptions{Indent: ""}.Marshal(v.(proto.Message)) if err != nil { - input["events"] = map[string]interface{}{"error.message": fmt.Sprintf("failed native conversion: %v", err)} - return input, fmt.Errorf("failed native conversion: %w", err) + state["events"] = map[string]interface{}{"error.message": fmt.Sprintf("failed native conversion: %v", err)} + return state, fmt.Errorf("failed native conversion: %w", err) } var res map[string]interface{} err = json.Unmarshal(b, &res) if err != nil { - input["events"] = map[string]interface{}{"error.message": fmt.Sprintf("failed json conversion: %v", err)} - return input, fmt.Errorf("failed json conversion: %w", err) + state["events"] = map[string]interface{}{"error.message": fmt.Sprintf("failed json conversion: %v", err)} + return state, fmt.Errorf("failed json conversion: %w", err) } return res, nil }