Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Azure] Sanitize message in case of malformed json #34874

Merged
merged 21 commits into from
May 31, 2023
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,7 @@ automatic splitting at root level, if root level element is an array. {pull}3415
- Allow neflow v9 and ipfix templates to be shared between source addresses. {pull}35036[35036]
- Add support for collecting IPv6 metrics. {pull}35123[35123]
- Add oracle authentication messages parsing {pull}35127[35127]
- Add sanitization capabilities to azure input {pull}34874[34874]
lucian-ioan marked this conversation as resolved.
Show resolved Hide resolved
- Add support for CRC validation in Filebeat's HTTP endpoint input. {pull}35204[35204]
- Add execution budget to CEL input. {pull}35409[35409]

Expand Down
10 changes: 10 additions & 0 deletions x-pack/filebeat/input/azureeventhub/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ type azureInputConfig struct {
SAContainer string `config:"storage_account_container"`
// by default the azure public environment is used, to override, users can provide a specific resource manager endpoint
OverrideEnvironment string `config:"resource_manager_endpoint"`
// cleanup the log JSON input for known issues, options: SINGLE_QUOTES, NEW_LINES
SanitizeOptions []string `config:"sanitize_options"`
}

const ephContainerName = "filebeat"
Expand Down Expand Up @@ -63,6 +65,14 @@ func (conf *azureInputConfig) Validate() error {
return err
}

// log a warning for each sanitization option not supported
for _, opt := range conf.SanitizeOptions {
err := sanitizeOptionsValidate(opt)
if err != nil {
logger.Warnf("%s: %v", opt, err)
}
}

return nil
}

Expand Down
7 changes: 7 additions & 0 deletions x-pack/filebeat/input/azureeventhub/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,13 @@ func (a *azureInput) processEvents(event *eventhub.Event, partitionID string) bo
func (a *azureInput) parseMultipleMessages(bMessage []byte) []string {
var mapObject map[string][]interface{}
var messages []string

// clean up the message for known issues producing a malformed JSON
// sanitization occurs if options are available and the message produces invalid JSON
lucian-ioan marked this conversation as resolved.
Show resolved Hide resolved
if len(a.config.SanitizeOptions) != 0 && !json.Valid(bMessage) {
bMessage = sanitize(bMessage, a.config.SanitizeOptions...)
}

// check if the message is a "records" object containing a list of events
err := json.Unmarshal(bMessage, &mapObject)
if err == nil {
Expand Down
76 changes: 76 additions & 0 deletions x-pack/filebeat/input/azureeventhub/sanitization.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

//go:build !aix
// +build !aix

package azureeventhub

import (
"bytes"
"errors"
)

type sanitizationOption string

const (
newLines sanitizationOption = "NEW_LINES"
singleQuotes sanitizationOption = "SINGLE_QUOTES"
)

// sanitizeOptionsValidate validates for supported sanitization options
func sanitizeOptionsValidate(s string) error {
switch s {
case "NEW_LINES":
return nil
case "SINGLE_QUOTES":
return nil
default:
return errors.New("invalid sanitization option")
}
}

// sanitize applies the sanitization options specified in the config
// if no sanitization options are provided, the message remains unchanged
func sanitize(jsonStr []byte, opts ...string) []byte {
res := jsonStr

for _, opt := range opts {
switch sanitizationOption(opt) {
case newLines:
res = sanitizeNewLines(res)
case singleQuotes:
res = sanitizeSingleQuotes(res)
}
}

return res
}

// sanitizeNewLines removes newlines found in the message
func sanitizeNewLines(jsonStr []byte) []byte {
return bytes.ReplaceAll(jsonStr, []byte("\n"), []byte{})
}

// sanitizeSingleQuotes replaces single quotes with double quotes in the message
// single quotes that are in between double quotes remain unchanged
func sanitizeSingleQuotes(jsonStr []byte) []byte {
var result bytes.Buffer

inDoubleQuotes := false

for _, r := range jsonStr {
if r == '"' {
inDoubleQuotes = !inDoubleQuotes
}

if r == '\'' && !inDoubleQuotes {
result.WriteRune('"')
} else {
result.WriteByte(r)
}
}

return result.Bytes()
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This algorithm based on double-quotes detection may not only work in all cases.

For example, there are log categories where string values contain an embedded JSON, for example:

https://github.com/elastic/integrations/blob/6ede14e475050d34e62b1b83ad839f688d80c902/packages/azure/data_stream/activitylogs/_dev/test/pipeline/test-activitylogs-edgecases.log#L2-L3

Copy link
Contributor Author

@lucian-ioan lucian-ioan May 18, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch, I see Azure is very creative in their JSON output:

"value": "[\"AuditLog.Read.All\",\"Calendar.ReadWrite\"...]"

Treating JSON embedded within strings as simple strings keeps the logic fairly clean while addressing this.
Escaped double quotes are no longer toggling inDoubleQuotes.

82 changes: 82 additions & 0 deletions x-pack/filebeat/input/azureeventhub/sanitization_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

//go:build !aix
// +build !aix

package azureeventhub

import (
"fmt"
"testing"

"github.com/stretchr/testify/assert"

"github.com/elastic/elastic-agent-libs/logp"
)

func TestParseMultipleMessagesSanitization(t *testing.T) {
msg := "{\"records\":[{'test':\"this is some message\",\n\n\"time\":\"2019-12-17T13:43:44.4946995Z\"}," +
"{\"test\":\"this is '2nd' message\",\"time\":\"2019-12-17T13:43:44.4946995Z\"}," +
"{\"time\": \"2023-04-11T13:35:20Z\", \"resourceId\": \"/SUBSCRIPTIONS/REDACTED/RESOURCEGROUPS/ELASTIC-FUNCTION-TEST/PROVIDERS/MICROSOFT.WEB/SITES/REDACTED\", \"category\": \"FunctionAppLogs\", \"operationName\": \"Microsoft.Web/sites/functions/log\", \"level\": \"Informational\", \"location\": \"West Europe\", \"properties\": {'appName':'REDACTED','roleInstance':'REDACTED','message':'Elastic Test Function Trigger. ---- West Europe West Europe West Europe West Europe West Europe ','category':'Function.HttpTriggerJava.User','hostVersion':'4.16.5.5','functionInvocationId':'REDACTED','functionName':'HttpTriggerJava','hostInstanceId':'REDACTED','level':'Information','levelId':2,'processId':62}}]}"
msgs := []string{
"{\"test\":\"this is some message\",\"time\":\"2019-12-17T13:43:44.4946995Z\"}",
"{\"test\":\"this is '2nd' message\",\"time\":\"2019-12-17T13:43:44.4946995Z\"}",
"{\"category\":\"FunctionAppLogs\",\"level\":\"Informational\",\"location\":\"West Europe\",\"operationName\":\"Microsoft.Web/sites/functions/log\",\"properties\":{\"appName\":\"REDACTED\",\"category\":\"Function.HttpTriggerJava.User\",\"functionInvocationId\":\"REDACTED\",\"functionName\":\"HttpTriggerJava\",\"hostInstanceId\":\"REDACTED\",\"hostVersion\":\"4.16.5.5\",\"level\":\"Information\",\"levelId\":2,\"message\":\"Elastic Test Function Trigger. ---- West Europe West Europe West Europe West Europe West Europe \",\"processId\":62,\"roleInstance\":\"REDACTED\"},\"resourceId\":\"/SUBSCRIPTIONS/REDACTED/RESOURCEGROUPS/ELASTIC-FUNCTION-TEST/PROVIDERS/MICROSOFT.WEB/SITES/REDACTED\",\"time\":\"2023-04-11T13:35:20Z\"}",
}

input := azureInput{
log: logp.NewLogger(fmt.Sprintf("%s test for input", inputName)),
config: azureInputConfig{
SanitizeOptions: []string{"SINGLE_QUOTES", "NEW_LINES"},
},
}

messages := input.parseMultipleMessages([]byte(msg))
assert.NotNil(t, messages)
assert.Equal(t, len(messages), 3)
for _, ms := range messages {
assert.Contains(t, msgs, ms)
}
}

func TestSanitize(t *testing.T) {
jsonStr := []byte("{'test':\"this is 'some' message\n\",\n\"time\":\"2019-12-17T13:43:44.4946995Z\"}")

testCases := []struct {
name string
opts []string
expected []byte
}{
{
name: "no options",
opts: []string{},
expected: jsonStr,
},
{
name: "NEW_LINES option",
opts: []string{"NEW_LINES"},
expected: []byte("{'test':\"this is 'some' message\",\"time\":\"2019-12-17T13:43:44.4946995Z\"}"),
},
{
name: "SINGLE_QUOTES option",
opts: []string{"SINGLE_QUOTES"},
expected: []byte("{\"test\":\"this is 'some' message\n\",\n\"time\":\"2019-12-17T13:43:44.4946995Z\"}"),
},
{
name: "both options",
opts: []string{"NEW_LINES", "SINGLE_QUOTES"},
expected: []byte("{\"test\":\"this is 'some' message\",\"time\":\"2019-12-17T13:43:44.4946995Z\"}"),
},
}

// Run test cases
for _, tc := range testCases {
tc := tc
t.Run(tc.name, func(t *testing.T) {
res := sanitize(jsonStr, tc.opts...)
assert.Equal(t, tc.expected, res)
})
}
}