Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

x-pack/filebeat/input/httpjson/internal/v2: allow split chains to continue past empty targets #27880

Merged
merged 2 commits into from
Sep 14, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -749,6 +749,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Add base64 Encode functionality to httpjson input. {pull}27681[27681]
- Add `join` and `sprintf` functions to `httpjson` input. {pull}27735[27735]
- Improve memory usage of line reader of `log` and `filestream` input. {pull}27782[27782]
- Add `ignore_empty_value` flag to `httpjson` `split` processor. {pull}27880[27880]


*Heartbeat*
Expand Down
5 changes: 5 additions & 0 deletions x-pack/filebeat/docs/inputs/input-httpjson.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -566,6 +566,11 @@ Required if using split type of `string`. This is the sub string used to split

Valid when used with `type: map`. When not empty, defines a new field where the original key value will be stored.

[float]
==== `response.split[].ignore_empty_value`

If set to true, empty or missing value will be ignored and processing will pass on to the next nested split operation instead of failing with an error. Default: `false`.

[float]
==== `response.split[].split`

Expand Down
15 changes: 8 additions & 7 deletions x-pack/filebeat/input/httpjson/internal/v2/config_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,14 @@ type responseConfig struct {
}

type splitConfig struct {
Target string `config:"target" validation:"required"`
Type string `config:"type"`
Transforms transformsConfig `config:"transforms"`
Split *splitConfig `config:"split"`
KeepParent bool `config:"keep_parent"`
KeyField string `config:"key_field"`
DelimiterString string `config:"delimiter"`
Target string `config:"target" validation:"required"`
Type string `config:"type"`
Transforms transformsConfig `config:"transforms"`
Split *splitConfig `config:"split"`
KeepParent bool `config:"keep_parent"`
KeyField string `config:"key_field"`
DelimiterString string `config:"delimiter"`
IgnoreEmptyValue bool `config:"ignore_empty_value"`
}

func (c *responseConfig) Validate() error {
Expand Down
77 changes: 59 additions & 18 deletions x-pack/filebeat/input/httpjson/internal/v2/split.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,18 +21,24 @@ var (
errExpectedSplitString = errors.New("split was expecting field to be a string")
)

// split is a split processor chain element. Split processing is executed
// by applying elements of the chain's linked list to an input until completed
// or an error state is encountered.
type split struct {
log *logp.Logger
targetInfo targetInfo
kind string
transforms []basicTransform
child *split
keepParent bool
keyField string
isRoot bool
delimiter string
log *logp.Logger
targetInfo targetInfo
kind string
transforms []basicTransform
child *split
keepParent bool
ignoreEmptyValue bool
keyField string
isRoot bool
delimiter string
}

// newSplitResponse returns a new split based on the provided config and
// logging to the provided logger, tagging the split as the root of the chain.
func newSplitResponse(cfg *splitConfig, log *logp.Logger) (*split, error) {
if cfg == nil {
return nil, nil
Expand All @@ -42,11 +48,13 @@ func newSplitResponse(cfg *splitConfig, log *logp.Logger) (*split, error) {
if err != nil {
return nil, err
}
// we want to be able to identify which split is the root of the chain
// We want to be able to identify which split is the root of the chain.
split.isRoot = true
return split, nil
}

// newSplit returns a new split based on the provided config and
// logging to the provided logger.
func newSplit(c *splitConfig, log *logp.Logger) (*split, error) {
ti, err := getTargetInfo(c.Target)
if err != nil {
Expand All @@ -71,29 +79,40 @@ func newSplit(c *splitConfig, log *logp.Logger) (*split, error) {
}

return &split{
log: log,
targetInfo: ti,
kind: c.Type,
keepParent: c.KeepParent,
keyField: c.KeyField,
delimiter: c.DelimiterString,
transforms: ts,
child: s,
log: log,
targetInfo: ti,
kind: c.Type,
keepParent: c.KeepParent,
ignoreEmptyValue: c.IgnoreEmptyValue,
keyField: c.KeyField,
delimiter: c.DelimiterString,
transforms: ts,
child: s,
}, nil
}

// run runs the split operation on the contents of resp, sending successive
// split results on ch. ctx is passed to transforms that are called during
// the split.
func (s *split) run(ctx *transformContext, resp transformable, ch chan<- maybeMsg) error {
root := resp.body()
return s.split(ctx, root, ch)
}

// split recursively executes the split processor chain.
func (s *split) split(ctx *transformContext, root common.MapStr, ch chan<- maybeMsg) error {
v, err := root.GetValue(s.targetInfo.Name)
if err != nil && err != common.ErrKeyNotFound {
return err
}

if v == nil {
if s.ignoreEmptyValue {
if s.child != nil {
return s.child.split(ctx, root, ch)
}
return nil
}
if s.isRoot {
return errEmptyRootField
}
Expand All @@ -109,6 +128,12 @@ func (s *split) split(ctx *transformContext, root common.MapStr, ch chan<- maybe
}

if len(varr) == 0 {
if s.ignoreEmptyValue {
if s.child != nil {
return s.child.split(ctx, root, ch)
}
return nil
}
if s.isRoot {
return errEmptyRootField
}
Expand All @@ -130,6 +155,12 @@ func (s *split) split(ctx *transformContext, root common.MapStr, ch chan<- maybe
}

if len(vmap) == 0 {
if s.ignoreEmptyValue {
if s.child != nil {
return s.child.split(ctx, root, ch)
}
return nil
}
if s.isRoot {
return errEmptyRootField
}
Expand All @@ -151,6 +182,12 @@ func (s *split) split(ctx *transformContext, root common.MapStr, ch chan<- maybe
}

if len(vstr) == 0 {
if s.ignoreEmptyValue {
if s.child != nil {
return s.child.split(ctx, root, ch)
}
return nil
}
if s.isRoot {
return errEmptyRootField
}
Expand All @@ -169,6 +206,8 @@ func (s *split) split(ctx *transformContext, root common.MapStr, ch chan<- maybe
return errors.New("unknown split type")
}

// sendMessage sends an array or map split result value, v, on ch after performing
// any necessary transformations. If key is "", the value is an element of an array.
func (s *split) sendMessage(ctx *transformContext, root common.MapStr, key string, v interface{}, ch chan<- maybeMsg) error {
obj, ok := toMapStr(v)
if !ok {
Expand Down Expand Up @@ -220,6 +259,8 @@ func toMapStr(v interface{}) (common.MapStr, bool) {
return common.MapStr{}, false
}

// sendMessage sends a string split result value, v, on ch after performing any
// necessary transformations. If key is "", the value is an element of an array.
func (s *split) sendMessageSplitString(ctx *transformContext, root common.MapStr, v string, ch chan<- maybeMsg) error {
clone := root.Clone()
_, _ = clone.Put(s.targetInfo.Name, v)
Expand Down
Loading