From bd610eceb6a630585f6bf09fbb64fc4687ee877e Mon Sep 17 00:00:00 2001 From: lucianpy Date: Tue, 21 Mar 2023 17:38:15 +0200 Subject: [PATCH 01/18] Add sanitization function and test for azure input --- x-pack/filebeat/input/azureeventhub/config.go | 2 ++ x-pack/filebeat/input/azureeventhub/input.go | 26 +++++++++++++++++++ .../input/azureeventhub/input_test.go | 20 ++++++++++++++ 3 files changed, 48 insertions(+) diff --git a/x-pack/filebeat/input/azureeventhub/config.go b/x-pack/filebeat/input/azureeventhub/config.go index 316b00c75b5..4fd106c9b7e 100644 --- a/x-pack/filebeat/input/azureeventhub/config.go +++ b/x-pack/filebeat/input/azureeventhub/config.go @@ -26,6 +26,8 @@ type azureInputConfig struct { SAContainer string `config:"storage_account_container"` // by default the azure public environment is used, to override, users can provide a specific resource manager endpoint OverrideEnvironment string `config:"resource_manager_endpoint"` + // option to cleanup the input for known issues which might produce a malformed JSON + SanitizeMessage bool `config:"sanitize_message"` } const ephContainerName = "filebeat" diff --git a/x-pack/filebeat/input/azureeventhub/input.go b/x-pack/filebeat/input/azureeventhub/input.go index 9c3a0d5980a..7fad6b1c243 100644 --- a/x-pack/filebeat/input/azureeventhub/input.go +++ b/x-pack/filebeat/input/azureeventhub/input.go @@ -163,10 +163,36 @@ func (a *azureInput) processEvents(event *eventhub.Event, partitionID string) bo return true } +// sanitizeMessage will replace out of place newlines and single quotes +func sanitizeMessage(jsonStr string) []byte { + var result strings.Builder + inDoubleQuotes := false + + for _, r := range jsonStr { + if r == '"' { + inDoubleQuotes = !inDoubleQuotes + } + if r == '\n' && !inDoubleQuotes { + continue + } + if r == '\'' && !inDoubleQuotes { + result.WriteRune('"') + } else { + result.WriteRune(r) + } + } + + return []byte(result.String()) +} + // parseMultipleMessages will try to split the message into multiple ones based on the group field provided by the configuration func (a *azureInput) parseMultipleMessages(bMessage []byte) []string { var mapObject map[string][]interface{} var messages []string + // clean up the message for known issues producing a malformed JSON + if a.config.SanitizeMessage { + bMessage = sanitizeMessage(string(bMessage)) + } // check if the message is a "records" object containing a list of events err := json.Unmarshal(bMessage, &mapObject) if err == nil { diff --git a/x-pack/filebeat/input/azureeventhub/input_test.go b/x-pack/filebeat/input/azureeventhub/input_test.go index d13b2889de9..a3c42331db1 100644 --- a/x-pack/filebeat/input/azureeventhub/input_test.go +++ b/x-pack/filebeat/input/azureeventhub/input_test.go @@ -180,3 +180,23 @@ func (o *stubOutleter) OnEvent(event beat.Event) bool { o.cond.Broadcast() return o.done } + +func TestSanitizeMessage(t *testing.T) { + msg := "{\"records\":[{'test':\"this is some message\", \n \"time\":\"2019-12-17T13:43:44.4946995Z\"}," + + "{\"test\":\"this is '2nd' message\",\"time\":\"2019-12-17T13:43:44.4946995Z\"}," + + "{\"test\":\"this is '3rd' message\",\"time\":\"2019-12-17T13:43:44.4946995Z\"}]}" + msgs := []string{ + "{\"test\":\"this is some message\",\"time\":\"2019-12-17T13:43:44.4946995Z\"}", + "{\"test\":\"this is '2nd' message\",\"time\":\"2019-12-17T13:43:44.4946995Z\"}", + "{\"test\":\"this is '3rd' message\",\"time\":\"2019-12-17T13:43:44.4946995Z\"}", + } + + input := azureInput{log: logp.NewLogger(fmt.Sprintf("%s test for input", inputName))} + input.config.SanitizeMessage = true + messages := input.parseMultipleMessages([]byte(msg)) + assert.NotNil(t, messages) + assert.Equal(t, len(messages), 3) + for _, ms := range messages { + assert.Contains(t, msgs, ms) + } +} From c97794cedf153998b4a503d0aeb0064ea2468e35 Mon Sep 17 00:00:00 2001 From: Lucian Date: Thu, 13 Apr 2023 16:30:57 +0300 Subject: [PATCH 02/18] Add testcase for FunctionAppLogs --- x-pack/filebeat/input/azureeventhub/input_test.go | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/x-pack/filebeat/input/azureeventhub/input_test.go b/x-pack/filebeat/input/azureeventhub/input_test.go index a3c42331db1..2d339403bc8 100644 --- a/x-pack/filebeat/input/azureeventhub/input_test.go +++ b/x-pack/filebeat/input/azureeventhub/input_test.go @@ -184,18 +184,24 @@ func (o *stubOutleter) OnEvent(event beat.Event) bool { func TestSanitizeMessage(t *testing.T) { msg := "{\"records\":[{'test':\"this is some message\", \n \"time\":\"2019-12-17T13:43:44.4946995Z\"}," + "{\"test\":\"this is '2nd' message\",\"time\":\"2019-12-17T13:43:44.4946995Z\"}," + - "{\"test\":\"this is '3rd' message\",\"time\":\"2019-12-17T13:43:44.4946995Z\"}]}" + "{\"test\":\"this is '2nd' message\",\"time\":\"2019-12-17T13:43:44.4946995Z\"}," + + "{\"test\":\"this is '3rd' message\",\"time\":\"2019-12-17T13:43:44.4946995Z\"}," + + "{\"time\": \"2023-04-11T13:35:20Z\", \"resourceId\": \"/SUBSCRIPTIONS/REDACTED/RESOURCEGROUPS/ELASTIC-FUNCTION-TEST/PROVIDERS/MICROSOFT.WEB/SITES/REDACTED\", \"category\": \"FunctionAppLogs\", \"operationName\": \"Microsoft.Web/sites/functions/log\", \"level\": \"Informational\", \"location\": \"West Europe\", \"properties\": {'appName':'REDACTED','roleInstance':'REDACTED','message':'Elastic Test Function Trigger. ---- West Europe West Europe West Europe West Europe West Europe ','category':'Function.HttpTriggerJava.User','hostVersion':'4.16.5.5','functionInvocationId':'REDACTED','functionName':'HttpTriggerJava','hostInstanceId':'REDACTED','level':'Information','levelId':2,'processId':62}}]}" msgs := []string{ "{\"test\":\"this is some message\",\"time\":\"2019-12-17T13:43:44.4946995Z\"}", "{\"test\":\"this is '2nd' message\",\"time\":\"2019-12-17T13:43:44.4946995Z\"}", + "{\"test\":\"this is '2nd' message\",\"time\":\"2019-12-17T13:43:44.4946995Z\"}", "{\"test\":\"this is '3rd' message\",\"time\":\"2019-12-17T13:43:44.4946995Z\"}", + "{\"category\":\"FunctionAppLogs\",\"level\":\"Informational\",\"location\":\"West Europe\",\"operationName\":\"Microsoft.Web/sites/functions/log\",\"properties\":{\"appName\":\"REDACTED\",\"category\":\"Function.HttpTriggerJava.User\",\"functionInvocationId\":\"REDACTED\",\"functionName\":\"HttpTriggerJava\",\"hostInstanceId\":\"REDACTED\",\"hostVersion\":\"4.16.5.5\",\"level\":\"Information\",\"levelId\":2,\"message\":\"Elastic Test Function Trigger. ---- West Europe West Europe West Europe West Europe West Europe \",\"processId\":62,\"roleInstance\":\"REDACTED\"},\"resourceId\":\"/SUBSCRIPTIONS/REDACTED/RESOURCEGROUPS/ELASTIC-FUNCTION-TEST/PROVIDERS/MICROSOFT.WEB/SITES/REDACTED\",\"time\":\"2023-04-11T13:35:20Z\"}", } input := azureInput{log: logp.NewLogger(fmt.Sprintf("%s test for input", inputName))} input.config.SanitizeMessage = true messages := input.parseMultipleMessages([]byte(msg)) + assert.NotNil(t, messages) - assert.Equal(t, len(messages), 3) + assert.Equal(t, len(messages), 5) + for _, ms := range messages { assert.Contains(t, msgs, ms) } From 5c918de976127afb4935ad00bee76182f53859fa Mon Sep 17 00:00:00 2001 From: Lucian Date: Wed, 19 Apr 2023 03:43:34 +0300 Subject: [PATCH 03/18] Refactor sanitization using modular approach --- x-pack/filebeat/input/azureeventhub/config.go | 5 +- x-pack/filebeat/input/azureeventhub/input.go | 28 +------ .../input/azureeventhub/input_test.go | 26 ------- .../input/azureeventhub/sanitization.go | 62 ++++++++++++++++ .../input/azureeventhub/sanitization_test.go | 74 +++++++++++++++++++ 5 files changed, 144 insertions(+), 51 deletions(-) create mode 100644 x-pack/filebeat/input/azureeventhub/sanitization.go create mode 100644 x-pack/filebeat/input/azureeventhub/sanitization_test.go diff --git a/x-pack/filebeat/input/azureeventhub/config.go b/x-pack/filebeat/input/azureeventhub/config.go index 4fd106c9b7e..ca081728064 100644 --- a/x-pack/filebeat/input/azureeventhub/config.go +++ b/x-pack/filebeat/input/azureeventhub/config.go @@ -27,7 +27,10 @@ type azureInputConfig struct { // by default the azure public environment is used, to override, users can provide a specific resource manager endpoint OverrideEnvironment string `config:"resource_manager_endpoint"` // option to cleanup the input for known issues which might produce a malformed JSON - SanitizeMessage bool `config:"sanitize_message"` + // SanitizeNewLineMessage bool `config:"sanitize_message"` + // SanitizeSingleQuotesMessage bool `config:"sanitize_message"` + SanitizeOptions []string `config:"sanitize_options"` + // NEW_LINE, SINGLE_QUOTE } const ephContainerName = "filebeat" diff --git a/x-pack/filebeat/input/azureeventhub/input.go b/x-pack/filebeat/input/azureeventhub/input.go index 7fad6b1c243..38b304293cb 100644 --- a/x-pack/filebeat/input/azureeventhub/input.go +++ b/x-pack/filebeat/input/azureeventhub/input.go @@ -163,35 +163,15 @@ func (a *azureInput) processEvents(event *eventhub.Event, partitionID string) bo return true } -// sanitizeMessage will replace out of place newlines and single quotes -func sanitizeMessage(jsonStr string) []byte { - var result strings.Builder - inDoubleQuotes := false - - for _, r := range jsonStr { - if r == '"' { - inDoubleQuotes = !inDoubleQuotes - } - if r == '\n' && !inDoubleQuotes { - continue - } - if r == '\'' && !inDoubleQuotes { - result.WriteRune('"') - } else { - result.WriteRune(r) - } - } - - return []byte(result.String()) -} - // parseMultipleMessages will try to split the message into multiple ones based on the group field provided by the configuration func (a *azureInput) parseMultipleMessages(bMessage []byte) []string { var mapObject map[string][]interface{} var messages []string // clean up the message for known issues producing a malformed JSON - if a.config.SanitizeMessage { - bMessage = sanitizeMessage(string(bMessage)) + if a.config.SanitizeOptions != nil { + for _, opt := range a.config.SanitizeOptions { + bMessage = sanitize(string(bMessage), opt) + } } // check if the message is a "records" object containing a list of events err := json.Unmarshal(bMessage, &mapObject) diff --git a/x-pack/filebeat/input/azureeventhub/input_test.go b/x-pack/filebeat/input/azureeventhub/input_test.go index 2d339403bc8..d13b2889de9 100644 --- a/x-pack/filebeat/input/azureeventhub/input_test.go +++ b/x-pack/filebeat/input/azureeventhub/input_test.go @@ -180,29 +180,3 @@ func (o *stubOutleter) OnEvent(event beat.Event) bool { o.cond.Broadcast() return o.done } - -func TestSanitizeMessage(t *testing.T) { - msg := "{\"records\":[{'test':\"this is some message\", \n \"time\":\"2019-12-17T13:43:44.4946995Z\"}," + - "{\"test\":\"this is '2nd' message\",\"time\":\"2019-12-17T13:43:44.4946995Z\"}," + - "{\"test\":\"this is '2nd' message\",\"time\":\"2019-12-17T13:43:44.4946995Z\"}," + - "{\"test\":\"this is '3rd' message\",\"time\":\"2019-12-17T13:43:44.4946995Z\"}," + - "{\"time\": \"2023-04-11T13:35:20Z\", \"resourceId\": \"/SUBSCRIPTIONS/REDACTED/RESOURCEGROUPS/ELASTIC-FUNCTION-TEST/PROVIDERS/MICROSOFT.WEB/SITES/REDACTED\", \"category\": \"FunctionAppLogs\", \"operationName\": \"Microsoft.Web/sites/functions/log\", \"level\": \"Informational\", \"location\": \"West Europe\", \"properties\": {'appName':'REDACTED','roleInstance':'REDACTED','message':'Elastic Test Function Trigger. ---- West Europe West Europe West Europe West Europe West Europe ','category':'Function.HttpTriggerJava.User','hostVersion':'4.16.5.5','functionInvocationId':'REDACTED','functionName':'HttpTriggerJava','hostInstanceId':'REDACTED','level':'Information','levelId':2,'processId':62}}]}" - msgs := []string{ - "{\"test\":\"this is some message\",\"time\":\"2019-12-17T13:43:44.4946995Z\"}", - "{\"test\":\"this is '2nd' message\",\"time\":\"2019-12-17T13:43:44.4946995Z\"}", - "{\"test\":\"this is '2nd' message\",\"time\":\"2019-12-17T13:43:44.4946995Z\"}", - "{\"test\":\"this is '3rd' message\",\"time\":\"2019-12-17T13:43:44.4946995Z\"}", - "{\"category\":\"FunctionAppLogs\",\"level\":\"Informational\",\"location\":\"West Europe\",\"operationName\":\"Microsoft.Web/sites/functions/log\",\"properties\":{\"appName\":\"REDACTED\",\"category\":\"Function.HttpTriggerJava.User\",\"functionInvocationId\":\"REDACTED\",\"functionName\":\"HttpTriggerJava\",\"hostInstanceId\":\"REDACTED\",\"hostVersion\":\"4.16.5.5\",\"level\":\"Information\",\"levelId\":2,\"message\":\"Elastic Test Function Trigger. ---- West Europe West Europe West Europe West Europe West Europe \",\"processId\":62,\"roleInstance\":\"REDACTED\"},\"resourceId\":\"/SUBSCRIPTIONS/REDACTED/RESOURCEGROUPS/ELASTIC-FUNCTION-TEST/PROVIDERS/MICROSOFT.WEB/SITES/REDACTED\",\"time\":\"2023-04-11T13:35:20Z\"}", - } - - input := azureInput{log: logp.NewLogger(fmt.Sprintf("%s test for input", inputName))} - input.config.SanitizeMessage = true - messages := input.parseMultipleMessages([]byte(msg)) - - assert.NotNil(t, messages) - assert.Equal(t, len(messages), 5) - - for _, ms := range messages { - assert.Contains(t, msgs, ms) - } -} diff --git a/x-pack/filebeat/input/azureeventhub/sanitization.go b/x-pack/filebeat/input/azureeventhub/sanitization.go new file mode 100644 index 00000000000..f9e03e8c8f8 --- /dev/null +++ b/x-pack/filebeat/input/azureeventhub/sanitization.go @@ -0,0 +1,62 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +//go:build !aix +// +build !aix + +package azureeventhub + +import "strings" + +type sanitizationFunc func(jsonStr string) []byte + +func getSanitizationFuncs() map[string]sanitizationFunc { + return map[string]sanitizationFunc{ + "NEW_LINES": sanitizeNewLines, + "SINGLE_QUOTES": sanitizeSingleQuotes, + } +} + +func sanitize(jsonStr string, opts ...string) []byte { + var res []byte + + for _, opt := range opts { + f := getSanitizationFuncs()[opt] + res = f(jsonStr) + } + + return res +} + +func sanitizeNewLines(jsonStr string) []byte { + var result strings.Builder + + for _, r := range jsonStr { + if r == '\n' { + continue + } + result.WriteRune(r) + } + + return []byte(result.String()) +} + +func sanitizeSingleQuotes(jsonStr string) []byte { + var result strings.Builder + inDoubleQuotes := false + + for _, r := range jsonStr { + if r == '"' { + inDoubleQuotes = !inDoubleQuotes + } + + if r == '\'' && !inDoubleQuotes { + result.WriteRune('"') + } else { + result.WriteRune(r) + } + } + + return []byte(result.String()) +} diff --git a/x-pack/filebeat/input/azureeventhub/sanitization_test.go b/x-pack/filebeat/input/azureeventhub/sanitization_test.go new file mode 100644 index 00000000000..ff47a2af3a4 --- /dev/null +++ b/x-pack/filebeat/input/azureeventhub/sanitization_test.go @@ -0,0 +1,74 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +//go:build !aix +// +build !aix + +package azureeventhub + +import ( + "fmt" + "testing" + + "github.com/elastic/elastic-agent-libs/logp" + "github.com/stretchr/testify/assert" +) + +func TestSanitizeMessage(t *testing.T) { + msg := "{\"records\":[{'test':\"this is some message\",\"time\":\"2019-12-17T13:43:44.4946995Z\"}," + + "{\"test\":\"this is '2nd' message\",\"time\":\"2019-12-17T13:43:44.4946995Z\"}," + + "{\"time\": \"2023-04-11T13:35:20Z\", \"resourceId\": \"/SUBSCRIPTIONS/REDACTED/RESOURCEGROUPS/ELASTIC-FUNCTION-TEST/PROVIDERS/MICROSOFT.WEB/SITES/REDACTED\", \"category\": \"FunctionAppLogs\", \"operationName\": \"Microsoft.Web/sites/functions/log\", \"level\": \"Informational\", \"location\": \"West Europe\", \"properties\": {'appName':'REDACTED','roleInstance':'REDACTED','message':'Elastic Test Function Trigger. ---- West Europe West Europe West Europe West Europe West Europe ','category':'Function.HttpTriggerJava.User','hostVersion':'4.16.5.5','functionInvocationId':'REDACTED','functionName':'HttpTriggerJava','hostInstanceId':'REDACTED','level':'Information','levelId':2,'processId':62}}]}" + msgs := []string{ + "{\"test\":\"this is some message\",\"time\":\"2019-12-17T13:43:44.4946995Z\"}", + "{\"test\":\"this is '2nd' message\",\"time\":\"2019-12-17T13:43:44.4946995Z\"}", + "{\"category\":\"FunctionAppLogs\",\"level\":\"Informational\",\"location\":\"West Europe\",\"operationName\":\"Microsoft.Web/sites/functions/log\",\"properties\":{\"appName\":\"REDACTED\",\"category\":\"Function.HttpTriggerJava.User\",\"functionInvocationId\":\"REDACTED\",\"functionName\":\"HttpTriggerJava\",\"hostInstanceId\":\"REDACTED\",\"hostVersion\":\"4.16.5.5\",\"level\":\"Information\",\"levelId\":2,\"message\":\"Elastic Test Function Trigger. ---- West Europe West Europe West Europe West Europe West Europe \",\"processId\":62,\"roleInstance\":\"REDACTED\"},\"resourceId\":\"/SUBSCRIPTIONS/REDACTED/RESOURCEGROUPS/ELASTIC-FUNCTION-TEST/PROVIDERS/MICROSOFT.WEB/SITES/REDACTED\",\"time\":\"2023-04-11T13:35:20Z\"}", + } + + input := azureInput{log: logp.NewLogger(fmt.Sprintf("%s test for input", inputName))} + input.config.SanitizeOptions = []string{"SINGLE_QUOTES", "NEW_LINES"} + messages := input.parseMultipleMessages([]byte(msg)) + assert.NotNil(t, messages) + assert.Equal(t, len(messages), 3) + for _, ms := range messages { + assert.Contains(t, msgs, ms) + } +} + +func TestSanitizeSingleQuotes(t *testing.T) { + msg := "{\"records\":[{'test':\"this is some message\",\"time\":\"2019-12-17T13:43:44.4946995Z\"}," + + "{\"test\":\"this is '2nd' message\",\"time\":\"2019-12-17T13:43:44.4946995Z\"}," + + "{\"time\": \"2023-04-11T13:35:20Z\", \"resourceId\": \"/SUBSCRIPTIONS/REDACTED/RESOURCEGROUPS/ELASTIC-FUNCTION-TEST/PROVIDERS/MICROSOFT.WEB/SITES/REDACTED\", \"category\": \"FunctionAppLogs\", \"operationName\": \"Microsoft.Web/sites/functions/log\", \"level\": \"Informational\", \"location\": \"West Europe\", \"properties\": {'appName':'REDACTED','roleInstance':'REDACTED','message':'Elastic Test Function Trigger. ---- West Europe West Europe West Europe West Europe West Europe ','category':'Function.HttpTriggerJava.User','hostVersion':'4.16.5.5','functionInvocationId':'REDACTED','functionName':'HttpTriggerJava','hostInstanceId':'REDACTED','level':'Information','levelId':2,'processId':62}}]}" + msgs := []string{ + "{\"test\":\"this is some message\",\"time\":\"2019-12-17T13:43:44.4946995Z\"}", + "{\"test\":\"this is '2nd' message\",\"time\":\"2019-12-17T13:43:44.4946995Z\"}", + "{\"category\":\"FunctionAppLogs\",\"level\":\"Informational\",\"location\":\"West Europe\",\"operationName\":\"Microsoft.Web/sites/functions/log\",\"properties\":{\"appName\":\"REDACTED\",\"category\":\"Function.HttpTriggerJava.User\",\"functionInvocationId\":\"REDACTED\",\"functionName\":\"HttpTriggerJava\",\"hostInstanceId\":\"REDACTED\",\"hostVersion\":\"4.16.5.5\",\"level\":\"Information\",\"levelId\":2,\"message\":\"Elastic Test Function Trigger. ---- West Europe West Europe West Europe West Europe West Europe \",\"processId\":62,\"roleInstance\":\"REDACTED\"},\"resourceId\":\"/SUBSCRIPTIONS/REDACTED/RESOURCEGROUPS/ELASTIC-FUNCTION-TEST/PROVIDERS/MICROSOFT.WEB/SITES/REDACTED\",\"time\":\"2023-04-11T13:35:20Z\"}", + } + + input := azureInput{log: logp.NewLogger(fmt.Sprintf("%s test for input", inputName))} + input.config.SanitizeOptions = []string{"SINGLE_QUOTES"} + messages := input.parseMultipleMessages([]byte(msg)) + assert.NotNil(t, messages) + assert.Equal(t, len(messages), 3) + for _, ms := range messages { + assert.Contains(t, msgs, ms) + } +} + +func TestSanitizeNewLine(t *testing.T) { + msg := "{\"records\":[{\"test\":\"this is some message\n\n\n\", \n\n\n \"time\":\"2019-12-17T13:43:44.4946995Z\"}," + + "{\"test\":\"this is '2nd' message\",\"time\":\"2019-12-17T13:43:44.4946995Z\"}]}" + msgs := []string{ + "{\"test\":\"this is some message\",\"time\":\"2019-12-17T13:43:44.4946995Z\"}", + "{\"test\":\"this is '2nd' message\",\"time\":\"2019-12-17T13:43:44.4946995Z\"}", + } + + input := azureInput{log: logp.NewLogger(fmt.Sprintf("%s test for input", inputName))} + input.config.SanitizeOptions = []string{"NEW_LINES"} + messages := input.parseMultipleMessages([]byte(msg)) + assert.NotNil(t, messages) + assert.Equal(t, len(messages), 2) + for _, ms := range messages { + assert.Contains(t, msgs, ms) + } +} From 1e4b628c7e6bbf3d5875b9c64754053add4986eb Mon Sep 17 00:00:00 2001 From: Lucian Date: Wed, 19 Apr 2023 12:45:23 +0300 Subject: [PATCH 04/18] make update --- x-pack/filebeat/input/azureeventhub/sanitization_test.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/x-pack/filebeat/input/azureeventhub/sanitization_test.go b/x-pack/filebeat/input/azureeventhub/sanitization_test.go index ff47a2af3a4..dd8480ab9b2 100644 --- a/x-pack/filebeat/input/azureeventhub/sanitization_test.go +++ b/x-pack/filebeat/input/azureeventhub/sanitization_test.go @@ -11,8 +11,9 @@ import ( "fmt" "testing" - "github.com/elastic/elastic-agent-libs/logp" "github.com/stretchr/testify/assert" + + "github.com/elastic/elastic-agent-libs/logp" ) func TestSanitizeMessage(t *testing.T) { From 3e41c8a470f357ce09efb745bad41506c24946c6 Mon Sep 17 00:00:00 2001 From: Lucian Date: Fri, 28 Apr 2023 15:58:34 +0300 Subject: [PATCH 05/18] validate sanitization options in config --- x-pack/filebeat/input/azureeventhub/config.go | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/x-pack/filebeat/input/azureeventhub/config.go b/x-pack/filebeat/input/azureeventhub/config.go index ca081728064..bb21ddae0de 100644 --- a/x-pack/filebeat/input/azureeventhub/config.go +++ b/x-pack/filebeat/input/azureeventhub/config.go @@ -26,11 +26,8 @@ type azureInputConfig struct { SAContainer string `config:"storage_account_container"` // by default the azure public environment is used, to override, users can provide a specific resource manager endpoint OverrideEnvironment string `config:"resource_manager_endpoint"` - // option to cleanup the input for known issues which might produce a malformed JSON - // SanitizeNewLineMessage bool `config:"sanitize_message"` - // SanitizeSingleQuotesMessage bool `config:"sanitize_message"` + // cleanup the log JSON input for known issues, options: SINGLE_QUOTES, NEW_LINES SanitizeOptions []string `config:"sanitize_options"` - // NEW_LINE, SINGLE_QUOTE } const ephContainerName = "filebeat" @@ -69,6 +66,13 @@ func (conf *azureInputConfig) Validate() error { return err } + for _, opt := range conf.SanitizeOptions { + _, err := sanitizationOptFromString(opt) + if err != nil { + logger.Warnf("%s: %v", opt, err) + } + } + return nil } From 9e5a20c39a340b9c296633faad9d508b6d81fcfe Mon Sep 17 00:00:00 2001 From: Lucian Date: Fri, 28 Apr 2023 15:59:43 +0300 Subject: [PATCH 06/18] refactor sanitization logic --- .../input/azureeventhub/sanitization.go | 60 +++++++++++++------ 1 file changed, 42 insertions(+), 18 deletions(-) diff --git a/x-pack/filebeat/input/azureeventhub/sanitization.go b/x-pack/filebeat/input/azureeventhub/sanitization.go index f9e03e8c8f8..7ae2a168b3c 100644 --- a/x-pack/filebeat/input/azureeventhub/sanitization.go +++ b/x-pack/filebeat/input/azureeventhub/sanitization.go @@ -7,43 +7,67 @@ package azureeventhub -import "strings" +import ( + "bytes" + "errors" +) -type sanitizationFunc func(jsonStr string) []byte +type sanitizationOption string -func getSanitizationFuncs() map[string]sanitizationFunc { - return map[string]sanitizationFunc{ - "NEW_LINES": sanitizeNewLines, - "SINGLE_QUOTES": sanitizeSingleQuotes, +const ( + NewLines sanitizationOption = "NEW_LINES" + SingleQuotes sanitizationOption = "SINGLE_QUOTES" + Unknown sanitizationOption = "UNKNOWN" +) + +func sanitizationOptFromString(s string) (sanitizationOption, error) { + switch s { + case "NEW_LINES": + return NewLines, nil + case "SINGLE_QUOTES": + return SingleQuotes, nil + default: + return Unknown, errors.New("invalid sanitization option") } } -func sanitize(jsonStr string, opts ...string) []byte { - var res []byte +// sanitize applies the sanitization options specified in the config +// if no sanitization options are provided, the message remains unchanged +func sanitize(jsonStr []byte, opts ...string) []byte { + res := jsonStr for _, opt := range opts { - f := getSanitizationFuncs()[opt] - res = f(jsonStr) + switch sanitizationOption(opt) { + case NewLines: + res = sanitizeNewLines(res) + case SingleQuotes: + res = sanitizeSingleQuotes(res) + } } return res } -func sanitizeNewLines(jsonStr string) []byte { - var result strings.Builder +// sanitizeNewLines removes newlines found in the message +func sanitizeNewLines(jsonStr []byte) []byte { + var result bytes.Buffer for _, r := range jsonStr { if r == '\n' { continue } - result.WriteRune(r) + + result.WriteByte(r) } - return []byte(result.String()) + return result.Bytes() } -func sanitizeSingleQuotes(jsonStr string) []byte { - var result strings.Builder +// sanitizeSingleQuotes replaces single quotes with double quotes in the message +// single quotes that are in between double quotes remain unchanged +func sanitizeSingleQuotes(jsonStr []byte) []byte { + var result bytes.Buffer + inDoubleQuotes := false for _, r := range jsonStr { @@ -54,9 +78,9 @@ func sanitizeSingleQuotes(jsonStr string) []byte { if r == '\'' && !inDoubleQuotes { result.WriteRune('"') } else { - result.WriteRune(r) + result.WriteByte(r) } } - return []byte(result.String()) + return result.Bytes() } From 18dd2b9a33092c5e8b88ef307a5b6703053f2ac8 Mon Sep 17 00:00:00 2001 From: Lucian Date: Fri, 28 Apr 2023 16:01:17 +0300 Subject: [PATCH 07/18] refactor sanitization tests using subtests --- .../input/azureeventhub/sanitization_test.go | 78 ++++++++++--------- 1 file changed, 43 insertions(+), 35 deletions(-) diff --git a/x-pack/filebeat/input/azureeventhub/sanitization_test.go b/x-pack/filebeat/input/azureeventhub/sanitization_test.go index dd8480ab9b2..ec0b8d0a92f 100644 --- a/x-pack/filebeat/input/azureeventhub/sanitization_test.go +++ b/x-pack/filebeat/input/azureeventhub/sanitization_test.go @@ -16,8 +16,8 @@ import ( "github.com/elastic/elastic-agent-libs/logp" ) -func TestSanitizeMessage(t *testing.T) { - msg := "{\"records\":[{'test':\"this is some message\",\"time\":\"2019-12-17T13:43:44.4946995Z\"}," + +func TestParseMultipleMessagesSanitization(t *testing.T) { + msg := "{\"records\":[{'test':\"this is some message\",\n\n\"time\":\"2019-12-17T13:43:44.4946995Z\"}," + "{\"test\":\"this is '2nd' message\",\"time\":\"2019-12-17T13:43:44.4946995Z\"}," + "{\"time\": \"2023-04-11T13:35:20Z\", \"resourceId\": \"/SUBSCRIPTIONS/REDACTED/RESOURCEGROUPS/ELASTIC-FUNCTION-TEST/PROVIDERS/MICROSOFT.WEB/SITES/REDACTED\", \"category\": \"FunctionAppLogs\", \"operationName\": \"Microsoft.Web/sites/functions/log\", \"level\": \"Informational\", \"location\": \"West Europe\", \"properties\": {'appName':'REDACTED','roleInstance':'REDACTED','message':'Elastic Test Function Trigger. ---- West Europe West Europe West Europe West Europe West Europe ','category':'Function.HttpTriggerJava.User','hostVersion':'4.16.5.5','functionInvocationId':'REDACTED','functionName':'HttpTriggerJava','hostInstanceId':'REDACTED','level':'Information','levelId':2,'processId':62}}]}" msgs := []string{ @@ -26,28 +26,13 @@ func TestSanitizeMessage(t *testing.T) { "{\"category\":\"FunctionAppLogs\",\"level\":\"Informational\",\"location\":\"West Europe\",\"operationName\":\"Microsoft.Web/sites/functions/log\",\"properties\":{\"appName\":\"REDACTED\",\"category\":\"Function.HttpTriggerJava.User\",\"functionInvocationId\":\"REDACTED\",\"functionName\":\"HttpTriggerJava\",\"hostInstanceId\":\"REDACTED\",\"hostVersion\":\"4.16.5.5\",\"level\":\"Information\",\"levelId\":2,\"message\":\"Elastic Test Function Trigger. ---- West Europe West Europe West Europe West Europe West Europe \",\"processId\":62,\"roleInstance\":\"REDACTED\"},\"resourceId\":\"/SUBSCRIPTIONS/REDACTED/RESOURCEGROUPS/ELASTIC-FUNCTION-TEST/PROVIDERS/MICROSOFT.WEB/SITES/REDACTED\",\"time\":\"2023-04-11T13:35:20Z\"}", } - input := azureInput{log: logp.NewLogger(fmt.Sprintf("%s test for input", inputName))} - input.config.SanitizeOptions = []string{"SINGLE_QUOTES", "NEW_LINES"} - messages := input.parseMultipleMessages([]byte(msg)) - assert.NotNil(t, messages) - assert.Equal(t, len(messages), 3) - for _, ms := range messages { - assert.Contains(t, msgs, ms) - } -} - -func TestSanitizeSingleQuotes(t *testing.T) { - msg := "{\"records\":[{'test':\"this is some message\",\"time\":\"2019-12-17T13:43:44.4946995Z\"}," + - "{\"test\":\"this is '2nd' message\",\"time\":\"2019-12-17T13:43:44.4946995Z\"}," + - "{\"time\": \"2023-04-11T13:35:20Z\", \"resourceId\": \"/SUBSCRIPTIONS/REDACTED/RESOURCEGROUPS/ELASTIC-FUNCTION-TEST/PROVIDERS/MICROSOFT.WEB/SITES/REDACTED\", \"category\": \"FunctionAppLogs\", \"operationName\": \"Microsoft.Web/sites/functions/log\", \"level\": \"Informational\", \"location\": \"West Europe\", \"properties\": {'appName':'REDACTED','roleInstance':'REDACTED','message':'Elastic Test Function Trigger. ---- West Europe West Europe West Europe West Europe West Europe ','category':'Function.HttpTriggerJava.User','hostVersion':'4.16.5.5','functionInvocationId':'REDACTED','functionName':'HttpTriggerJava','hostInstanceId':'REDACTED','level':'Information','levelId':2,'processId':62}}]}" - msgs := []string{ - "{\"test\":\"this is some message\",\"time\":\"2019-12-17T13:43:44.4946995Z\"}", - "{\"test\":\"this is '2nd' message\",\"time\":\"2019-12-17T13:43:44.4946995Z\"}", - "{\"category\":\"FunctionAppLogs\",\"level\":\"Informational\",\"location\":\"West Europe\",\"operationName\":\"Microsoft.Web/sites/functions/log\",\"properties\":{\"appName\":\"REDACTED\",\"category\":\"Function.HttpTriggerJava.User\",\"functionInvocationId\":\"REDACTED\",\"functionName\":\"HttpTriggerJava\",\"hostInstanceId\":\"REDACTED\",\"hostVersion\":\"4.16.5.5\",\"level\":\"Information\",\"levelId\":2,\"message\":\"Elastic Test Function Trigger. ---- West Europe West Europe West Europe West Europe West Europe \",\"processId\":62,\"roleInstance\":\"REDACTED\"},\"resourceId\":\"/SUBSCRIPTIONS/REDACTED/RESOURCEGROUPS/ELASTIC-FUNCTION-TEST/PROVIDERS/MICROSOFT.WEB/SITES/REDACTED\",\"time\":\"2023-04-11T13:35:20Z\"}", + input := azureInput{ + log: logp.NewLogger(fmt.Sprintf("%s test for input", inputName)), + config: azureInputConfig{ + SanitizeOptions: []string{"SINGLE_QUOTES", "NEW_LINES"}, + }, } - input := azureInput{log: logp.NewLogger(fmt.Sprintf("%s test for input", inputName))} - input.config.SanitizeOptions = []string{"SINGLE_QUOTES"} messages := input.parseMultipleMessages([]byte(msg)) assert.NotNil(t, messages) assert.Equal(t, len(messages), 3) @@ -56,20 +41,43 @@ func TestSanitizeSingleQuotes(t *testing.T) { } } -func TestSanitizeNewLine(t *testing.T) { - msg := "{\"records\":[{\"test\":\"this is some message\n\n\n\", \n\n\n \"time\":\"2019-12-17T13:43:44.4946995Z\"}," + - "{\"test\":\"this is '2nd' message\",\"time\":\"2019-12-17T13:43:44.4946995Z\"}]}" - msgs := []string{ - "{\"test\":\"this is some message\",\"time\":\"2019-12-17T13:43:44.4946995Z\"}", - "{\"test\":\"this is '2nd' message\",\"time\":\"2019-12-17T13:43:44.4946995Z\"}", +func TestSanitize(t *testing.T) { + jsonStr := []byte("{'test':\"this is 'some' message\n\",\n\"time\":\"2019-12-17T13:43:44.4946995Z\"}") + + testCases := []struct { + name string + opts []string + expected []byte + }{ + { + name: "no options", + opts: []string{}, + expected: jsonStr, + }, + { + name: "NEW_LINES option", + opts: []string{"NEW_LINES"}, + expected: []byte("{'test':\"this is 'some' message\",\"time\":\"2019-12-17T13:43:44.4946995Z\"}"), + }, + { + name: "SINGLE_QUOTES option", + opts: []string{"SINGLE_QUOTES"}, + expected: []byte("{\"test\":\"this is 'some' message\n\",\n\"time\":\"2019-12-17T13:43:44.4946995Z\"}"), + }, + { + name: "both options", + opts: []string{"NEW_LINES", "SINGLE_QUOTES"}, + expected: []byte("{\"test\":\"this is 'some' message\",\"time\":\"2019-12-17T13:43:44.4946995Z\"}"), + }, } - input := azureInput{log: logp.NewLogger(fmt.Sprintf("%s test for input", inputName))} - input.config.SanitizeOptions = []string{"NEW_LINES"} - messages := input.parseMultipleMessages([]byte(msg)) - assert.NotNil(t, messages) - assert.Equal(t, len(messages), 2) - for _, ms := range messages { - assert.Contains(t, msgs, ms) + // Run test cases + for _, tc := range testCases { + tc := tc + t.Run(tc.name, func(t *testing.T) { + res := sanitize(jsonStr, tc.opts...) + assert.Equal(t, []byte(tc.expected), res) + fmt.Println(string(res)) + }) } } From 3aed9d0f766087a4fea9406c1ca8957b36b61e85 Mon Sep 17 00:00:00 2001 From: Lucian Date: Fri, 28 Apr 2023 16:02:10 +0300 Subject: [PATCH 08/18] remove redundant logic --- x-pack/filebeat/input/azureeventhub/input.go | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/x-pack/filebeat/input/azureeventhub/input.go b/x-pack/filebeat/input/azureeventhub/input.go index 38b304293cb..022ccbf2d30 100644 --- a/x-pack/filebeat/input/azureeventhub/input.go +++ b/x-pack/filebeat/input/azureeventhub/input.go @@ -167,12 +167,10 @@ func (a *azureInput) processEvents(event *eventhub.Event, partitionID string) bo func (a *azureInput) parseMultipleMessages(bMessage []byte) []string { var mapObject map[string][]interface{} var messages []string + // clean up the message for known issues producing a malformed JSON - if a.config.SanitizeOptions != nil { - for _, opt := range a.config.SanitizeOptions { - bMessage = sanitize(string(bMessage), opt) - } - } + bMessage = sanitize(bMessage, a.config.SanitizeOptions...) + // check if the message is a "records" object containing a list of events err := json.Unmarshal(bMessage, &mapObject) if err == nil { From b8fa4ab8afe50c4a210faf8c1ba9b7f87c1080dd Mon Sep 17 00:00:00 2001 From: Lucian Date: Fri, 28 Apr 2023 16:18:46 +0300 Subject: [PATCH 09/18] fix small issues --- x-pack/filebeat/input/azureeventhub/sanitization_test.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/x-pack/filebeat/input/azureeventhub/sanitization_test.go b/x-pack/filebeat/input/azureeventhub/sanitization_test.go index ec0b8d0a92f..7dc5079f50d 100644 --- a/x-pack/filebeat/input/azureeventhub/sanitization_test.go +++ b/x-pack/filebeat/input/azureeventhub/sanitization_test.go @@ -76,8 +76,7 @@ func TestSanitize(t *testing.T) { tc := tc t.Run(tc.name, func(t *testing.T) { res := sanitize(jsonStr, tc.opts...) - assert.Equal(t, []byte(tc.expected), res) - fmt.Println(string(res)) + assert.Equal(t, tc.expected, res) }) } } From ae71f6dcde35325926b52e10e5463013fe158d2e Mon Sep 17 00:00:00 2001 From: Lucian Date: Thu, 4 May 2023 11:05:25 +0300 Subject: [PATCH 10/18] add docstrigs --- x-pack/filebeat/input/azureeventhub/config.go | 3 ++- .../input/azureeventhub/sanitization.go | 18 +++++++++--------- 2 files changed, 11 insertions(+), 10 deletions(-) diff --git a/x-pack/filebeat/input/azureeventhub/config.go b/x-pack/filebeat/input/azureeventhub/config.go index bb21ddae0de..96223ebc470 100644 --- a/x-pack/filebeat/input/azureeventhub/config.go +++ b/x-pack/filebeat/input/azureeventhub/config.go @@ -66,8 +66,9 @@ func (conf *azureInputConfig) Validate() error { return err } + // log a warning for each sanitization option not supported for _, opt := range conf.SanitizeOptions { - _, err := sanitizationOptFromString(opt) + err := sanitizationOptFromString(opt) if err != nil { logger.Warnf("%s: %v", opt, err) } diff --git a/x-pack/filebeat/input/azureeventhub/sanitization.go b/x-pack/filebeat/input/azureeventhub/sanitization.go index 7ae2a168b3c..1daab442866 100644 --- a/x-pack/filebeat/input/azureeventhub/sanitization.go +++ b/x-pack/filebeat/input/azureeventhub/sanitization.go @@ -15,19 +15,19 @@ import ( type sanitizationOption string const ( - NewLines sanitizationOption = "NEW_LINES" - SingleQuotes sanitizationOption = "SINGLE_QUOTES" - Unknown sanitizationOption = "UNKNOWN" + newLines sanitizationOption = "NEW_LINES" + singleQuotes sanitizationOption = "SINGLE_QUOTES" ) -func sanitizationOptFromString(s string) (sanitizationOption, error) { +// sanitizationOptFromString returns an error for unknown sanitization options +func sanitizationOptFromString(s string) error { switch s { case "NEW_LINES": - return NewLines, nil + return nil case "SINGLE_QUOTES": - return SingleQuotes, nil + return nil default: - return Unknown, errors.New("invalid sanitization option") + return errors.New("invalid sanitization option") } } @@ -38,9 +38,9 @@ func sanitize(jsonStr []byte, opts ...string) []byte { for _, opt := range opts { switch sanitizationOption(opt) { - case NewLines: + case newLines: res = sanitizeNewLines(res) - case SingleQuotes: + case singleQuotes: res = sanitizeSingleQuotes(res) } } From e8fa2c4206282c7dfdf3a89105e38ac7b9ef5f33 Mon Sep 17 00:00:00 2001 From: Lucian Date: Thu, 4 May 2023 11:51:12 +0300 Subject: [PATCH 11/18] Add changelog entry --- CHANGELOG.next.asciidoc | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 7799092a17b..f076d7d9dce 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -275,6 +275,7 @@ automatic splitting at root level, if root level element is an array. {pull}3415 - Allow neflow v9 and ipfix templates to be shared between source addresses. {pull}35036[35036] - Add support for collecting IPv6 metrics. {pull}35123[35123] - Add oracle authentication messages parsing {pull}35127[35127] +- Add sanitization capabilities to azure input {pull}34874[34874] *Auditbeat* - Migration of system/package module storage from gob encoding to flatbuffer encoding in bolt db. {pull}34817[34817] From e70593a94998f41b540d6921910661c1723504e3 Mon Sep 17 00:00:00 2001 From: Lucian Date: Thu, 4 May 2023 15:42:03 +0300 Subject: [PATCH 12/18] rename validator function --- x-pack/filebeat/input/azureeventhub/config.go | 2 +- x-pack/filebeat/input/azureeventhub/sanitization.go | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/x-pack/filebeat/input/azureeventhub/config.go b/x-pack/filebeat/input/azureeventhub/config.go index 96223ebc470..1843ad35091 100644 --- a/x-pack/filebeat/input/azureeventhub/config.go +++ b/x-pack/filebeat/input/azureeventhub/config.go @@ -68,7 +68,7 @@ func (conf *azureInputConfig) Validate() error { // log a warning for each sanitization option not supported for _, opt := range conf.SanitizeOptions { - err := sanitizationOptFromString(opt) + err := sanitizeOptionsValidate(opt) if err != nil { logger.Warnf("%s: %v", opt, err) } diff --git a/x-pack/filebeat/input/azureeventhub/sanitization.go b/x-pack/filebeat/input/azureeventhub/sanitization.go index 1daab442866..59e9a6274d3 100644 --- a/x-pack/filebeat/input/azureeventhub/sanitization.go +++ b/x-pack/filebeat/input/azureeventhub/sanitization.go @@ -19,8 +19,8 @@ const ( singleQuotes sanitizationOption = "SINGLE_QUOTES" ) -// sanitizationOptFromString returns an error for unknown sanitization options -func sanitizationOptFromString(s string) error { +// sanitizeOptionsValidate validates for supported sanitization options +func sanitizeOptionsValidate(s string) error { switch s { case "NEW_LINES": return nil From ef85a1b8cb8303d2b442b98947ec9657a65048a7 Mon Sep 17 00:00:00 2001 From: Lucian Date: Wed, 17 May 2023 17:53:19 +0300 Subject: [PATCH 13/18] check sanitize options and json valid --- x-pack/filebeat/input/azureeventhub/input.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/x-pack/filebeat/input/azureeventhub/input.go b/x-pack/filebeat/input/azureeventhub/input.go index 022ccbf2d30..130e69d0954 100644 --- a/x-pack/filebeat/input/azureeventhub/input.go +++ b/x-pack/filebeat/input/azureeventhub/input.go @@ -169,7 +169,10 @@ func (a *azureInput) parseMultipleMessages(bMessage []byte) []string { var messages []string // clean up the message for known issues producing a malformed JSON - bMessage = sanitize(bMessage, a.config.SanitizeOptions...) + // sanitization occurs if options are available and the message produces invalid JSON + if len(a.config.SanitizeOptions) != 0 && !json.Valid(bMessage) { + bMessage = sanitize(bMessage, a.config.SanitizeOptions...) + } // check if the message is a "records" object containing a list of events err := json.Unmarshal(bMessage, &mapObject) From 7b01039fe2873baa772d3e1162941a6e9a333d0a Mon Sep 17 00:00:00 2001 From: Lucian Date: Wed, 17 May 2023 17:54:58 +0300 Subject: [PATCH 14/18] refactor sanitizeNewLines --- x-pack/filebeat/input/azureeventhub/sanitization.go | 12 +----------- 1 file changed, 1 insertion(+), 11 deletions(-) diff --git a/x-pack/filebeat/input/azureeventhub/sanitization.go b/x-pack/filebeat/input/azureeventhub/sanitization.go index 59e9a6274d3..fe87c852b6d 100644 --- a/x-pack/filebeat/input/azureeventhub/sanitization.go +++ b/x-pack/filebeat/input/azureeventhub/sanitization.go @@ -50,17 +50,7 @@ func sanitize(jsonStr []byte, opts ...string) []byte { // sanitizeNewLines removes newlines found in the message func sanitizeNewLines(jsonStr []byte) []byte { - var result bytes.Buffer - - for _, r := range jsonStr { - if r == '\n' { - continue - } - - result.WriteByte(r) - } - - return result.Bytes() + return bytes.ReplaceAll(jsonStr, []byte("\n"), []byte{}) } // sanitizeSingleQuotes replaces single quotes with double quotes in the message From df08c30ec97a9b6ca1c44e45474c415e135aa956 Mon Sep 17 00:00:00 2001 From: Lucian Date: Thu, 18 May 2023 07:37:57 +0300 Subject: [PATCH 15/18] refactor sanitizeSingleQuotes --- x-pack/filebeat/input/azureeventhub/sanitization.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/x-pack/filebeat/input/azureeventhub/sanitization.go b/x-pack/filebeat/input/azureeventhub/sanitization.go index fe87c852b6d..b78cb3bd6ce 100644 --- a/x-pack/filebeat/input/azureeventhub/sanitization.go +++ b/x-pack/filebeat/input/azureeventhub/sanitization.go @@ -57,11 +57,12 @@ func sanitizeNewLines(jsonStr []byte) []byte { // single quotes that are in between double quotes remain unchanged func sanitizeSingleQuotes(jsonStr []byte) []byte { var result bytes.Buffer + var prevChar byte inDoubleQuotes := false for _, r := range jsonStr { - if r == '"' { + if r == '"' && prevChar != '\\' { inDoubleQuotes = !inDoubleQuotes } @@ -70,6 +71,7 @@ func sanitizeSingleQuotes(jsonStr []byte) []byte { } else { result.WriteByte(r) } + prevChar = r } return result.Bytes() From e4e4df2058ca97ef84b60a2dde13cef4d4808a56 Mon Sep 17 00:00:00 2001 From: Lucian Date: Thu, 18 May 2023 07:39:40 +0300 Subject: [PATCH 16/18] rename jsonStr to jsonByte --- x-pack/filebeat/input/azureeventhub/sanitization.go | 12 ++++++------ .../input/azureeventhub/sanitization_test.go | 6 +++--- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/x-pack/filebeat/input/azureeventhub/sanitization.go b/x-pack/filebeat/input/azureeventhub/sanitization.go index b78cb3bd6ce..537d29951c9 100644 --- a/x-pack/filebeat/input/azureeventhub/sanitization.go +++ b/x-pack/filebeat/input/azureeventhub/sanitization.go @@ -33,8 +33,8 @@ func sanitizeOptionsValidate(s string) error { // sanitize applies the sanitization options specified in the config // if no sanitization options are provided, the message remains unchanged -func sanitize(jsonStr []byte, opts ...string) []byte { - res := jsonStr +func sanitize(jsonByte []byte, opts ...string) []byte { + res := jsonByte for _, opt := range opts { switch sanitizationOption(opt) { @@ -49,19 +49,19 @@ func sanitize(jsonStr []byte, opts ...string) []byte { } // sanitizeNewLines removes newlines found in the message -func sanitizeNewLines(jsonStr []byte) []byte { - return bytes.ReplaceAll(jsonStr, []byte("\n"), []byte{}) +func sanitizeNewLines(jsonByte []byte) []byte { + return bytes.ReplaceAll(jsonByte, []byte("\n"), []byte{}) } // sanitizeSingleQuotes replaces single quotes with double quotes in the message // single quotes that are in between double quotes remain unchanged -func sanitizeSingleQuotes(jsonStr []byte) []byte { +func sanitizeSingleQuotes(jsonByte []byte) []byte { var result bytes.Buffer var prevChar byte inDoubleQuotes := false - for _, r := range jsonStr { + for _, r := range jsonByte { if r == '"' && prevChar != '\\' { inDoubleQuotes = !inDoubleQuotes } diff --git a/x-pack/filebeat/input/azureeventhub/sanitization_test.go b/x-pack/filebeat/input/azureeventhub/sanitization_test.go index 7dc5079f50d..cee113f7385 100644 --- a/x-pack/filebeat/input/azureeventhub/sanitization_test.go +++ b/x-pack/filebeat/input/azureeventhub/sanitization_test.go @@ -42,7 +42,7 @@ func TestParseMultipleMessagesSanitization(t *testing.T) { } func TestSanitize(t *testing.T) { - jsonStr := []byte("{'test':\"this is 'some' message\n\",\n\"time\":\"2019-12-17T13:43:44.4946995Z\"}") + jsonByte := []byte("{'test':\"this is 'some' message\n\",\n\"time\":\"2019-12-17T13:43:44.4946995Z\"}") testCases := []struct { name string @@ -52,7 +52,7 @@ func TestSanitize(t *testing.T) { { name: "no options", opts: []string{}, - expected: jsonStr, + expected: jsonByte, }, { name: "NEW_LINES option", @@ -75,7 +75,7 @@ func TestSanitize(t *testing.T) { for _, tc := range testCases { tc := tc t.Run(tc.name, func(t *testing.T) { - res := sanitize(jsonStr, tc.opts...) + res := sanitize(jsonByte, tc.opts...) assert.Equal(t, tc.expected, res) }) } From 67fcc5554298c5f917b32a6eb8343f471bd508eb Mon Sep 17 00:00:00 2001 From: lucianpy <59661554+lucianpy@users.noreply.github.com> Date: Thu, 18 May 2023 07:43:02 +0300 Subject: [PATCH 17/18] update CHANGELOG.next.asciidoc Co-authored-by: Maurizio Branca --- CHANGELOG.next.asciidoc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index d103a5ccd17..0d28b95d515 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -281,7 +281,7 @@ automatic splitting at root level, if root level element is an array. {pull}3415 - Allow neflow v9 and ipfix templates to be shared between source addresses. {pull}35036[35036] - Add support for collecting IPv6 metrics. {pull}35123[35123] - Add oracle authentication messages parsing {pull}35127[35127] -- Add sanitization capabilities to azure input {pull}34874[34874] +- Add sanitization capabilities to azure-eventhub input {pull}34874[34874] - Add support for CRC validation in Filebeat's HTTP endpoint input. {pull}35204[35204] - Add execution budget to CEL input. {pull}35409[35409] From cbe18cf0a939922bd097cdaf8a9a4bec3d5c05dd Mon Sep 17 00:00:00 2001 From: lucianpy <59661554+lucianpy@users.noreply.github.com> Date: Thu, 18 May 2023 07:44:13 +0300 Subject: [PATCH 18/18] update sanitization summary Co-authored-by: Maurizio Branca --- x-pack/filebeat/input/azureeventhub/input.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/x-pack/filebeat/input/azureeventhub/input.go b/x-pack/filebeat/input/azureeventhub/input.go index 8d646c5e86f..6254e7698b1 100644 --- a/x-pack/filebeat/input/azureeventhub/input.go +++ b/x-pack/filebeat/input/azureeventhub/input.go @@ -167,8 +167,10 @@ func (a *azureInput) parseMultipleMessages(bMessage []byte) []string { var mapObject map[string][]interface{} var messages []string - // clean up the message for known issues producing a malformed JSON - // sanitization occurs if options are available and the message produces invalid JSON + // Clean up the message for known issues [1] where Azure services produce malformed JSON documents. + // Sanitization occurs if options are available and the message contains an invalid JSON. + // + // [1]: https://learn.microsoft.com/en-us/answers/questions/1001797/invalid-json-logs-produced-for-function-apps if len(a.config.SanitizeOptions) != 0 && !json.Valid(bMessage) { bMessage = sanitize(bMessage, a.config.SanitizeOptions...) }