diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index b061eb263153..fec591d88196 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -286,6 +286,7 @@ automatic splitting at root level, if root level element is an array. {pull}3415 - Allow neflow v9 and ipfix templates to be shared between source addresses. {pull}35036[35036] - Add support for collecting IPv6 metrics. {pull}35123[35123] - Add oracle authentication messages parsing {pull}35127[35127] +- Add sanitization capabilities to azure-eventhub input {pull}34874[34874] - Add support for CRC validation in Filebeat's HTTP endpoint input. {pull}35204[35204] - Add execution budget to CEL input. {pull}35409[35409] - Add XML decoding support to HTTPJSON. {issue}34438[34438] {pull}35235[35235] diff --git a/x-pack/filebeat/input/azureeventhub/config.go b/x-pack/filebeat/input/azureeventhub/config.go index 3bd90d677380..80c2a905162c 100644 --- a/x-pack/filebeat/input/azureeventhub/config.go +++ b/x-pack/filebeat/input/azureeventhub/config.go @@ -25,6 +25,8 @@ type azureInputConfig struct { SAContainer string `config:"storage_account_container"` // by default the azure public environment is used, to override, users can provide a specific resource manager endpoint OverrideEnvironment string `config:"resource_manager_endpoint"` + // cleanup the log JSON input for known issues, options: SINGLE_QUOTES, NEW_LINES + SanitizeOptions []string `config:"sanitize_options"` } const ephContainerName = "filebeat" @@ -63,6 +65,14 @@ func (conf *azureInputConfig) Validate() error { return err } + // log a warning for each sanitization option not supported + for _, opt := range conf.SanitizeOptions { + err := sanitizeOptionsValidate(opt) + if err != nil { + logger.Warnf("%s: %v", opt, err) + } + } + return nil } diff --git a/x-pack/filebeat/input/azureeventhub/input.go b/x-pack/filebeat/input/azureeventhub/input.go index 4d648f69a122..6254e7698b16 100644 --- a/x-pack/filebeat/input/azureeventhub/input.go +++ b/x-pack/filebeat/input/azureeventhub/input.go @@ -166,6 +166,15 @@ func (a *azureInput) processEvents(event *eventhub.Event, partitionID string) bo func (a *azureInput) parseMultipleMessages(bMessage []byte) []string { var mapObject map[string][]interface{} var messages []string + + // Clean up the message for known issues [1] where Azure services produce malformed JSON documents. + // Sanitization occurs if options are available and the message contains an invalid JSON. + // + // [1]: https://learn.microsoft.com/en-us/answers/questions/1001797/invalid-json-logs-produced-for-function-apps + if len(a.config.SanitizeOptions) != 0 && !json.Valid(bMessage) { + bMessage = sanitize(bMessage, a.config.SanitizeOptions...) + } + // check if the message is a "records" object containing a list of events err := json.Unmarshal(bMessage, &mapObject) if err == nil { diff --git a/x-pack/filebeat/input/azureeventhub/sanitization.go b/x-pack/filebeat/input/azureeventhub/sanitization.go new file mode 100644 index 000000000000..537d29951c9f --- /dev/null +++ b/x-pack/filebeat/input/azureeventhub/sanitization.go @@ -0,0 +1,78 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +//go:build !aix +// +build !aix + +package azureeventhub + +import ( + "bytes" + "errors" +) + +type sanitizationOption string + +const ( + newLines sanitizationOption = "NEW_LINES" + singleQuotes sanitizationOption = "SINGLE_QUOTES" +) + +// sanitizeOptionsValidate validates for supported sanitization options +func sanitizeOptionsValidate(s string) error { + switch s { + case "NEW_LINES": + return nil + case "SINGLE_QUOTES": + return nil + default: + return errors.New("invalid sanitization option") + } +} + +// sanitize applies the sanitization options specified in the config +// if no sanitization options are provided, the message remains unchanged +func sanitize(jsonByte []byte, opts ...string) []byte { + res := jsonByte + + for _, opt := range opts { + switch sanitizationOption(opt) { + case newLines: + res = sanitizeNewLines(res) + case singleQuotes: + res = sanitizeSingleQuotes(res) + } + } + + return res +} + +// sanitizeNewLines removes newlines found in the message +func sanitizeNewLines(jsonByte []byte) []byte { + return bytes.ReplaceAll(jsonByte, []byte("\n"), []byte{}) +} + +// sanitizeSingleQuotes replaces single quotes with double quotes in the message +// single quotes that are in between double quotes remain unchanged +func sanitizeSingleQuotes(jsonByte []byte) []byte { + var result bytes.Buffer + var prevChar byte + + inDoubleQuotes := false + + for _, r := range jsonByte { + if r == '"' && prevChar != '\\' { + inDoubleQuotes = !inDoubleQuotes + } + + if r == '\'' && !inDoubleQuotes { + result.WriteRune('"') + } else { + result.WriteByte(r) + } + prevChar = r + } + + return result.Bytes() +} diff --git a/x-pack/filebeat/input/azureeventhub/sanitization_test.go b/x-pack/filebeat/input/azureeventhub/sanitization_test.go new file mode 100644 index 000000000000..cee113f73857 --- /dev/null +++ b/x-pack/filebeat/input/azureeventhub/sanitization_test.go @@ -0,0 +1,82 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +//go:build !aix +// +build !aix + +package azureeventhub + +import ( + "fmt" + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/elastic/elastic-agent-libs/logp" +) + +func TestParseMultipleMessagesSanitization(t *testing.T) { + msg := "{\"records\":[{'test':\"this is some message\",\n\n\"time\":\"2019-12-17T13:43:44.4946995Z\"}," + + "{\"test\":\"this is '2nd' message\",\"time\":\"2019-12-17T13:43:44.4946995Z\"}," + + "{\"time\": \"2023-04-11T13:35:20Z\", \"resourceId\": \"/SUBSCRIPTIONS/REDACTED/RESOURCEGROUPS/ELASTIC-FUNCTION-TEST/PROVIDERS/MICROSOFT.WEB/SITES/REDACTED\", \"category\": \"FunctionAppLogs\", \"operationName\": \"Microsoft.Web/sites/functions/log\", \"level\": \"Informational\", \"location\": \"West Europe\", \"properties\": {'appName':'REDACTED','roleInstance':'REDACTED','message':'Elastic Test Function Trigger. ---- West Europe West Europe West Europe West Europe West Europe ','category':'Function.HttpTriggerJava.User','hostVersion':'4.16.5.5','functionInvocationId':'REDACTED','functionName':'HttpTriggerJava','hostInstanceId':'REDACTED','level':'Information','levelId':2,'processId':62}}]}" + msgs := []string{ + "{\"test\":\"this is some message\",\"time\":\"2019-12-17T13:43:44.4946995Z\"}", + "{\"test\":\"this is '2nd' message\",\"time\":\"2019-12-17T13:43:44.4946995Z\"}", + "{\"category\":\"FunctionAppLogs\",\"level\":\"Informational\",\"location\":\"West Europe\",\"operationName\":\"Microsoft.Web/sites/functions/log\",\"properties\":{\"appName\":\"REDACTED\",\"category\":\"Function.HttpTriggerJava.User\",\"functionInvocationId\":\"REDACTED\",\"functionName\":\"HttpTriggerJava\",\"hostInstanceId\":\"REDACTED\",\"hostVersion\":\"4.16.5.5\",\"level\":\"Information\",\"levelId\":2,\"message\":\"Elastic Test Function Trigger. ---- West Europe West Europe West Europe West Europe West Europe \",\"processId\":62,\"roleInstance\":\"REDACTED\"},\"resourceId\":\"/SUBSCRIPTIONS/REDACTED/RESOURCEGROUPS/ELASTIC-FUNCTION-TEST/PROVIDERS/MICROSOFT.WEB/SITES/REDACTED\",\"time\":\"2023-04-11T13:35:20Z\"}", + } + + input := azureInput{ + log: logp.NewLogger(fmt.Sprintf("%s test for input", inputName)), + config: azureInputConfig{ + SanitizeOptions: []string{"SINGLE_QUOTES", "NEW_LINES"}, + }, + } + + messages := input.parseMultipleMessages([]byte(msg)) + assert.NotNil(t, messages) + assert.Equal(t, len(messages), 3) + for _, ms := range messages { + assert.Contains(t, msgs, ms) + } +} + +func TestSanitize(t *testing.T) { + jsonByte := []byte("{'test':\"this is 'some' message\n\",\n\"time\":\"2019-12-17T13:43:44.4946995Z\"}") + + testCases := []struct { + name string + opts []string + expected []byte + }{ + { + name: "no options", + opts: []string{}, + expected: jsonByte, + }, + { + name: "NEW_LINES option", + opts: []string{"NEW_LINES"}, + expected: []byte("{'test':\"this is 'some' message\",\"time\":\"2019-12-17T13:43:44.4946995Z\"}"), + }, + { + name: "SINGLE_QUOTES option", + opts: []string{"SINGLE_QUOTES"}, + expected: []byte("{\"test\":\"this is 'some' message\n\",\n\"time\":\"2019-12-17T13:43:44.4946995Z\"}"), + }, + { + name: "both options", + opts: []string{"NEW_LINES", "SINGLE_QUOTES"}, + expected: []byte("{\"test\":\"this is 'some' message\",\"time\":\"2019-12-17T13:43:44.4946995Z\"}"), + }, + } + + // Run test cases + for _, tc := range testCases { + tc := tc + t.Run(tc.name, func(t *testing.T) { + res := sanitize(jsonByte, tc.opts...) + assert.Equal(t, tc.expected, res) + }) + } +}