diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index a6e641d6e527..ede5f67c41d9 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -110,9 +110,10 @@ https://github.com/elastic/beats/compare/v8.2.0\...main[Check the HEAD diff] - Fix panic in TCP and UDP inputs on Linux when collecting socket metrics from OS. {issue}35064[35064] - Correctly collect TCP and UDP metrics for unspecified address values. {pull}35111[35111] - Fix base for UDP and TCP queue metrics and UDP drops metric. {pull}35123[35123] -- Sanitize filenames for request tracer in httpjson and cel inputs. {pull}35143[35143] +- Sanitize filenames for request tracer in httpjson input. {pull}35143[35143] - decode_cef processor: Fix ECS output by making `observer.ip` into an array of strings instead of string. {issue}35140[35140] {pull}35149[35149] - Fix handling of MySQL audit logs with strict JSON parser. {issue}35158[35158] {pull}35160[35160] +- Sanitize filenames for request tracer in cel input. {pull}35154[35154] - Fix accidental error overwrite in defer statement in entityanalytics Azure AD input. {issue}35153[35153] {pull}35169[35169] *Heartbeat* diff --git a/x-pack/filebeat/input/cel/input.go b/x-pack/filebeat/input/cel/input.go index 07c20a3680e4..08e7357d9007 100644 --- a/x-pack/filebeat/input/cel/input.go +++ b/x-pack/filebeat/input/cel/input.go @@ -18,6 +18,7 @@ import ( "net" "net/http" "net/url" + "path/filepath" "reflect" "regexp" "strconv" @@ -101,6 +102,15 @@ func (input) Run(env v2.Context, src inputcursor.Source, crsr inputcursor.Cursor return input{}.run(env, src.(*source), cursor, pub) } +// sanitizeFileName returns name with ":" and "/" replaced with "_", removing repeated instances. +// The request.tracer.filename may have ":" when a httpjson input has cursor config and +// the macOS Finder will treat this as path-separator and causes to show up strange filepaths. +func sanitizeFileName(name string) string { + name = strings.ReplaceAll(name, ":", string(filepath.Separator)) + name = filepath.Clean(name) + return strings.ReplaceAll(name, string(filepath.Separator), "_") +} + func (input) run(env v2.Context, src *source, cursor map[string]interface{}, pub inputcursor.Publisher) error { cfg := src.cfg log := env.Logger.With("input_url", cfg.Resource.URL) @@ -111,7 +121,8 @@ func (input) run(env v2.Context, src *source, cursor map[string]interface{}, pub ctx := ctxtool.FromCanceller(env.Cancelation) if cfg.Resource.Tracer != nil { - cfg.Resource.Tracer.Filename = strings.ReplaceAll(cfg.Resource.Tracer.Filename, "*", env.ID) + id := sanitizeFileName(env.ID) + cfg.Resource.Tracer.Filename = strings.ReplaceAll(cfg.Resource.Tracer.Filename, "*", id) } client, err := newClient(ctx, cfg, log) @@ -663,6 +674,11 @@ func newClient(ctx context.Context, cfg config, log *logp.Logger) (*http.Client, if cfg.Resource.Tracer != nil { w := zapcore.AddSync(cfg.Resource.Tracer) + go func() { + // Close the logger when we are done. + <-ctx.Done() + cfg.Resource.Tracer.Close() + }() core := ecszap.NewCore( ecszap.NewDefaultEncoderConfig(), w, diff --git a/x-pack/filebeat/input/cel/input_test.go b/x-pack/filebeat/input/cel/input_test.go index ea96380fbb1c..4c2e5dea9568 100644 --- a/x-pack/filebeat/input/cel/input_test.go +++ b/x-pack/filebeat/input/cel/input_test.go @@ -31,594 +31,655 @@ import ( "github.com/elastic/elastic-agent-libs/mapstr" ) -var inputTests = []struct { - name string - server func(*testing.T, http.HandlerFunc, map[string]interface{}) - handler http.HandlerFunc - config map[string]interface{} - persistCursor map[string]interface{} - want []map[string]interface{} - wantCursor []map[string]interface{} - wantErr error -}{ - // Autonomous tests (no FS or net dependency). - { - name: "hello_world", - config: map[string]interface{}{ - "interval": 1, - "program": `{"events":[{"message":"Hello, World!"}]}`, - "state": nil, - "resource": map[string]interface{}{ - "url": "", +func TestInput(t *testing.T) { + tempDirectory := t.TempDir() + var inputTests = []struct { + name string + server func(*testing.T, http.HandlerFunc, map[string]interface{}) + handler http.HandlerFunc + config map[string]interface{} + persistCursor map[string]interface{} + want []map[string]interface{} + wantCursor []map[string]interface{} + wantErr error + wantFile string + }{ + // Autonomous tests (no FS or net dependency). + { + name: "hello_world", + config: map[string]interface{}{ + "interval": 1, + "program": `{"events":[{"message":"Hello, World!"}]}`, + "state": nil, + "resource": map[string]interface{}{ + "url": "", + }, }, - }, - want: []map[string]interface{}{ - {"message": "Hello, World!"}, - }, - }, - { - name: "bad_events_type", - config: map[string]interface{}{ - "interval": 1, - "program": `{"events":["Hello, World!"]}`, - "state": nil, - "resource": map[string]interface{}{ - "url": "", + want: []map[string]interface{}{ + {"message": "Hello, World!"}, }, }, - wantErr: fmt.Errorf("unexpected type returned for evaluation events: %T", "Hello, World!"), - }, - { - name: "hello_world_non_nil_state", - config: map[string]interface{}{ - "interval": 1, - "program": `{"events":[{"message":"Hello, World!"}]}`, - "state": map[string]interface{}{}, - "resource": map[string]interface{}{ - "url": "", + { + name: "bad_events_type", + config: map[string]interface{}{ + "interval": 1, + "program": `{"events":["Hello, World!"]}`, + "state": nil, + "resource": map[string]interface{}{ + "url": "", + }, }, + wantErr: fmt.Errorf("unexpected type returned for evaluation events: %T", "Hello, World!"), }, - want: []map[string]interface{}{ - {"message": "Hello, World!"}, - }, - }, - { - name: "what_is_next", - config: map[string]interface{}{ - "interval": 1, - "program": `{"events":[{"message":"Hello, World!"}],"cursor":[{"todo":"What's next?"}]}`, - "state": nil, - "resource": map[string]interface{}{ - "url": "", + { + name: "hello_world_non_nil_state", + config: map[string]interface{}{ + "interval": 1, + "program": `{"events":[{"message":"Hello, World!"}]}`, + "state": map[string]interface{}{}, + "resource": map[string]interface{}{ + "url": "", + }, }, - }, - want: []map[string]interface{}{ - {"message": "Hello, World!"}, - }, - wantCursor: []map[string]interface{}{ - {"todo": "What's next?"}, - }, - }, - { - name: "bad_cursor_type", - config: map[string]interface{}{ - "interval": 1, - "program": `{"events":[{"message":"Hello, World!"}],"cursor":["What's next?"]}`, - "state": nil, - "resource": map[string]interface{}{ - "url": "", + want: []map[string]interface{}{ + {"message": "Hello, World!"}, }, }, - wantErr: fmt.Errorf("unexpected type returned for evaluation cursor element: %T", "What's next?"), - }, - { - name: "show_state", - config: map[string]interface{}{ - "interval": 1, - "program": `{"events":[state]}`, - "state": nil, - "resource": map[string]interface{}{ - "url": "", + { + name: "what_is_next", + config: map[string]interface{}{ + "interval": 1, + "program": `{"events":[{"message":"Hello, World!"}],"cursor":[{"todo":"What's next?"}]}`, + "state": nil, + "resource": map[string]interface{}{ + "url": "", + }, }, - }, - want: []map[string]interface{}{ - {"url": ""}, - }, - }, - { - name: "show_provided_state", - config: map[string]interface{}{ - "interval": 1, - "program": `{"events":[state]}`, - "state": map[string]interface{}{ - "we": "can", - "put": []string{"a", "bunch"}, - "of": "stuff", - "here": "!", - }, - "resource": map[string]interface{}{ - "url": "", + want: []map[string]interface{}{ + {"message": "Hello, World!"}, + }, + wantCursor: []map[string]interface{}{ + {"todo": "What's next?"}, }, }, - want: []map[string]interface{}{ - { - "we": "can", - "put": []interface{}{"a", "bunch"}, // We lose typing. - "of": "stuff", - "here": "!", - "url": "", + { + name: "bad_cursor_type", + config: map[string]interface{}{ + "interval": 1, + "program": `{"events":[{"message":"Hello, World!"}],"cursor":["What's next?"]}`, + "state": nil, + "resource": map[string]interface{}{ + "url": "", + }, }, + wantErr: fmt.Errorf("unexpected type returned for evaluation cursor element: %T", "What's next?"), }, - }, - { - name: "iterative_state", - config: map[string]interface{}{ - "interval": 1, - "program": ` -{ - "events":[ - {"message": state.data[state.cursor.next]}, - ], - "cursor":[ - {"next": int(state.cursor.next)+1}, // Ensure we have a number index. - ], - "data": state.data, // Make sure we have this for the next iteration. -} -`, - "state": map[string]interface{}{ - "data": []string{"first", "second", "third"}, - "cursor": map[string]int{"next": 0}, + { + name: "show_state", + config: map[string]interface{}{ + "interval": 1, + "program": `{"events":[state]}`, + "state": nil, + "resource": map[string]interface{}{ + "url": "", + }, }, - "resource": map[string]interface{}{ - "url": "", + want: []map[string]interface{}{ + {"url": ""}, }, }, - want: []map[string]interface{}{ - {"message": "first"}, - {"message": "second"}, - {"message": "third"}, - }, - wantCursor: []map[string]interface{}{ - // The serialisation of numbers is to float when under 1<<53 (strings above). - // This is not visible within CEL, but presents in Go testing. - {"next": 1.0}, - {"next": 2.0}, - {"next": 3.0}, - }, - }, - { - name: "iterative_state_implicit_initial_cursor", - config: map[string]interface{}{ - "interval": 1, - "program": ` -int(has(state.cursor) && has(state.cursor.next) ? state.cursor.next : 0).as(index, { - "events":[ - {"message": state.data[index]}, - ], - "cursor":[ - {"next": index+1}, - ], - "data": state.data, // Make sure we have this for the next iteration. -}) -`, - "state": map[string]interface{}{ - "data": []string{"first", "second", "third"}, - }, - "resource": map[string]interface{}{ - "url": "", + { + name: "show_provided_state", + config: map[string]interface{}{ + "interval": 1, + "program": `{"events":[state]}`, + "state": map[string]interface{}{ + "we": "can", + "put": []string{"a", "bunch"}, + "of": "stuff", + "here": "!", + }, + "resource": map[string]interface{}{ + "url": "", + }, + }, + want: []map[string]interface{}{ + { + "we": "can", + "put": []interface{}{"a", "bunch"}, // We lose typing. + "of": "stuff", + "here": "!", + "url": "", + }, }, }, - want: []map[string]interface{}{ - {"message": "first"}, - {"message": "second"}, - {"message": "third"}, - }, - wantCursor: []map[string]interface{}{ - // The serialisation of numbers is to float when under 1<<53 (strings above). - // This is not visible within CEL, but presents in Go testing. - {"next": 1.0}, - {"next": 2.0}, - {"next": 3.0}, - }, - }, + { + name: "iterative_state", + config: map[string]interface{}{ + "interval": 1, + "program": ` { - name: "iterative_state_provided_stored_cursor", - config: map[string]interface{}{ - "interval": 1, - "program": ` -{ - "events":[ - {"message": state.data[state.cursor.next]}, - ], - "cursor":[ - {"next": int(state.cursor.next)+1}, // Ensure we have a number index. - ], - "data": state.data, // Make sure we have this for the next iteration. -} -`, - "state": map[string]interface{}{ - "data": []string{"first", "second", "third"}, - "cursor": map[string]int{"next": 0}, + "events":[ + {"message": state.data[state.cursor.next]}, + ], + "cursor":[ + {"next": int(state.cursor.next)+1}, // Ensure we have a number index. + ], + "data": state.data, // Make sure we have this for the next iteration. + } + `, + "state": map[string]interface{}{ + "data": []string{"first", "second", "third"}, + "cursor": map[string]int{"next": 0}, + }, + "resource": map[string]interface{}{ + "url": "", + }, }, - "resource": map[string]interface{}{ - "url": "", + want: []map[string]interface{}{ + {"message": "first"}, + {"message": "second"}, + {"message": "third"}, }, - }, - persistCursor: map[string]interface{}{ - "next": 1, - }, - want: []map[string]interface{}{ - {"message": "second"}, - {"message": "third"}, - }, - wantCursor: []map[string]interface{}{ - // The serialisation of numbers is to float when under 1<<53 (strings above). - // This is not visible within CEL, but presents in Go testing. - {"next": 2.0}, - {"next": 3.0}, - }, - }, - { - name: "iterative_state_implicit_initial_cursor_provided_stored_cursor", - config: map[string]interface{}{ - "interval": 1, - "program": ` -int(has(state.cursor) && has(state.cursor.next) ? state.cursor.next : 0).as(index, { - "events":[ - {"message": state.data[index]}, - ], - "cursor":[ - {"next": index+1}, - ], - "data": state.data, // Make sure we have this for the next iteration. -}) -`, - "state": map[string]interface{}{ - "data": []string{"first", "second", "third"}, - }, - "resource": map[string]interface{}{ - "url": "", + wantCursor: []map[string]interface{}{ + // The serialisation of numbers is to float when under 1<<53 (strings above). + // This is not visible within CEL, but presents in Go testing. + {"next": 1.0}, + {"next": 2.0}, + {"next": 3.0}, }, }, - persistCursor: map[string]interface{}{ - "next": 1, - }, - want: []map[string]interface{}{ - {"message": "second"}, - {"message": "third"}, - }, - wantCursor: []map[string]interface{}{ - // The serialisation of numbers is to float when under 1<<53 (strings above). - // This is not visible within CEL, but presents in Go testing. - {"next": 2.0}, - {"next": 3.0}, - }, - }, - { - name: "strings_split", - config: map[string]interface{}{ - "interval": 1, - "program": ` -{ - "events": state.data.split(":").map(s, { - "message": s - } - ) -} -`, - "state": map[string]interface{}{ - "data": "first:second:third", + name: "iterative_state_implicit_initial_cursor", + config: map[string]interface{}{ + "interval": 1, + "program": ` + int(has(state.cursor) && has(state.cursor.next) ? state.cursor.next : 0).as(index, { + "events":[ + {"message": state.data[index]}, + ], + "cursor":[ + {"next": index+1}, + ], + "data": state.data, // Make sure we have this for the next iteration. + }) + `, + "state": map[string]interface{}{ + "data": []string{"first", "second", "third"}, + }, + "resource": map[string]interface{}{ + "url": "", + }, }, - "resource": map[string]interface{}{ - "url": "", + want: []map[string]interface{}{ + {"message": "first"}, + {"message": "second"}, + {"message": "third"}, + }, + wantCursor: []map[string]interface{}{ + // The serialisation of numbers is to float when under 1<<53 (strings above). + // This is not visible within CEL, but presents in Go testing. + {"next": 1.0}, + {"next": 2.0}, + {"next": 3.0}, }, }, - want: []map[string]interface{}{ - {"message": "first"}, - {"message": "second"}, - {"message": "third"}, - }, - }, - - // FS-based tests. + { + name: "iterative_state_provided_stored_cursor", + config: map[string]interface{}{ + "interval": 1, + "program": ` { - name: "ndjson_log_file_simple", - config: map[string]interface{}{ - "interval": 1, - "program": `{"events": try(file(state.url, "application/x-ndjson").map(e, try(e, "error.message")), "file.error")}`, - "resource": map[string]interface{}{ - "url": "testdata/log-1.ndjson", + "events":[ + {"message": state.data[state.cursor.next]}, + ], + "cursor":[ + {"next": int(state.cursor.next)+1}, // Ensure we have a number index. + ], + "data": state.data, // Make sure we have this for the next iteration. + } + `, + "state": map[string]interface{}{ + "data": []string{"first", "second", "third"}, + "cursor": map[string]int{"next": 0}, + }, + "resource": map[string]interface{}{ + "url": "", + }, + }, + persistCursor: map[string]interface{}{ + "next": 1, + }, + want: []map[string]interface{}{ + {"message": "second"}, + {"message": "third"}, + }, + wantCursor: []map[string]interface{}{ + // The serialisation of numbers is to float when under 1<<53 (strings above). + // This is not visible within CEL, but presents in Go testing. + {"next": 2.0}, + {"next": 3.0}, }, }, - want: []map[string]interface{}{ - {"level": "info", "message": "something happened"}, - {"level": "error", "message": "something bad happened"}, + { + name: "iterative_state_implicit_initial_cursor_provided_stored_cursor", + config: map[string]interface{}{ + "interval": 1, + "program": ` + int(has(state.cursor) && has(state.cursor.next) ? state.cursor.next : 0).as(index, { + "events":[ + {"message": state.data[index]}, + ], + "cursor":[ + {"next": index+1}, + ], + "data": state.data, // Make sure we have this for the next iteration. + }) + `, + "state": map[string]interface{}{ + "data": []string{"first", "second", "third"}, + }, + "resource": map[string]interface{}{ + "url": "", + }, + }, + persistCursor: map[string]interface{}{ + "next": 1, + }, + want: []map[string]interface{}{ + {"message": "second"}, + {"message": "third"}, + }, + wantCursor: []map[string]interface{}{ + // The serialisation of numbers is to float when under 1<<53 (strings above). + // This is not visible within CEL, but presents in Go testing. + {"next": 2.0}, + {"next": 3.0}, + }, }, - }, + { + name: "strings_split", + config: map[string]interface{}{ + "interval": 1, + "program": ` { - name: "ndjson_log_file_simple_file_scheme", - config: map[string]interface{}{ - "interval": 1, - "program": `{"events": try(file(state.url, "application/x-ndjson").map(e, try(e, "error.message")), "file.error")}`, - "resource": map[string]interface{}{ - "url": fileSchemePath("testdata/log-1.ndjson"), + "events": state.data.split(":").map(s, + { + "message": s + } + ) + } + `, + "state": map[string]interface{}{ + "data": "first:second:third", + }, + "resource": map[string]interface{}{ + "url": "", + }, + }, + want: []map[string]interface{}{ + {"message": "first"}, + {"message": "second"}, + {"message": "third"}, }, }, - want: []map[string]interface{}{ - {"level": "info", "message": "something happened"}, - {"level": "error", "message": "something bad happened"}, - }, - }, - { - name: "ndjson_log_file_corrupted", - config: map[string]interface{}{ - "interval": 1, - "program": `{"events": try(file(state.url, "application/x-ndjson").map(e, try(e, "error.message")), "file.error")}`, - "resource": map[string]interface{}{ - "url": "testdata/corrupted-log-1.ndjson", + + // FS-based tests. + { + name: "ndjson_log_file_simple", + config: map[string]interface{}{ + "interval": 1, + "program": `{"events": try(file(state.url, "application/x-ndjson").map(e, try(e, "error.message")), "file.error")}`, + "resource": map[string]interface{}{ + "url": "testdata/log-1.ndjson", + }, + }, + want: []map[string]interface{}{ + {"level": "info", "message": "something happened"}, + {"level": "error", "message": "something bad happened"}, }, }, - want: []map[string]interface{}{ - {"level": "info", "message": "something happened"}, - {"error.message": `unexpected end of JSON input: {"message":"Dave, stop. Stop, will you? Stop, Dave. Will you stop, Dave? Stop, Dave."`}, - {"level": "error", "message": "something bad happened"}, + { + name: "ndjson_log_file_simple_file_scheme", + config: map[string]interface{}{ + "interval": 1, + "program": `{"events": try(file(state.url, "application/x-ndjson").map(e, try(e, "error.message")), "file.error")}`, + "resource": map[string]interface{}{ + "url": fileSchemePath("testdata/log-1.ndjson"), + }, + }, + want: []map[string]interface{}{ + {"level": "info", "message": "something happened"}, + {"level": "error", "message": "something bad happened"}, + }, }, - }, - { - name: "missing_file", - config: map[string]interface{}{ - "interval": 1, - "program": `{"events": try(file(state.url, "application/x-ndjson").map(e, try(e, "error.message")), "file.error")}`, - "resource": map[string]interface{}{ - "url": "testdata/absent.ndjson", + { + name: "ndjson_log_file_corrupted", + config: map[string]interface{}{ + "interval": 1, + "program": `{"events": try(file(state.url, "application/x-ndjson").map(e, try(e, "error.message")), "file.error")}`, + "resource": map[string]interface{}{ + "url": "testdata/corrupted-log-1.ndjson", + }, + }, + want: []map[string]interface{}{ + {"level": "info", "message": "something happened"}, + {"error.message": `unexpected end of JSON input: {"message":"Dave, stop. Stop, will you? Stop, Dave. Will you stop, Dave? Stop, Dave."`}, + {"level": "error", "message": "something bad happened"}, }, }, - want: []map[string]interface{}{ - {"file.error": "file: " + missingFileError("testdata/absent.ndjson")}, + { + name: "missing_file", + config: map[string]interface{}{ + "interval": 1, + "program": `{"events": try(file(state.url, "application/x-ndjson").map(e, try(e, "error.message")), "file.error")}`, + "resource": map[string]interface{}{ + "url": "testdata/absent.ndjson", + }, + }, + want: []map[string]interface{}{ + {"file.error": "file: " + missingFileError("testdata/absent.ndjson")}, + }, }, - }, - // HTTP-based tests. - { - name: "GET_request", - server: newTestServer(httptest.NewServer), - config: map[string]interface{}{ - "interval": 1, - "program": ` -bytes(get(state.url).Body).as(body, { - "events": [body.decode_json()] -}) -`, - }, - handler: defaultHandler(http.MethodGet, ""), - want: []map[string]interface{}{ - { - "hello": []interface{}{ - map[string]interface{}{ - "world": "moon", - }, - map[string]interface{}{ - "space": []interface{}{ - map[string]interface{}{ - "cake": "pumpkin", + // HTTP-based tests. + { + name: "GET_request", + server: newTestServer(httptest.NewServer), + config: map[string]interface{}{ + "interval": 1, + "program": ` + bytes(get(state.url).Body).as(body, { + "events": [body.decode_json()] + }) + `, + }, + handler: defaultHandler(http.MethodGet, ""), + want: []map[string]interface{}{ + { + "hello": []interface{}{ + map[string]interface{}{ + "world": "moon", + }, + map[string]interface{}{ + "space": []interface{}{ + map[string]interface{}{ + "cake": "pumpkin", + }, }, }, }, }, }, }, - }, - { - name: "GET_request_TLS", - server: newTestServer(httptest.NewTLSServer), - config: map[string]interface{}{ - "interval": 1, - "resource.ssl.verification_mode": "none", - "program": ` -bytes(get(state.url).Body).as(body, { - "events": [body.decode_json()] -}) -`, - }, - handler: defaultHandler(http.MethodGet, ""), - want: []map[string]interface{}{ - { - "hello": []interface{}{ - map[string]interface{}{ - "world": "moon", - }, - map[string]interface{}{ - "space": []interface{}{ - map[string]interface{}{ - "cake": "pumpkin", + { + name: "GET_request_TLS", + server: newTestServer(httptest.NewTLSServer), + config: map[string]interface{}{ + "interval": 1, + "resource.ssl.verification_mode": "none", + "program": ` + bytes(get(state.url).Body).as(body, { + "events": [body.decode_json()] + }) + `, + }, + handler: defaultHandler(http.MethodGet, ""), + want: []map[string]interface{}{ + { + "hello": []interface{}{ + map[string]interface{}{ + "world": "moon", + }, + map[string]interface{}{ + "space": []interface{}{ + map[string]interface{}{ + "cake": "pumpkin", + }, }, }, }, }, }, }, - }, - { - name: "retry_after_request", - server: newTestServer(httptest.NewServer), - config: map[string]interface{}{ - "interval": 1, - "program": ` -get(state.url).as(resp, { - "url": state.url, - "events": [bytes(resp.Body).decode_json()], - "status_code": resp.StatusCode, - "header": resp.Header, -}) -`, - }, - handler: retryAfterHandler("1"), - want: []map[string]interface{}{ - {"hello": "world"}, - }, - }, - { - name: "retry_after_request_time", - server: newTestServer(httptest.NewServer), - config: map[string]interface{}{ - "interval": 1, - "program": ` -get(state.url).as(resp, { - "url": state.url, - "events": [bytes(resp.Body).decode_json()], - "status_code": resp.StatusCode, - "header": resp.Header, -}) -`, - }, - handler: retryAfterHandler(time.Now().Add(time.Second).UTC().Format(http.TimeFormat)), - want: []map[string]interface{}{ - {"hello": "world"}, - }, - }, - { - name: "rate_limit_request_0", - server: newTestServer(httptest.NewServer), - config: map[string]interface{}{ - "interval": 1, - "program": ` -get(state.url).as(resp, { - "url": state.url, - "events": [bytes(resp.Body).decode_json()], - "status_code": resp.StatusCode, - "header": resp.Header, - "rate_limit": rate_limit(resp.Header, 'okta', duration('1m')), -}) -`, - }, - handler: rateLimitHandler("0", 100*time.Millisecond), - want: []map[string]interface{}{ - {"hello": "world"}, + { + name: "retry_after_request", + server: newTestServer(httptest.NewServer), + config: map[string]interface{}{ + "interval": 1, + "program": ` + get(state.url).as(resp, { + "url": state.url, + "events": [bytes(resp.Body).decode_json()], + "status_code": resp.StatusCode, + "header": resp.Header, + }) + `, + }, + handler: retryAfterHandler("1"), + want: []map[string]interface{}{ + {"hello": "world"}, + }, }, - }, - { - name: "rate_limit_request_10", - server: newTestServer(httptest.NewServer), - config: map[string]interface{}{ - "interval": 1, - "program": ` -get(state.url).as(resp, { - "url": state.url, - "events": [bytes(resp.Body).decode_json()], - "status_code": resp.StatusCode, - "header": resp.Header, - "rate_limit": rate_limit(resp.Header, 'okta', duration('1m')), -}) -`, + { + name: "retry_after_request_time", + server: newTestServer(httptest.NewServer), + config: map[string]interface{}{ + "interval": 1, + "program": ` + get(state.url).as(resp, { + "url": state.url, + "events": [bytes(resp.Body).decode_json()], + "status_code": resp.StatusCode, + "header": resp.Header, + }) + `, + }, + handler: retryAfterHandler(time.Now().Add(time.Second).UTC().Format(http.TimeFormat)), + want: []map[string]interface{}{ + {"hello": "world"}, + }, }, - handler: rateLimitHandler("10", 100*time.Millisecond), - want: []map[string]interface{}{ - {"hello": "world"}, + { + name: "rate_limit_request_0", + server: newTestServer(httptest.NewServer), + config: map[string]interface{}{ + "interval": 1, + "program": ` + get(state.url).as(resp, { + "url": state.url, + "events": [bytes(resp.Body).decode_json()], + "status_code": resp.StatusCode, + "header": resp.Header, + "rate_limit": rate_limit(resp.Header, 'okta', duration('1m')), + }) + `, + }, + handler: rateLimitHandler("0", 100*time.Millisecond), + want: []map[string]interface{}{ + {"hello": "world"}, + }, }, - }, - { - name: "rate_limit_request_10_too_slow", - server: newTestServer(httptest.NewServer), - config: map[string]interface{}{ - "interval": 1, - "program": ` -get(state.url).as(resp, { - "url": state.url, - "events": [bytes(resp.Body).decode_json()], - "status_code": resp.StatusCode, - "header": resp.Header, - "rate_limit": rate_limit(resp.Header, 'okta', duration('1m')), -}) -`, + { + name: "rate_limit_request_10", + server: newTestServer(httptest.NewServer), + config: map[string]interface{}{ + "interval": 1, + "program": ` + get(state.url).as(resp, { + "url": state.url, + "events": [bytes(resp.Body).decode_json()], + "status_code": resp.StatusCode, + "header": resp.Header, + "rate_limit": rate_limit(resp.Header, 'okta', duration('1m')), + }) + `, + }, + handler: rateLimitHandler("10", 100*time.Millisecond), + want: []map[string]interface{}{ + {"hello": "world"}, + }, }, - handler: rateLimitHandler("10", 10*time.Second), - want: []map[string]interface{}{}, - }, - { - name: "retry_failure", - server: newTestServer(httptest.NewServer), - config: map[string]interface{}{ - "interval": 1, - "program": ` -get(state.url).as(resp, { - "url": state.url, - "events": [bytes(resp.Body).decode_json()], - "status_code": resp.StatusCode, - "header": resp.Header, -}) -`, + { + name: "rate_limit_request_10_too_slow", + server: newTestServer(httptest.NewServer), + config: map[string]interface{}{ + "interval": 1, + "program": ` + get(state.url).as(resp, { + "url": state.url, + "events": [bytes(resp.Body).decode_json()], + "status_code": resp.StatusCode, + "header": resp.Header, + "rate_limit": rate_limit(resp.Header, 'okta', duration('1m')), + }) + `, + }, + handler: rateLimitHandler("10", 10*time.Second), + want: []map[string]interface{}{}, }, - handler: retryHandler(), - want: []map[string]interface{}{ - {"hello": "world"}, + { + name: "retry_failure", + server: newTestServer(httptest.NewServer), + config: map[string]interface{}{ + "interval": 1, + "program": ` + get(state.url).as(resp, { + "url": state.url, + "events": [bytes(resp.Body).decode_json()], + "status_code": resp.StatusCode, + "header": resp.Header, + }) + `, + }, + handler: retryHandler(), + want: []map[string]interface{}{ + {"hello": "world"}, + }, }, - }, - { - name: "POST_request", - server: newTestServer(httptest.NewServer), - config: map[string]interface{}{ - "interval": 1, - "program": ` -bytes(post(state.url, "application/json", '{"test":"abc"}').Body).as(body, { - "url": state.url, - "events": [body.decode_json()] -}) -`, - }, - handler: defaultHandler(http.MethodPost, `{"test":"abc"}`), - want: []map[string]interface{}{ - { - "hello": []interface{}{ - map[string]interface{}{ - "world": "moon", - }, - map[string]interface{}{ - "space": []interface{}{ - map[string]interface{}{ - "cake": "pumpkin", + { + name: "POST_request", + server: newTestServer(httptest.NewServer), + config: map[string]interface{}{ + "interval": 1, + "program": ` + bytes(post(state.url, "application/json", '{"test":"abc"}').Body).as(body, { + "url": state.url, + "events": [body.decode_json()] + }) + `, + }, + handler: defaultHandler(http.MethodPost, `{"test":"abc"}`), + want: []map[string]interface{}{ + { + "hello": []interface{}{ + map[string]interface{}{ + "world": "moon", + }, + map[string]interface{}{ + "space": []interface{}{ + map[string]interface{}{ + "cake": "pumpkin", + }, }, }, }, }, }, }, - }, - { - name: "repeated_POST_request", - server: newTestServer(httptest.NewServer), - config: map[string]interface{}{ - "interval": "100ms", - "program": ` -bytes(post(state.url, "application/json", '{"test":"abc"}').Body).as(body, { - "url": state.url, - "events": [body.decode_json()] -}) -`, - }, - handler: defaultHandler(http.MethodPost, `{"test":"abc"}`), - want: []map[string]interface{}{ - { - "hello": []interface{}{ - map[string]interface{}{ - "world": "moon", + { + name: "repeated_POST_request", + server: newTestServer(httptest.NewServer), + config: map[string]interface{}{ + "interval": "100ms", + "program": ` + bytes(post(state.url, "application/json", '{"test":"abc"}').Body).as(body, { + "url": state.url, + "events": [body.decode_json()] + }) + `, + }, + handler: defaultHandler(http.MethodPost, `{"test":"abc"}`), + want: []map[string]interface{}{ + { + "hello": []interface{}{ + map[string]interface{}{ + "world": "moon", + }, + map[string]interface{}{ + "space": []interface{}{ + map[string]interface{}{ + "cake": "pumpkin", + }, + }, + }, }, - map[string]interface{}{ - "space": []interface{}{ - map[string]interface{}{ - "cake": "pumpkin", + }, + { + "hello": []interface{}{ + map[string]interface{}{ + "world": "moon", + }, + map[string]interface{}{ + "space": []interface{}{ + map[string]interface{}{ + "cake": "pumpkin", + }, }, }, }, }, }, - { - "hello": []interface{}{ - map[string]interface{}{ + }, + { + name: "split_events", + server: newTestServer(httptest.NewServer), + config: map[string]interface{}{ + "interval": 1, + "program": ` + bytes(get(state.url).Body).as(body, { + "events": body.decode_json().hello + }) + `, + }, + handler: defaultHandler(http.MethodGet, ""), + want: []map[string]interface{}{ + { + "world": "moon", + }, + { + "space": []interface{}{ + map[string]interface{}{ + "cake": "pumpkin", + }, + }, + }, + }, + }, + { + name: "split_events_keep_parent", + server: newTestServer(httptest.NewServer), + config: map[string]interface{}{ + "interval": 1, + "program": ` + bytes(get(state.url).Body).as(body, { + "events": body.decode_json().hello.map(e, + { + "hello": e + }) + }) + `, + }, + handler: defaultHandler(http.MethodGet, ""), + want: []map[string]interface{}{ + { + "hello": map[string]interface{}{ "world": "moon", }, - map[string]interface{}{ + }, + { + "hello": map[string]interface{}{ "space": []interface{}{ map[string]interface{}{ "cake": "pumpkin", @@ -628,410 +689,398 @@ bytes(post(state.url, "application/json", '{"test":"abc"}').Body).as(body, { }, }, }, - }, - { - name: "split_events", - server: newTestServer(httptest.NewServer), - config: map[string]interface{}{ - "interval": 1, - "program": ` -bytes(get(state.url).Body).as(body, { - "events": body.decode_json().hello -}) -`, - }, - handler: defaultHandler(http.MethodGet, ""), - want: []map[string]interface{}{ - { - "world": "moon", + { + name: "nested_split_events", + server: newTestServer(httptest.NewServer), + config: map[string]interface{}{ + "interval": 1, + "program": ` + bytes(get(state.url).Body).decode_json().as(e0, { + "events": e0.hello.map(e1, has(e1.space) ? + e1.space.map(e2, { + "space": e2, + }) + : + [e1] // Make sure the two conditions are the same shape. + ).flatten() + }) + `, }, - { - "space": []interface{}{ - map[string]interface{}{ + handler: defaultHandler(http.MethodGet, ""), + want: []map[string]interface{}{ + { + "world": "moon", + }, + { + "space": map[string]interface{}{ "cake": "pumpkin", }, }, }, }, - }, - { - name: "split_events_keep_parent", - server: newTestServer(httptest.NewServer), - config: map[string]interface{}{ - "interval": 1, - "program": ` -bytes(get(state.url).Body).as(body, { - "events": body.decode_json().hello.map(e, - { - "hello": e + { + name: "absent_split", + server: newTestServer(httptest.NewServer), + config: map[string]interface{}{ + "interval": 1, + "program": ` + bytes(get(state.url).Body).decode_json().as(e, { + "url": state.url, + "events": has(e.unknown) ? + e.unknown.map(u, { + "unknown": u, + }) + : + [] }) -}) -`, + `, + }, + handler: defaultHandler(http.MethodGet, ""), + want: []map[string]interface{}(nil), }, - handler: defaultHandler(http.MethodGet, ""), - want: []map[string]interface{}{ - { - "hello": map[string]interface{}{ - "world": "moon", + + // Cursor/pagination tests. + { + name: "date_cursor", + server: newTestServer(httptest.NewServer), + config: map[string]interface{}{ + "interval": 1, + "state": map[string]interface{}{ + "fake_now": "2002-10-02T15:00:00Z", }, + "program": ` + // Use terse non-standard check for presence of timestamp. The standard + // alternative is to use has(state.cursor) && has(state.cursor.timestamp). + (!is_error(state.cursor.timestamp) ? + state.cursor.timestamp + : + timestamp(state.fake_now)-duration('10m') + ).as(time_cursor, + string(state.url).parse_url().with_replace({ + "RawQuery": {"$filter": ["alertCreationTime ge "+string(time_cursor)]}.format_query() + }).format_url().as(url, bytes(get(url).Body)).decode_json().as(event, { + "events": [event], + // Get the timestamp from the event if it exists, otherwise advance a little to break a request loop. + // Due to the name of the @timestamp field, we can't use has(), so use is_error(). + "cursor": [{"timestamp": !is_error(event["@timestamp"]) ? event["@timestamp"] : time_cursor+duration('1s')}], + + // Just for testing, cycle this back into the next state. + "fake_now": state.fake_now + })) + `, }, - { - "hello": map[string]interface{}{ - "space": []interface{}{ - map[string]interface{}{ - "cake": "pumpkin", - }, - }, - }, + handler: dateCursorHandler(), + want: []map[string]interface{}{ + {"@timestamp": "2002-10-02T15:00:00Z", "foo": "bar"}, + {"@timestamp": "2002-10-02T15:00:01Z", "foo": "bar"}, + {"@timestamp": "2002-10-02T15:00:02Z", "foo": "bar"}, + }, + wantCursor: []map[string]interface{}{ + {"timestamp": "2002-10-02T15:00:00Z"}, + {"timestamp": "2002-10-02T15:00:01Z"}, + {"timestamp": "2002-10-02T15:00:02Z"}, }, }, - }, - { - name: "nested_split_events", - server: newTestServer(httptest.NewServer), - config: map[string]interface{}{ - "interval": 1, - "program": ` -bytes(get(state.url).Body).decode_json().as(e0, { - "events": e0.hello.map(e1, has(e1.space) ? - e1.space.map(e2, { - "space": e2, - }) - : - [e1] // Make sure the two conditions are the same shape. - ).flatten() -}) -`, - }, - handler: defaultHandler(http.MethodGet, ""), - want: []map[string]interface{}{ - { - "world": "moon", + { + name: "tracer_filename_sanitization", + server: func(t *testing.T, h http.HandlerFunc, config map[string]interface{}) { + server := httptest.NewServer(h) + config["resource.url"] = server.URL + t.Cleanup(server.Close) }, - { - "space": map[string]interface{}{ - "cake": "pumpkin", + config: map[string]interface{}{ + "interval": 1, + "resource.tracer.filename": filepath.Join(tempDirectory, "logs", "http-request-trace-*.ndjson"), + "state": map[string]interface{}{ + "fake_now": "2002-10-02T15:00:00Z", }, + "program": ` + // Use terse non-standard check for presence of timestamp. The standard + // alternative is to use has(state.cursor) && has(state.cursor.timestamp). + (!is_error(state.cursor.timestamp) ? + state.cursor.timestamp + : + timestamp(state.fake_now)-duration('10m') + ).as(time_cursor, + string(state.url).parse_url().with_replace({ + "RawQuery": {"$filter": ["alertCreationTime ge "+string(time_cursor)]}.format_query() + }).format_url().as(url, bytes(get(url).Body)).decode_json().as(event, { + "events": [event], + // Get the timestamp from the event if it exists, otherwise advance a little to break a request loop. + // Due to the name of the @timestamp field, we can't use has(), so use is_error(). + "cursor": [{"timestamp": !is_error(event["@timestamp"]) ? event["@timestamp"] : time_cursor+duration('1s')}], + + // Just for testing, cycle this back into the next state. + "fake_now": state.fake_now + })) + `, }, + handler: dateCursorHandler(), + want: []map[string]interface{}{ + {"@timestamp": "2002-10-02T15:00:00Z", "foo": "bar"}, + {"@timestamp": "2002-10-02T15:00:01Z", "foo": "bar"}, + {"@timestamp": "2002-10-02T15:00:02Z", "foo": "bar"}, + }, + wantCursor: []map[string]interface{}{ + {"timestamp": "2002-10-02T15:00:00Z"}, + {"timestamp": "2002-10-02T15:00:01Z"}, + {"timestamp": "2002-10-02T15:00:02Z"}, + }, + wantFile: filepath.Join("logs", "http-request-trace-test_id_tracer_filename_sanitization.ndjson"), }, - }, - { - name: "absent_split", - server: newTestServer(httptest.NewServer), - config: map[string]interface{}{ - "interval": 1, - "program": ` -bytes(get(state.url).Body).decode_json().as(e, { - "url": state.url, - "events": has(e.unknown) ? - e.unknown.map(u, { - "unknown": u, - }) + { + name: "pagination_cursor_object", + server: newTestServer(httptest.NewServer), + config: map[string]interface{}{ + "interval": 1, + "program": ` + (!is_error(state.cursor.page) ? + state.cursor.page : - [] -}) -`, - }, - handler: defaultHandler(http.MethodGet, ""), - want: []map[string]interface{}(nil), - }, - - // Cursor/pagination tests. - { - name: "date_cursor", - server: newTestServer(httptest.NewServer), - config: map[string]interface{}{ - "interval": 1, - "state": map[string]interface{}{ - "fake_now": "2002-10-02T15:00:00Z", - }, - "program": ` -// Use terse non-standard check for presence of timestamp. The standard -// alternative is to use has(state.cursor) && has(state.cursor.timestamp). -(!is_error(state.cursor.timestamp) ? - state.cursor.timestamp -: - timestamp(state.fake_now)-duration('10m') -).as(time_cursor, -string(state.url).parse_url().with_replace({ - "RawQuery": {"$filter": ["alertCreationTime ge "+string(time_cursor)]}.format_query() -}).format_url().as(url, bytes(get(url).Body)).decode_json().as(event, { - "events": [event], - // Get the timestamp from the event if it exists, otherwise advance a little to break a request loop. - // Due to the name of the @timestamp field, we can't use has(), so use is_error(). - "cursor": [{"timestamp": !is_error(event["@timestamp"]) ? event["@timestamp"] : time_cursor+duration('1s')}], - - // Just for testing, cycle this back into the next state. - "fake_now": state.fake_now -})) -`, - }, - handler: dateCursorHandler(), - want: []map[string]interface{}{ - {"@timestamp": "2002-10-02T15:00:00Z", "foo": "bar"}, - {"@timestamp": "2002-10-02T15:00:01Z", "foo": "bar"}, - {"@timestamp": "2002-10-02T15:00:02Z", "foo": "bar"}, - }, - wantCursor: []map[string]interface{}{ - {"timestamp": "2002-10-02T15:00:00Z"}, - {"timestamp": "2002-10-02T15:00:01Z"}, - {"timestamp": "2002-10-02T15:00:02Z"}, - }, - }, - { - name: "pagination_cursor_object", - server: newTestServer(httptest.NewServer), - config: map[string]interface{}{ - "interval": 1, - "program": ` -(!is_error(state.cursor.page) ? - state.cursor.page -: - "" -).as(page_cursor, -string(state.url).parse_url().with_replace({ - "RawQuery": (page_cursor != "" ? {"page": [page_cursor]}.format_query() : "") -}).format_url().as(url, bytes(get(url).Body)).decode_json().as(resp, { - "events": resp.items, - "cursor": (has(resp.nextPageToken) ? resp.nextPageToken : "").as(page, {"page": page}), -})) -`, - }, - handler: paginationHandler(), - want: []map[string]interface{}{ - {"foo": "a"}, - {"foo": "b"}, - }, - wantCursor: []map[string]interface{}{ - {"page": "bar"}, - {"page": ""}, - }, - }, - { - name: "pagination_cursor_array", - server: newTestServer(httptest.NewServer), - config: map[string]interface{}{ - "interval": 1, - "program": ` -(!is_error(state.cursor.page) ? - state.cursor.page -: - "" -).as(page_cursor, -string(state.url).parse_url().with_replace({ - "RawQuery": (page_cursor != "" ? {"page": [page_cursor]}.format_query() : "") -}).format_url().as(url, bytes(get(url).Body)).decode_json().as(resp, { - "events": resp.items, - - // The use of map here is to ensure the cursor is size-matched with the - // events. In the test case all the items arrays are size 1, but this - // may not be the case. In any case, calculate the page token only once. - "cursor": (has(resp.nextPageToken) ? resp.nextPageToken : "").as(page, resp.items.map(e, {"page": page})), -})) -`, - }, - handler: paginationHandler(), - want: []map[string]interface{}{ - {"foo": "a"}, - {"foo": "b"}, - }, - wantCursor: []map[string]interface{}{ - {"page": "bar"}, - {"page": ""}, - }, - }, - { - // This doesn't match the behaviour of the equivalent test in httpjson ("Test first - // event"), but I am not entirely sure what the basis of that behaviour is. - // In particular the transition {"first":"a", "foo":"b"} => {"first":"a", "foo":"c"} - // retaining identity in "first" doesn't follow a logic that I understand. - name: "first_event_cursor", - server: newTestServer(httptest.NewServer), - config: map[string]interface{}{ - "interval": 1, - "program": ` -(!is_error(state.cursor.page) ? - state.cursor.page -: - "" -).as(page_cursor, -string(state.url).parse_url().with_replace({ - "RawQuery": (page_cursor != "" ? {"page": [page_cursor]}.format_query() : "") -}).format_url().as(url, bytes(get(url).Body)).decode_json().as(resp, { - "events": resp.items.map(e, e.with_update({ - "first": (!is_error(state.cursor.first) ? state.cursor.first : "none"), - })), - "cursor": (has(resp.nextPageToken) ? resp.nextPageToken : "").as(page, resp.items.map(e, { - "page": page, - "first": e.foo, - })), -})) -`, + "" + ).as(page_cursor, + string(state.url).parse_url().with_replace({ + "RawQuery": (page_cursor != "" ? {"page": [page_cursor]}.format_query() : "") + }).format_url().as(url, bytes(get(url).Body)).decode_json().as(resp, { + "events": resp.items, + "cursor": (has(resp.nextPageToken) ? resp.nextPageToken : "").as(page, {"page": page}), + })) + `, + }, + handler: paginationHandler(), + want: []map[string]interface{}{ + {"foo": "a"}, + {"foo": "b"}, + }, + wantCursor: []map[string]interface{}{ + {"page": "bar"}, + {"page": ""}, + }, }, - handler: paginationHandler(), - want: []map[string]interface{}{ - {"first": "none", "foo": "a"}, - {"first": "a", "foo": "b"}, - {"first": "b", "foo": "c"}, - {"first": "c", "foo": "d"}, + { + name: "pagination_cursor_array", + server: newTestServer(httptest.NewServer), + config: map[string]interface{}{ + "interval": 1, + "program": ` + (!is_error(state.cursor.page) ? + state.cursor.page + : + "" + ).as(page_cursor, + string(state.url).parse_url().with_replace({ + "RawQuery": (page_cursor != "" ? {"page": [page_cursor]}.format_query() : "") + }).format_url().as(url, bytes(get(url).Body)).decode_json().as(resp, { + "events": resp.items, + + // The use of map here is to ensure the cursor is size-matched with the + // events. In the test case all the items arrays are size 1, but this + // may not be the case. In any case, calculate the page token only once. + "cursor": (has(resp.nextPageToken) ? resp.nextPageToken : "").as(page, resp.items.map(e, {"page": page})), + })) + `, + }, + handler: paginationHandler(), + want: []map[string]interface{}{ + {"foo": "a"}, + {"foo": "b"}, + }, + wantCursor: []map[string]interface{}{ + {"page": "bar"}, + {"page": ""}, + }, }, - wantCursor: []map[string]interface{}{ - {"first": "a", "page": "bar"}, - {"first": "b", "page": ""}, - {"first": "c", "page": ""}, - {"first": "d", "page": ""}, + { + // This doesn't match the behaviour of the equivalent test in httpjson ("Test first + // event"), but I am not entirely sure what the basis of that behaviour is. + // In particular the transition {"first":"a", "foo":"b"} => {"first":"a", "foo":"c"} + // retaining identity in "first" doesn't follow a logic that I understand. + name: "first_event_cursor", + server: newTestServer(httptest.NewServer), + config: map[string]interface{}{ + "interval": 1, + "program": ` + (!is_error(state.cursor.page) ? + state.cursor.page + : + "" + ).as(page_cursor, + string(state.url).parse_url().with_replace({ + "RawQuery": (page_cursor != "" ? {"page": [page_cursor]}.format_query() : "") + }).format_url().as(url, bytes(get(url).Body)).decode_json().as(resp, { + "events": resp.items.map(e, e.with_update({ + "first": (!is_error(state.cursor.first) ? state.cursor.first : "none"), + })), + "cursor": (has(resp.nextPageToken) ? resp.nextPageToken : "").as(page, resp.items.map(e, { + "page": page, + "first": e.foo, + })), + })) + `, + }, + handler: paginationHandler(), + want: []map[string]interface{}{ + {"first": "none", "foo": "a"}, + {"first": "a", "foo": "b"}, + {"first": "b", "foo": "c"}, + {"first": "c", "foo": "d"}, + }, + wantCursor: []map[string]interface{}{ + {"first": "a", "page": "bar"}, + {"first": "b", "page": ""}, + {"first": "c", "page": ""}, + {"first": "d", "page": ""}, + }, }, - }, - // Authenticated access tests. - { - name: "OAuth2", - server: func(t *testing.T, h http.HandlerFunc, config map[string]interface{}) { - s := httptest.NewServer(h) - config["resource.url"] = s.URL - config["auth.oauth2.token_url"] = s.URL + "/token" - t.Cleanup(s.Close) - }, - config: map[string]interface{}{ - "interval": 1, - "auth.oauth2.client.id": "a_client_id", - "auth.oauth2.client.secret": "a_client_secret", - "auth.oauth2.endpoint_params": map[string]interface{}{ - "param1": "v1", - }, - "auth.oauth2.scopes": []string{"scope1", "scope2"}, - "program": ` -bytes(post(state.url, '', '').Body).as(body, { - "events": body.decode_json() -}) -`, - }, - handler: oauth2Handler, - want: []map[string]interface{}{ - {"hello": "world"}, + // Authenticated access tests. + { + name: "OAuth2", + server: func(t *testing.T, h http.HandlerFunc, config map[string]interface{}) { + s := httptest.NewServer(h) + config["resource.url"] = s.URL + config["auth.oauth2.token_url"] = s.URL + "/token" + t.Cleanup(s.Close) + }, + config: map[string]interface{}{ + "interval": 1, + "auth.oauth2.client.id": "a_client_id", + "auth.oauth2.client.secret": "a_client_secret", + "auth.oauth2.endpoint_params": map[string]interface{}{ + "param1": "v1", + }, + "auth.oauth2.scopes": []string{"scope1", "scope2"}, + "program": ` + bytes(post(state.url, '', '').Body).as(body, { + "events": body.decode_json() + }) + `, + }, + handler: oauth2Handler, + want: []map[string]interface{}{ + {"hello": "world"}, + }, }, - }, - // Multi-step requests. - { - name: "simple_multistep_GET_request", - server: newChainTestServer(httptest.NewServer), - config: map[string]interface{}{ - "interval": 1, - "program": ` -// Get the record IDs. -bytes(get(state.url).Body).decode_json().records.map(r, - // Get each event by its ID. - bytes(get(state.url+'/'+string(r.id)).Body).decode_json()).as(events, { - "events": events, -}) -`, - }, - handler: defaultHandler(http.MethodGet, ""), - want: []map[string]interface{}{ - { - "hello": []interface{}{ - map[string]interface{}{ - "world": "moon", - }, - map[string]interface{}{ - "space": []interface{}{ - map[string]interface{}{ - "cake": "pumpkin", + // Multi-step requests. + { + name: "simple_multistep_GET_request", + server: newChainTestServer(httptest.NewServer), + config: map[string]interface{}{ + "interval": 1, + "program": ` + // Get the record IDs. + bytes(get(state.url).Body).decode_json().records.map(r, + // Get each event by its ID. + bytes(get(state.url+'/'+string(r.id)).Body).decode_json()).as(events, { + "events": events, + }) + `, + }, + handler: defaultHandler(http.MethodGet, ""), + want: []map[string]interface{}{ + { + "hello": []interface{}{ + map[string]interface{}{ + "world": "moon", + }, + map[string]interface{}{ + "space": []interface{}{ + map[string]interface{}{ + "cake": "pumpkin", + }, }, }, }, }, }, }, - }, - { - name: "three_step_GET_request", - server: func(t *testing.T, h http.HandlerFunc, config map[string]interface{}) { - r := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - switch r.URL.Path { - case "/": - fmt.Fprintln(w, `{"records":[{"id":1}]}`) - case "/1": - fmt.Fprintln(w, `{"file_name": "file_1"}`) - case "/file_1": - fmt.Fprintln(w, `{"hello":[{"world":"moon"},{"space":[{"cake":"pumpkin"}]}]}`) - } - }) - server := httptest.NewServer(r) - config["resource.url"] = server.URL - t.Cleanup(server.Close) - }, - config: map[string]interface{}{ - "interval": 1, - "program": ` -// Get the record IDs. -bytes(get(state.url).Body).decode_json().records.map(r, - // Get the set of all files from the set of IDs. - bytes(get(state.url+'/'+string(r.id)).Body).decode_json()).map(f, - // Collate all the files into the events list. - bytes(get(state.url+'/'+f.file_name).Body).decode_json()).as(events, { - "events": events, -}) -`, - }, - handler: defaultHandler(http.MethodGet, ""), - want: []map[string]interface{}{ - { - "hello": []interface{}{ - map[string]interface{}{ - "world": "moon", - }, - map[string]interface{}{ - "space": []interface{}{ - map[string]interface{}{ - "cake": "pumpkin", + { + name: "three_step_GET_request", + server: func(t *testing.T, h http.HandlerFunc, config map[string]interface{}) { + r := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + switch r.URL.Path { + case "/": + fmt.Fprintln(w, `{"records":[{"id":1}]}`) + case "/1": + fmt.Fprintln(w, `{"file_name": "file_1"}`) + case "/file_1": + fmt.Fprintln(w, `{"hello":[{"world":"moon"},{"space":[{"cake":"pumpkin"}]}]}`) + } + }) + server := httptest.NewServer(r) + config["resource.url"] = server.URL + t.Cleanup(server.Close) + }, + config: map[string]interface{}{ + "interval": 1, + "program": ` + // Get the record IDs. + bytes(get(state.url).Body).decode_json().records.map(r, + // Get the set of all files from the set of IDs. + bytes(get(state.url+'/'+string(r.id)).Body).decode_json()).map(f, + // Collate all the files into the events list. + bytes(get(state.url+'/'+f.file_name).Body).decode_json()).as(events, { + "events": events, + }) + `, + }, + handler: defaultHandler(http.MethodGet, ""), + want: []map[string]interface{}{ + { + "hello": []interface{}{ + map[string]interface{}{ + "world": "moon", + }, + map[string]interface{}{ + "space": []interface{}{ + map[string]interface{}{ + "cake": "pumpkin", + }, }, }, }, }, }, }, - }, - // Programmer error. - { - name: "type_error_message", - server: newChainTestServer(httptest.NewServer), - config: map[string]interface{}{ - "interval": 1, - "program": ` -bytes(get(state.url).Body).decode_json().records.map(r, - bytes(get(state.url+'/'+r.id).Body).decode_json()).as(events, { -// ^~~~ r.id not converted to string: can't add integer to string. - "events": events, -}) -`, - }, - handler: defaultHandler(http.MethodGet, ""), - want: []map[string]interface{}{ - { - "error": map[string]interface{}{ - "message": "failed eval: no such overload", // This is the best we get for some errors from CEL. + // Programmer error. + { + name: "type_error_message", + server: newChainTestServer(httptest.NewServer), + config: map[string]interface{}{ + "interval": 1, + "program": ` + bytes(get(state.url).Body).decode_json().records.map(r, + bytes(get(state.url+'/'+r.id).Body).decode_json()).as(events, { + // ^~~~ r.id not converted to string: can't add integer to string. + "events": events, + }) + `, + }, + handler: defaultHandler(http.MethodGet, ""), + want: []map[string]interface{}{ + { + "error": map[string]interface{}{ + "message": "failed eval: no such overload", // This is the best we get for some errors from CEL. + }, }, }, }, - }, - // not yet done from httpjson (some are redundant since they are compositional products). - // - // cursor/pagination (place above auth test block) - // Test pagination with array response - // Test request transforms can access state from previous transforms - // Test response transforms can't access request state from previous transforms - // more chain tests (place after other chain tests) - // Test date cursor while using chain - // Test split by json objects array in chain - // Test split by json objects array with keep parent in chain - // Test nested split in chain -} + // not yet done from httpjson (some are redundant since they are compositional products). + // + // cursor/pagination (place above auth test block) + // Test pagination with array response + // Test request transforms can access state from previous transforms + // Test response transforms can't access request state from previous transforms + // more chain tests (place after other chain tests) + // Test date cursor while using chain + // Test split by json objects array in chain + // Test split by json objects array with keep parent in chain + // Test nested split in chain + } -func TestInput(t *testing.T) { skipOnWindows := map[string]string{ "ndjson_log_file_simple_file_scheme": "Path handling on Windows is incompatible with url.Parse/url.URL.String. See go.dev/issue/6027.", } @@ -1112,6 +1161,11 @@ func TestInput(t *testing.T) { t.Errorf("unexpected cursor for event %d: got:- want:+\n%s", i, cmp.Diff(got, test.wantCursor[i])) } } + if len(test.wantFile) != 0 { + if _, err := os.Stat(filepath.Join(tempDirectory, test.wantFile)); err != nil { + t.Errorf("Expected log filename not found") + } + } }) } } diff --git a/x-pack/filebeat/input/httpjson/input.go b/x-pack/filebeat/input/httpjson/input.go index 6e1d3e8ca36f..fc709c0b70f0 100644 --- a/x-pack/filebeat/input/httpjson/input.go +++ b/x-pack/filebeat/input/httpjson/input.go @@ -161,10 +161,9 @@ func run( return nil } -// The Request.Tracer.Filename may have ":" when a httpjson input has cursor config -// The MacOs Finder will treat this as path-separator and causes to show up strange filepaths. -// This function will sanitize characters like ":" and "/" to replace them with "_" just to be -// safe on all operating systems. +// sanitizeFileName returns name with ":" and "/" replaced with "_", removing repeated instances. +// The request.tracer.filename may have ":" when a httpjson input has cursor config and +// the macOS Finder will treat this as path-separator and causes to show up strange filepaths. func sanitizeFileName(name string) string { name = strings.ReplaceAll(name, ":", string(filepath.Separator)) name = filepath.Clean(name) @@ -180,6 +179,11 @@ func newHTTPClient(ctx context.Context, config config, log *logp.Logger) (*httpC if config.Request.Tracer != nil { w := zapcore.AddSync(config.Request.Tracer) + go func() { + // Close the logger when we are done. + <-ctx.Done() + config.Request.Tracer.Close() + }() core := ecszap.NewCore( ecszap.NewDefaultEncoderConfig(), w, diff --git a/x-pack/filebeat/input/httpjson/input_test.go b/x-pack/filebeat/input/httpjson/input_test.go index 3a4b04050ff5..f3c159b08044 100644 --- a/x-pack/filebeat/input/httpjson/input_test.go +++ b/x-pack/filebeat/input/httpjson/input_test.go @@ -12,6 +12,7 @@ import ( "net/http" "net/http/httptest" "os" + "path/filepath" "testing" "time" @@ -25,12 +26,14 @@ import ( ) func TestInput(t *testing.T) { + tempDirectory := t.TempDir() testCases := []struct { - name string - setupServer func(*testing.T, http.HandlerFunc, map[string]interface{}) - baseConfig map[string]interface{} - handler http.HandlerFunc - expected []string + name string + setupServer func(*testing.T, http.HandlerFunc, map[string]interface{}) + baseConfig map[string]interface{} + handler http.HandlerFunc + expected []string + expectedFile string }{ { name: "Test simple GET request", @@ -294,7 +297,7 @@ func TestInput(t *testing.T) { }, }, { - name: "Test filename truncation", + name: "Test tracer filename sanitization", setupServer: func(t *testing.T, h http.HandlerFunc, config map[string]interface{}) { registerRequestTransforms() t.Cleanup(func() { registeredTransforms = newRegistry() }) @@ -326,8 +329,7 @@ func TestInput(t *testing.T) { "value": `[[index .last_response.body "@timestamp"]]`, }, }, - "request.tracer.filename": "../../logs/httpjson/http-request-trace-*.ndjson", - "verifyfilepath": true, + "request.tracer.filename": filepath.Join(tempDirectory, "logs", "http-request-trace-*.ndjson"), }, handler: dateCursorHandler(), expected: []string{ @@ -335,6 +337,7 @@ func TestInput(t *testing.T) { `{"@timestamp":"2002-10-02T15:00:01Z","foo":"bar"}`, `{"@timestamp":"2002-10-02T15:00:02Z","foo":"bar"}`, }, + expectedFile: filepath.Join("logs", "http-request-trace-httpjson-foo-eb837d4c-5ced-45ed-b05c-de658135e248_https_somesource_someapi.ndjson"), }, { name: "Test pagination", @@ -1235,8 +1238,8 @@ func TestInput(t *testing.T) { } } } - if tc.baseConfig["verifyfilepath"] != nil { - if _, err := os.Stat("../../logs/httpjson/http-request-trace-httpjson-foo-eb837d4c-5ced-45ed-b05c-de658135e248_https_somesource_someapi.ndjson"); err == nil { + if len(tc.expectedFile) != 0 { + if _, err := os.Stat(filepath.Join(tempDirectory, tc.expectedFile)); err == nil { assert.NoError(t, g.Wait()) } else { t.Errorf("Expected log filename not found")