Skip to content

Commit

Permalink
LIBBEAT: Enhancement Convert dissected values from String to other ba…
Browse files Browse the repository at this point in the history
…sic data types and IP (#18683) (#19838)

(cherry picked from commit 84d75e5)

Co-authored-by: premendrasingh <[email protected]>
  • Loading branch information
sayden and premendrasingh authored Jul 14, 2020
1 parent bd44c50 commit d26d033
Show file tree
Hide file tree
Showing 10 changed files with 502 additions and 36 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -388,6 +388,7 @@ field. You can revert this change by configuring tags for the module and omittin
- Upgrade k8s.io/client-go and k8s keystore tests. {pull}18817[18817]
- Add support for multiple sets of hints on autodiscover {pull}18883[18883]
- Add a configurable delay between retries when an app metadata cannot be retrieved by `add_cloudfoundry_metadata`. {pull}19181[19181]
- Add data type conversion in `dissect` processor for converting string values to other basic data types. {pull}18683[18683]
- Add the `ignore_failure` configuration option to the dissect processor. {pull}19464[19464]
- Add the `overwrite_keys` configuration option to the dissect processor. {pull}19464[19464]
- Add support to trim captured values in the dissect processor. {pull}19464[19464]
Expand Down
48 changes: 48 additions & 0 deletions libbeat/processors/dissect/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,3 +137,51 @@ func TestConfig(t *testing.T) {
assert.Equal(t, trimModeAll, cfg.TrimValues)
})
}

func TestConfigForDataType(t *testing.T) {
t.Run("valid data type", func(t *testing.T) {
c, err := common.NewConfigFrom(map[string]interface{}{
"tokenizer": "%{value1|integer} %{value2|float} %{value3|boolean} %{value4|long} %{value5|double}",
"field": "message",
})
if !assert.NoError(t, err) {
return
}

cfg := config{}
err = c.Unpack(&cfg)
if !assert.NoError(t, err) {
return
}
})
t.Run("invalid data type", func(t *testing.T) {
c, err := common.NewConfigFrom(map[string]interface{}{
"tokenizer": "%{value1|int} %{value2|short} %{value3|char} %{value4|void} %{value5|unsigned} id=%{id|xyz} status=%{status|abc} msg=\"%{message}\"",
"field": "message",
})
if !assert.NoError(t, err) {
return
}

cfg := config{}
err = c.Unpack(&cfg)
if !assert.Error(t, err) {
return
}
})
t.Run("missing data type", func(t *testing.T) {
c, err := common.NewConfigFrom(map[string]interface{}{
"tokenizer": "%{value1|} %{value2|}",
"field": "message",
})
if !assert.NoError(t, err) {
return
}

cfg := config{}
err = c.Unpack(&cfg)
if !assert.Error(t, err) {
return
}
})
}
8 changes: 7 additions & 1 deletion libbeat/processors/dissect/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,14 +38,18 @@ var (
indirectAppendPrefix = "&+"
greedySuffix = "->"
pointerFieldPrefix = "*"
dataTypeIndicator = "|"
dataTypeSeparator = "\\|" // Needed for regexp

numberRE = "\\d{1,2}"
alphaRE = "[[:alpha:]]*"

delimiterRE = regexp.MustCompile("(?s)(.*?)%\\{([^}]*?)}")
suffixRE = regexp.MustCompile("(.+?)" + // group 1 for key name
"(" + ordinalIndicator + "(" + numberRE + ")" + ")?" + // group 2, 3 for ordinal
"(" + fixedLengthIndicator + "(" + numberRE + ")" + ")?" + // group 4, 5 for fixed length
"(" + greedySuffix + ")?$") // group 6 for greedy
"(" + greedySuffix + ")?" + // group 6 for greedy
"(" + dataTypeSeparator + "(" + alphaRE + ")?" + ")?$") // group 7,8 for data type separator and data type

defaultJoinString = " "

Expand All @@ -55,4 +59,6 @@ var (
errMixedPrefixIndirectAppend = errors.New("mixed prefix `&+`")
errMixedPrefixAppendIndirect = errors.New("mixed prefix `&+`")
errEmptyKey = errors.New("empty key")
errInvalidDatatype = errors.New("invalid data type")
errMissingDatatype = errors.New("missing data type")
)
106 changes: 105 additions & 1 deletion libbeat/processors/dissect/dissect.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,20 @@

package dissect

import "fmt"
import (
"fmt"
"net"
"strconv"
"strings"

"github.com/elastic/beats/v7/libbeat/common"

"github.com/pkg/errors"
)

// Map represents the keys and their values extracted with the defined tokenizer.
type Map = map[string]string
type MapConverted = map[string]interface{}

// positions represents the start and end position of the keys found in the string.
type positions []position
Expand Down Expand Up @@ -67,6 +77,23 @@ func (d *Dissector) Dissect(s string) (Map, error) {
return d.resolve(s, positions), nil
}

func (d *Dissector) DissectConvert(s string) (MapConverted, error) {
if len(s) == 0 {
return nil, errEmpty
}

positions, err := d.extract(s)
if err != nil {
return nil, err
}

if len(positions) == 0 {
return nil, errParsingFailure
}

return d.resolveConvert(s, positions), nil
}

// Raw returns the raw tokenizer used to generate the actual parser.
func (d *Dissector) Raw() string {
return d.raw
Expand Down Expand Up @@ -167,6 +194,35 @@ func (d *Dissector) resolve(s string, p positions) Map {
return m
}

func (d *Dissector) resolveConvert(s string, p positions) MapConverted {
lookup := make(common.MapStr, len(p))
m := make(Map, len(p))
mc := make(MapConverted, len(p))
for _, f := range d.parser.fields {
pos := p[f.ID()]
f.Apply(s[pos.start:pos.end], m) // using map[string]string to avoid another set of apply methods
if !f.IsSaveable() {
lookup[f.Key()] = s[pos.start:pos.end]
} else {
key := f.Key()
if k, ok := lookup[f.Key()]; ok {
key = k.(string)
}
v, _ := m[key]
if f.DataType() != "" {
mc[key] = convertData(f.DataType(), v)
} else {
mc[key] = v
}
}
}

for _, f := range d.parser.referenceFields {
delete(mc, f.Key())
}
return mc
}

// New creates a new Dissector from a tokenized string.
func New(tokenizer string) (*Dissector, error) {
p, err := newParser(tokenizer)
Expand All @@ -180,3 +236,51 @@ func New(tokenizer string) (*Dissector, error) {

return &Dissector{parser: p, raw: tokenizer}, nil
}

// strToInt is a helper to interpret a string as either base 10 or base 16.
func strToInt(s string, bitSize int) (int64, error) {
base := 10
if strings.HasPrefix(s, "0x") || strings.HasPrefix(s, "0X") {
// strconv.ParseInt will accept the '0x' or '0X` prefix only when base is 0.
base = 0
}
return strconv.ParseInt(s, base, bitSize)
}

func transformType(typ dataType, value string) (interface{}, error) {
value = strings.TrimRight(value, " ")
switch typ {
case String:
return value, nil
case Long:
return strToInt(value, 64)
case Integer:
i, err := strToInt(value, 32)
return int32(i), err
case Float:
f, err := strconv.ParseFloat(value, 32)
return float32(f), err
case Double:
d, err := strconv.ParseFloat(value, 64)
return float64(d), err
case Boolean:
return strconv.ParseBool(value)
case IP:
if net.ParseIP(value) != nil {
return value, nil
}
return "", errors.New("value is not a valid IP address")
default:
return value, nil
}
}

func convertData(typ string, b string) interface{} {
if dt, ok := dataTypeNames[typ]; ok {
value, err := transformType(dt, b)
if err == nil {
return value
}
}
return b
}
Loading

0 comments on commit d26d033

Please sign in to comment.