diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index cdf178131fc5..e40999a25041 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -326,6 +326,7 @@ automatic splitting at root level, if root level element is an array. {pull}3415 - Add Okta input package for entity analytics. {pull}35611[35611] - Expose harvester metrics from filestream input {pull}35835[35835] {issue}33771[33771] - Add device support for Azure AD entity analytics. {pull}35807[35807] +- Improve CEL input performance. {pull}35915[35915] *Auditbeat* - Migration of system/package module storage from gob encoding to flatbuffer encoding in bolt db. {pull}34817[34817] diff --git a/x-pack/filebeat/input/cel/input.go b/x-pack/filebeat/input/cel/input.go index 24c9684def1d..d67b3b38a388 100644 --- a/x-pack/filebeat/input/cel/input.go +++ b/x-pack/filebeat/input/cel/input.go @@ -10,7 +10,6 @@ package cel import ( "compress/gzip" "context" - "encoding/json" "errors" "fmt" "io" @@ -33,8 +32,6 @@ import ( "github.com/google/cel-go/cel" "github.com/google/cel-go/checker/decls" - "google.golang.org/protobuf/encoding/protojson" - "google.golang.org/protobuf/proto" "google.golang.org/protobuf/types/known/structpb" v2 "github.com/elastic/beats/v7/filebeat/input/v2" @@ -909,23 +906,20 @@ func evalWith(ctx context.Context, prg cel.Program, state map[string]interface{} return state, fmt.Errorf("failed eval: %w", err) } - v, err := out.ConvertToNative(reflect.TypeOf(&structpb.Value{})) + v, err := out.ConvertToNative(reflect.TypeOf((*structpb.Struct)(nil))) if err != nil { state["events"] = errorMessage(fmt.Sprintf("failed proto conversion: %v", err)) return state, fmt.Errorf("failed proto conversion: %w", err) } - b, err := protojson.MarshalOptions{Indent: ""}.Marshal(v.(proto.Message)) - if err != nil { - state["events"] = errorMessage(fmt.Sprintf("failed native conversion: %v", err)) - return state, fmt.Errorf("failed native conversion: %w", err) - } - var res map[string]interface{} - err = json.Unmarshal(b, &res) - if err != nil { - state["events"] = errorMessage(fmt.Sprintf("failed json conversion: %v", err)) - return state, fmt.Errorf("failed json conversion: %w", err) + switch v := v.(type) { + case *structpb.Struct: + return v.AsMap(), nil + default: + // This should never happen. + errMsg := fmt.Sprintf("unexpected native conversion type: %T", v) + state["events"] = errorMessage(errMsg) + return state, errors.New(errMsg) } - return res, nil } func errorMessage(msg string) map[string]interface{} {