From b6c377c88447c3e922d678147c3c504d95200a3c Mon Sep 17 00:00:00 2001 From: Dan Kortschak <90160302+efd6@users.noreply.github.com> Date: Fri, 21 Jul 2023 09:44:54 +0930 Subject: [PATCH] x-pack/filebeat/input/cel: make now global static per evaluation (#36107) Previously now was static for the life of the program, which corresponded to the life of the input. This could lead to incorrect and surprising times being provided when the global was used. Obtain a now value before starting each evaluation and use it to shadow the CEL now global (and share it with the Go logging to allow correlation between these). --- CHANGELOG.next.asciidoc | 1 + x-pack/filebeat/input/cel/input.go | 34 ++++++++++++++++++++----- x-pack/filebeat/input/cel/input_test.go | 20 ++++++++++++++- 3 files changed, 48 insertions(+), 7 deletions(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 70bdf8bd8fd..9dd9c35a1a2 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -151,6 +151,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff] - Fix handling of NUL-terminated log lines in Fortinet Firewall module. {issue}36026[36026] {pull}36027[36027] - Make redact field configuration recommended in CEL input and log warning if missing. {pull}36008[36008] - Fix handling of region name configuration in awss3 input {pull}36034[36034] +- Make CEL input's `now` global variable static for evaluation lifetime. {pull}36107[36107] *Heartbeat* diff --git a/x-pack/filebeat/input/cel/input.go b/x-pack/filebeat/input/cel/input.go index d67b3b38a38..afcf2549f65 100644 --- a/x-pack/filebeat/input/cel/input.go +++ b/x-pack/filebeat/input/cel/input.go @@ -73,7 +73,17 @@ func Plugin(log *logp.Logger, store inputcursor.StateStore) v2.Plugin { } } -type input struct{} +type input struct { + time func() time.Time +} + +// now is time.Now with a modifiable time source. +func (i input) now() time.Time { + if i.time == nil { + return time.Now() + } + return i.time() +} func (input) Name() string { return inputName } @@ -107,7 +117,7 @@ func sanitizeFileName(name string) string { return strings.ReplaceAll(name, string(filepath.Separator), "_") } -func (input) run(env v2.Context, src *source, cursor map[string]interface{}, pub inputcursor.Publisher) error { +func (i input) run(env v2.Context, src *source, cursor map[string]interface{}, pub inputcursor.Publisher) error { cfg := src.cfg log := env.Logger.With("input_url", cfg.Resource.URL) @@ -218,8 +228,8 @@ func (input) run(env v2.Context, src *source, cursor map[string]interface{}, pub // Process a set of event requests. log.Debugw("request state", logp.Namespace("cel"), "state", redactor{state: state, mask: cfg.Redact.Fields, delete: cfg.Redact.Delete}) metrics.executions.Add(1) - start := time.Now() - state, err = evalWith(ctx, prg, state) + start := i.now() + state, err = evalWith(ctx, prg, state, start) log.Debugw("response state", logp.Namespace("cel"), "state", redactor{state: state, mask: cfg.Redact.Fields, delete: cfg.Redact.Delete}) if err != nil { switch { @@ -896,8 +906,20 @@ func newProgram(ctx context.Context, src, root string, client *http.Client, limi return prg, nil } -func evalWith(ctx context.Context, prg cel.Program, state map[string]interface{}) (map[string]interface{}, error) { - out, _, err := prg.ContextEval(ctx, map[string]interface{}{root: state}) +func evalWith(ctx context.Context, prg cel.Program, state map[string]interface{}, now time.Time) (map[string]interface{}, error) { + out, _, err := prg.ContextEval(ctx, map[string]interface{}{ + // Replace global program "now" with current time. This is necessary + // as the lib.Time now global is static at program instantiation time + // which will persist over multiple evaluations. The lib.Time behaviour + // is correct for mito where CEL program instances live for only a + // single evaluation. Rather than incurring the cost of creating a new + // cel.Program for each evaluation, shadow lib.Time's now with a new + // value for each eval. We retain the lib.Time now global for + // compatibility between CEL programs developed in mito with programs + // run in the input. + "now": now, + root: state, + }) if e := ctx.Err(); e != nil { err = e } diff --git a/x-pack/filebeat/input/cel/input_test.go b/x-pack/filebeat/input/cel/input_test.go index 0ab48bbfe21..4db00b26ad4 100644 --- a/x-pack/filebeat/input/cel/input_test.go +++ b/x-pack/filebeat/input/cel/input_test.go @@ -36,6 +36,7 @@ var inputTests = []struct { server func(*testing.T, http.HandlerFunc, map[string]interface{}) handler http.HandlerFunc config map[string]interface{} + time func() time.Time persistCursor map[string]interface{} want []map[string]interface{} wantCursor []map[string]interface{} @@ -57,6 +58,23 @@ var inputTests = []struct { {"message": "Hello, World!"}, }, }, + { + name: "hello_world_time", + config: map[string]interface{}{ + "interval": 1, + "program": `{"events":[{"message":{"Hello, World!": now}}]}`, + "state": nil, + "resource": map[string]interface{}{ + "url": "", + }, + }, + time: func() time.Time { return time.Date(2010, 2, 8, 0, 0, 0, 0, time.UTC) }, + want: []map[string]interface{}{{ + "message": map[string]interface{}{ + "Hello, World!": "2010-02-08T00:00:00Z", + }, + }}, + }, { name: "bad_events_type", config: map[string]interface{}{ @@ -1238,7 +1256,7 @@ func TestInput(t *testing.T) { cancel() } } - err = input{}.run(v2Ctx, src, test.persistCursor, &client) + err = input{test.time}.run(v2Ctx, src, test.persistCursor, &client) if fmt.Sprint(err) != fmt.Sprint(test.wantErr) { t.Errorf("unexpected error from running input: got:%v want:%v", err, test.wantErr) }