From 5cf687ee4678dade53e98d326b47c2a690f625e1 Mon Sep 17 00:00:00 2001 From: Dan Kortschak Date: Thu, 8 Dec 2022 17:08:22 +1030 Subject: [PATCH] x-pack/filebeat/input/cel: fix eval state return on error Previously, when an error in evaluation or result deserialisation occurred, the entire input activation container was returned instead of a valid state. The result of this was that the state would nest on each iteration of the eval event loop. In cases where errors can happen frequently, this can result in unbounded memory use. Also improve the robustness of the want_more flag; the old code would continue with the loop if state["want_more"] was unset since any(nil) does not equal false. --- CHANGELOG.next.asciidoc | 1 + x-pack/filebeat/input/cel/input.go | 28 ++++++++++++---------------- 2 files changed, 13 insertions(+), 16 deletions(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 07c43883a19..2e60ab14cd6 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -61,6 +61,7 @@ https://github.com/elastic/beats/compare/v8.2.0\...main[Check the HEAD diff] - Rename identity as identity_name when the value is a string in Azure Platform Logs. {pull}33654[33654] - Fix input cancellation handling when HTTP client does not support contexts. {issue}33962[33962] {pull}33968[33968] - Update mito CEL extension library to v0.0.0-20221207004749-2f0f2875e464 {pull}33974[33974] +- Fix CEL result deserialisation when evaluation fails. {issue}33992[33992] {pull}33996[33996] *Heartbeat* - Fix broken zip URL monitors. NOTE: Zip URL Monitors will be removed in version 8.7 and replaced with project monitors. {pull}33723[33723] diff --git a/x-pack/filebeat/input/cel/input.go b/x-pack/filebeat/input/cel/input.go index 6e42a8a2f1b..c1a1f1a5046 100644 --- a/x-pack/filebeat/input/cel/input.go +++ b/x-pack/filebeat/input/cel/input.go @@ -198,9 +198,7 @@ func (input) run(env v2.Context, src *source, cursor map[string]interface{}, pub log.Debugw("request state", logp.Namespace("cel"), "state", state) metrics.executions.Add(1) start := time.Now() - state, err = evalWith(ctx, prg, map[string]interface{}{ - root: state, - }) + state, err = evalWith(ctx, prg, state) log.Debugw("response state", logp.Namespace("cel"), "state", state) if err != nil { switch { @@ -427,9 +425,7 @@ func (input) run(env v2.Context, src *source, cursor map[string]interface{}, pub // Replace the last known good cursor. state["cursor"] = goodCursor - // Avoid explicit type assertion. This is safe as long as the value is - // Go-comparable. - if state["want_more"] == false { + if more, _ := state["want_more"].(bool); !more { return nil } } @@ -847,31 +843,31 @@ func newProgram(ctx context.Context, src, root string, client *http.Client, limi return prg, nil } -func evalWith(ctx context.Context, prg cel.Program, input map[string]interface{}) (map[string]interface{}, error) { - out, _, err := prg.ContextEval(ctx, input) +func evalWith(ctx context.Context, prg cel.Program, state map[string]interface{}) (map[string]interface{}, error) { + out, _, err := prg.ContextEval(ctx, map[string]interface{}{root: state}) if e := ctx.Err(); e != nil { err = e } if err != nil { - input["events"] = map[string]interface{}{"error.message": fmt.Sprintf("failed eval: %v", err)} - return input, fmt.Errorf("failed eval: %w", err) + state["events"] = map[string]interface{}{"error.message": fmt.Sprintf("failed eval: %v", err)} + return state, fmt.Errorf("failed eval: %w", err) } v, err := out.ConvertToNative(reflect.TypeOf(&structpb.Value{})) if err != nil { - input["events"] = map[string]interface{}{"error.message": fmt.Sprintf("failed proto conversion: %v", err)} - return input, fmt.Errorf("failed proto conversion: %w", err) + state["events"] = map[string]interface{}{"error.message": fmt.Sprintf("failed proto conversion: %v", err)} + return state, fmt.Errorf("failed proto conversion: %w", err) } b, err := protojson.MarshalOptions{Indent: ""}.Marshal(v.(proto.Message)) if err != nil { - input["events"] = map[string]interface{}{"error.message": fmt.Sprintf("failed native conversion: %v", err)} - return input, fmt.Errorf("failed native conversion: %w", err) + state["events"] = map[string]interface{}{"error.message": fmt.Sprintf("failed native conversion: %v", err)} + return state, fmt.Errorf("failed native conversion: %w", err) } var res map[string]interface{} err = json.Unmarshal(b, &res) if err != nil { - input["events"] = map[string]interface{}{"error.message": fmt.Sprintf("failed json conversion: %v", err)} - return input, fmt.Errorf("failed json conversion: %w", err) + state["events"] = map[string]interface{}{"error.message": fmt.Sprintf("failed json conversion: %v", err)} + return state, fmt.Errorf("failed json conversion: %w", err) } return res, nil }