Skip to content

Commit

Permalink
[Azure] Sanitize message in case of malformed json (#34874)
Browse files Browse the repository at this point in the history
* Add sanitization function and test for azure input
  • Loading branch information
lucian-ioan authored May 31, 2023
1 parent 13eea9d commit 4096f9b
Show file tree
Hide file tree
Showing 5 changed files with 180 additions and 0 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,7 @@ automatic splitting at root level, if root level element is an array. {pull}3415
- Allow neflow v9 and ipfix templates to be shared between source addresses. {pull}35036[35036]
- Add support for collecting IPv6 metrics. {pull}35123[35123]
- Add oracle authentication messages parsing {pull}35127[35127]
- Add sanitization capabilities to azure-eventhub input {pull}34874[34874]
- Add support for CRC validation in Filebeat's HTTP endpoint input. {pull}35204[35204]
- Add execution budget to CEL input. {pull}35409[35409]
- Add XML decoding support to HTTPJSON. {issue}34438[34438] {pull}35235[35235]
Expand Down
10 changes: 10 additions & 0 deletions x-pack/filebeat/input/azureeventhub/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ type azureInputConfig struct {
SAContainer string `config:"storage_account_container"`
// by default the azure public environment is used, to override, users can provide a specific resource manager endpoint
OverrideEnvironment string `config:"resource_manager_endpoint"`
// cleanup the log JSON input for known issues, options: SINGLE_QUOTES, NEW_LINES
SanitizeOptions []string `config:"sanitize_options"`
}

const ephContainerName = "filebeat"
Expand Down Expand Up @@ -63,6 +65,14 @@ func (conf *azureInputConfig) Validate() error {
return err
}

// log a warning for each sanitization option not supported
for _, opt := range conf.SanitizeOptions {
err := sanitizeOptionsValidate(opt)
if err != nil {
logger.Warnf("%s: %v", opt, err)
}
}

return nil
}

Expand Down
9 changes: 9 additions & 0 deletions x-pack/filebeat/input/azureeventhub/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,15 @@ func (a *azureInput) processEvents(event *eventhub.Event, partitionID string) bo
func (a *azureInput) parseMultipleMessages(bMessage []byte) []string {
var mapObject map[string][]interface{}
var messages []string

// Clean up the message for known issues [1] where Azure services produce malformed JSON documents.
// Sanitization occurs if options are available and the message contains an invalid JSON.
//
// [1]: https://learn.microsoft.com/en-us/answers/questions/1001797/invalid-json-logs-produced-for-function-apps
if len(a.config.SanitizeOptions) != 0 && !json.Valid(bMessage) {
bMessage = sanitize(bMessage, a.config.SanitizeOptions...)
}

// check if the message is a "records" object containing a list of events
err := json.Unmarshal(bMessage, &mapObject)
if err == nil {
Expand Down
78 changes: 78 additions & 0 deletions x-pack/filebeat/input/azureeventhub/sanitization.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

//go:build !aix
// +build !aix

package azureeventhub

import (
"bytes"
"errors"
)

type sanitizationOption string

const (
newLines sanitizationOption = "NEW_LINES"
singleQuotes sanitizationOption = "SINGLE_QUOTES"
)

// sanitizeOptionsValidate validates for supported sanitization options
func sanitizeOptionsValidate(s string) error {
switch s {
case "NEW_LINES":
return nil
case "SINGLE_QUOTES":
return nil
default:
return errors.New("invalid sanitization option")
}
}

// sanitize applies the sanitization options specified in the config
// if no sanitization options are provided, the message remains unchanged
func sanitize(jsonByte []byte, opts ...string) []byte {
res := jsonByte

for _, opt := range opts {
switch sanitizationOption(opt) {
case newLines:
res = sanitizeNewLines(res)
case singleQuotes:
res = sanitizeSingleQuotes(res)
}
}

return res
}

// sanitizeNewLines removes newlines found in the message
func sanitizeNewLines(jsonByte []byte) []byte {
return bytes.ReplaceAll(jsonByte, []byte("\n"), []byte{})
}

// sanitizeSingleQuotes replaces single quotes with double quotes in the message
// single quotes that are in between double quotes remain unchanged
func sanitizeSingleQuotes(jsonByte []byte) []byte {
var result bytes.Buffer
var prevChar byte

inDoubleQuotes := false

for _, r := range jsonByte {
if r == '"' && prevChar != '\\' {
inDoubleQuotes = !inDoubleQuotes
}

if r == '\'' && !inDoubleQuotes {
result.WriteRune('"')
} else {
result.WriteByte(r)
}
prevChar = r
}

return result.Bytes()
}
82 changes: 82 additions & 0 deletions x-pack/filebeat/input/azureeventhub/sanitization_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

//go:build !aix
// +build !aix

package azureeventhub

import (
"fmt"
"testing"

"github.com/stretchr/testify/assert"

"github.com/elastic/elastic-agent-libs/logp"
)

func TestParseMultipleMessagesSanitization(t *testing.T) {
msg := "{\"records\":[{'test':\"this is some message\",\n\n\"time\":\"2019-12-17T13:43:44.4946995Z\"}," +
"{\"test\":\"this is '2nd' message\",\"time\":\"2019-12-17T13:43:44.4946995Z\"}," +
"{\"time\": \"2023-04-11T13:35:20Z\", \"resourceId\": \"/SUBSCRIPTIONS/REDACTED/RESOURCEGROUPS/ELASTIC-FUNCTION-TEST/PROVIDERS/MICROSOFT.WEB/SITES/REDACTED\", \"category\": \"FunctionAppLogs\", \"operationName\": \"Microsoft.Web/sites/functions/log\", \"level\": \"Informational\", \"location\": \"West Europe\", \"properties\": {'appName':'REDACTED','roleInstance':'REDACTED','message':'Elastic Test Function Trigger. ---- West Europe West Europe West Europe West Europe West Europe ','category':'Function.HttpTriggerJava.User','hostVersion':'4.16.5.5','functionInvocationId':'REDACTED','functionName':'HttpTriggerJava','hostInstanceId':'REDACTED','level':'Information','levelId':2,'processId':62}}]}"
msgs := []string{
"{\"test\":\"this is some message\",\"time\":\"2019-12-17T13:43:44.4946995Z\"}",
"{\"test\":\"this is '2nd' message\",\"time\":\"2019-12-17T13:43:44.4946995Z\"}",
"{\"category\":\"FunctionAppLogs\",\"level\":\"Informational\",\"location\":\"West Europe\",\"operationName\":\"Microsoft.Web/sites/functions/log\",\"properties\":{\"appName\":\"REDACTED\",\"category\":\"Function.HttpTriggerJava.User\",\"functionInvocationId\":\"REDACTED\",\"functionName\":\"HttpTriggerJava\",\"hostInstanceId\":\"REDACTED\",\"hostVersion\":\"4.16.5.5\",\"level\":\"Information\",\"levelId\":2,\"message\":\"Elastic Test Function Trigger. ---- West Europe West Europe West Europe West Europe West Europe \",\"processId\":62,\"roleInstance\":\"REDACTED\"},\"resourceId\":\"/SUBSCRIPTIONS/REDACTED/RESOURCEGROUPS/ELASTIC-FUNCTION-TEST/PROVIDERS/MICROSOFT.WEB/SITES/REDACTED\",\"time\":\"2023-04-11T13:35:20Z\"}",
}

input := azureInput{
log: logp.NewLogger(fmt.Sprintf("%s test for input", inputName)),
config: azureInputConfig{
SanitizeOptions: []string{"SINGLE_QUOTES", "NEW_LINES"},
},
}

messages := input.parseMultipleMessages([]byte(msg))
assert.NotNil(t, messages)
assert.Equal(t, len(messages), 3)
for _, ms := range messages {
assert.Contains(t, msgs, ms)
}
}

func TestSanitize(t *testing.T) {
jsonByte := []byte("{'test':\"this is 'some' message\n\",\n\"time\":\"2019-12-17T13:43:44.4946995Z\"}")

testCases := []struct {
name string
opts []string
expected []byte
}{
{
name: "no options",
opts: []string{},
expected: jsonByte,
},
{
name: "NEW_LINES option",
opts: []string{"NEW_LINES"},
expected: []byte("{'test':\"this is 'some' message\",\"time\":\"2019-12-17T13:43:44.4946995Z\"}"),
},
{
name: "SINGLE_QUOTES option",
opts: []string{"SINGLE_QUOTES"},
expected: []byte("{\"test\":\"this is 'some' message\n\",\n\"time\":\"2019-12-17T13:43:44.4946995Z\"}"),
},
{
name: "both options",
opts: []string{"NEW_LINES", "SINGLE_QUOTES"},
expected: []byte("{\"test\":\"this is 'some' message\",\"time\":\"2019-12-17T13:43:44.4946995Z\"}"),
},
}

// Run test cases
for _, tc := range testCases {
tc := tc
t.Run(tc.name, func(t *testing.T) {
res := sanitize(jsonByte, tc.opts...)
assert.Equal(t, tc.expected, res)
})
}
}

0 comments on commit 4096f9b

Please sign in to comment.