Skip to content

Commit

Permalink
x-pack/filebeat/input/httpjson/internal/v2: allow split chains to con…
Browse files Browse the repository at this point in the history
…tinue past empty targets (#27880)

This adds a configuration option "ignore_empty_value" that allows a split
processor chain to continue if a target field is missing or empty.

Updates #26008

(cherry picked from commit 2036ad8)
  • Loading branch information
efd6 authored and mergify-bot committed Sep 14, 2021
1 parent 4388d22 commit 1cae31d
Show file tree
Hide file tree
Showing 5 changed files with 338 additions and 25 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -431,6 +431,8 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Added support for parsing syslog dates containing a leading 0 (e.g. `Sep 01`) rather than a space. {pull}27775[27775]
- Add base64 Encode functionality to httpjson input. {pull}27681[27681]
- Add `join` and `sprintf` functions to `httpjson` input. {pull}27735[27735]
- Improve memory usage of line reader of `log` and `filestream` input. {pull}27782[27782]
- Add `ignore_empty_value` flag to `httpjson` `split` processor. {pull}27880[27880]


*Heartbeat*
Expand Down
5 changes: 5 additions & 0 deletions x-pack/filebeat/docs/inputs/input-httpjson.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -566,6 +566,11 @@ Required if using split type of `string`. This is the sub string used to split

Valid when used with `type: map`. When not empty, defines a new field where the original key value will be stored.

[float]
==== `response.split[].ignore_empty_value`

If set to true, an empty or missing value will be ignored and processing will pass on to the next nested split operation instead of failing with an error. Default: `false`.

[float]
==== `response.split[].split`

Expand Down
15 changes: 8 additions & 7 deletions x-pack/filebeat/input/httpjson/internal/v2/config_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,14 @@ type responseConfig struct {
}

type splitConfig struct {
Target string `config:"target" validation:"required"`
Type string `config:"type"`
Transforms transformsConfig `config:"transforms"`
Split *splitConfig `config:"split"`
KeepParent bool `config:"keep_parent"`
KeyField string `config:"key_field"`
DelimiterString string `config:"delimiter"`
Target string `config:"target" validation:"required"`
Type string `config:"type"`
Transforms transformsConfig `config:"transforms"`
Split *splitConfig `config:"split"`
KeepParent bool `config:"keep_parent"`
KeyField string `config:"key_field"`
DelimiterString string `config:"delimiter"`
IgnoreEmptyValue bool `config:"ignore_empty_value"`
}

func (c *responseConfig) Validate() error {
Expand Down
77 changes: 59 additions & 18 deletions x-pack/filebeat/input/httpjson/internal/v2/split.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,18 +21,24 @@ var (
errExpectedSplitString = errors.New("split was expecting field to be a string")
)

// split is a split processor chain element. Split processing is executed
// by applying elements of the chain's linked list to an input until completed
// or an error state is encountered.
type split struct {
log *logp.Logger
targetInfo targetInfo
kind string
transforms []basicTransform
child *split
keepParent bool
keyField string
isRoot bool
delimiter string
log *logp.Logger
targetInfo targetInfo
kind string
transforms []basicTransform
child *split
keepParent bool
ignoreEmptyValue bool
keyField string
isRoot bool
delimiter string
}

// newSplitResponse returns a new split based on the provided config and
// logging to the provided logger, tagging the split as the root of the chain.
func newSplitResponse(cfg *splitConfig, log *logp.Logger) (*split, error) {
if cfg == nil {
return nil, nil
Expand All @@ -42,11 +48,13 @@ func newSplitResponse(cfg *splitConfig, log *logp.Logger) (*split, error) {
if err != nil {
return nil, err
}
// we want to be able to identify which split is the root of the chain
// We want to be able to identify which split is the root of the chain.
split.isRoot = true
return split, nil
}

// newSplit returns a new split based on the provided config and
// logging to the provided logger.
func newSplit(c *splitConfig, log *logp.Logger) (*split, error) {
ti, err := getTargetInfo(c.Target)
if err != nil {
Expand All @@ -71,29 +79,40 @@ func newSplit(c *splitConfig, log *logp.Logger) (*split, error) {
}

return &split{
log: log,
targetInfo: ti,
kind: c.Type,
keepParent: c.KeepParent,
keyField: c.KeyField,
delimiter: c.DelimiterString,
transforms: ts,
child: s,
log: log,
targetInfo: ti,
kind: c.Type,
keepParent: c.KeepParent,
ignoreEmptyValue: c.IgnoreEmptyValue,
keyField: c.KeyField,
delimiter: c.DelimiterString,
transforms: ts,
child: s,
}, nil
}

// run is the entry point of the split processor chain for a response. It
// takes the body of resp as the root document and hands it to s.split
// together with ctx and ch, returning whatever error s.split reports.
func (s *split) run(ctx *transformContext, resp transformable, ch chan<- maybeMsg) error {
	return s.split(ctx, resp.body(), ch)
}

// split recursively executes the split processor chain.
func (s *split) split(ctx *transformContext, root common.MapStr, ch chan<- maybeMsg) error {
v, err := root.GetValue(s.targetInfo.Name)
if err != nil && err != common.ErrKeyNotFound {
return err
}

if v == nil {
if s.ignoreEmptyValue {
if s.child != nil {
return s.child.split(ctx, root, ch)
}
return nil
}
if s.isRoot {
return errEmptyRootField
}
Expand All @@ -109,6 +128,12 @@ func (s *split) split(ctx *transformContext, root common.MapStr, ch chan<- maybe
}

if len(varr) == 0 {
if s.ignoreEmptyValue {
if s.child != nil {
return s.child.split(ctx, root, ch)
}
return nil
}
if s.isRoot {
return errEmptyRootField
}
Expand All @@ -130,6 +155,12 @@ func (s *split) split(ctx *transformContext, root common.MapStr, ch chan<- maybe
}

if len(vmap) == 0 {
if s.ignoreEmptyValue {
if s.child != nil {
return s.child.split(ctx, root, ch)
}
return nil
}
if s.isRoot {
return errEmptyRootField
}
Expand All @@ -151,6 +182,12 @@ func (s *split) split(ctx *transformContext, root common.MapStr, ch chan<- maybe
}

if len(vstr) == 0 {
if s.ignoreEmptyValue {
if s.child != nil {
return s.child.split(ctx, root, ch)
}
return nil
}
if s.isRoot {
return errEmptyRootField
}
Expand All @@ -169,6 +206,8 @@ func (s *split) split(ctx *transformContext, root common.MapStr, ch chan<- maybe
return errors.New("unknown split type")
}

// sendMessage sends an array or map split result value, v, on ch after performing
// any necessary transformations. If key is "", the value is an element of an array.
func (s *split) sendMessage(ctx *transformContext, root common.MapStr, key string, v interface{}, ch chan<- maybeMsg) error {
obj, ok := toMapStr(v)
if !ok {
Expand Down Expand Up @@ -220,6 +259,8 @@ func toMapStr(v interface{}) (common.MapStr, bool) {
return common.MapStr{}, false
}

// sendMessageSplitString sends a string split result value, v, on ch after
// performing any necessary transformations.
func (s *split) sendMessageSplitString(ctx *transformContext, root common.MapStr, v string, ch chan<- maybeMsg) error {
clone := root.Clone()
_, _ = clone.Put(s.targetInfo.Name, v)
Expand Down
Loading

0 comments on commit 1cae31d

Please sign in to comment.