diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 07c43883a190..2e60ab14cd6b 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -61,6 +61,7 @@ https://github.com/elastic/beats/compare/v8.2.0\...main[Check the HEAD diff] - Rename identity as identity_name when the value is a string in Azure Platform Logs. {pull}33654[33654] - Fix input cancellation handling when HTTP client does not support contexts. {issue}33962[33962] {pull}33968[33968] - Update mito CEL extension library to v0.0.0-20221207004749-2f0f2875e464 {pull}33974[33974] +- Fix CEL result deserialisation when evaluation fails. {issue}33992[33992] {pull}33996[33996] *Heartbeat* - Fix broken zip URL monitors. NOTE: Zip URL Monitors will be removed in version 8.7 and replaced with project monitors. {pull}33723[33723] diff --git a/x-pack/filebeat/input/cel/input.go b/x-pack/filebeat/input/cel/input.go index 6e42a8a2f1b2..c1a1f1a50468 100644 --- a/x-pack/filebeat/input/cel/input.go +++ b/x-pack/filebeat/input/cel/input.go @@ -198,9 +198,7 @@ func (input) run(env v2.Context, src *source, cursor map[string]interface{}, pub log.Debugw("request state", logp.Namespace("cel"), "state", state) metrics.executions.Add(1) start := time.Now() - state, err = evalWith(ctx, prg, map[string]interface{}{ - root: state, - }) + state, err = evalWith(ctx, prg, state) log.Debugw("response state", logp.Namespace("cel"), "state", state) if err != nil { switch { @@ -427,9 +425,7 @@ func (input) run(env v2.Context, src *source, cursor map[string]interface{}, pub // Replace the last known good cursor. state["cursor"] = goodCursor - // Avoid explicit type assertion. This is safe as long as the value is - // Go-comparable. - if state["want_more"] == false { + if more, _ := state["want_more"].(bool); !more { return nil } } @@ -847,31 +843,31 @@ func newProgram(ctx context.Context, src, root string, client *http.Client, limi return prg, nil } -func evalWith(ctx context.Context, prg cel.Program, input map[string]interface{}) (map[string]interface{}, error) { - out, _, err := prg.ContextEval(ctx, input) +func evalWith(ctx context.Context, prg cel.Program, state map[string]interface{}) (map[string]interface{}, error) { + out, _, err := prg.ContextEval(ctx, map[string]interface{}{root: state}) if e := ctx.Err(); e != nil { err = e } if err != nil { - input["events"] = map[string]interface{}{"error.message": fmt.Sprintf("failed eval: %v", err)} - return input, fmt.Errorf("failed eval: %w", err) + state["events"] = map[string]interface{}{"error.message": fmt.Sprintf("failed eval: %v", err)} + return state, fmt.Errorf("failed eval: %w", err) } v, err := out.ConvertToNative(reflect.TypeOf(&structpb.Value{})) if err != nil { - input["events"] = map[string]interface{}{"error.message": fmt.Sprintf("failed proto conversion: %v", err)} - return input, fmt.Errorf("failed proto conversion: %w", err) + state["events"] = map[string]interface{}{"error.message": fmt.Sprintf("failed proto conversion: %v", err)} + return state, fmt.Errorf("failed proto conversion: %w", err) } b, err := protojson.MarshalOptions{Indent: ""}.Marshal(v.(proto.Message)) if err != nil { - input["events"] = map[string]interface{}{"error.message": fmt.Sprintf("failed native conversion: %v", err)} - return input, fmt.Errorf("failed native conversion: %w", err) + state["events"] = map[string]interface{}{"error.message": fmt.Sprintf("failed native conversion: %v", err)} + return state, fmt.Errorf("failed native conversion: %w", err) } var res map[string]interface{} err = json.Unmarshal(b, &res) if err != nil { - input["events"] = map[string]interface{}{"error.message": fmt.Sprintf("failed json conversion: %v", err)} - return input, fmt.Errorf("failed json conversion: %w", err) + state["events"] = map[string]interface{}{"error.message": fmt.Sprintf("failed json conversion: %v", err)} + return state, fmt.Errorf("failed json conversion: %w", err) } return res, nil }