Skip to content

Commit

Permalink
Allow to use a star (*) to extract a value that can be referenced. (e…
Browse files Browse the repository at this point in the history
…lastic#8750)

This commit adds support for * instead of using a named skip field
(?field) this make it compatible with ingest pipeline syntax in 6.5.

We are also adding validation enforcing that each indirect field must use an existing
and valid reference.

Fix: elastic#8054
(cherry picked from commit bb93d05)
  • Loading branch information
ph committed Nov 9, 2018
1 parent 6523646 commit 6112fdb
Show file tree
Hide file tree
Showing 8 changed files with 159 additions and 11 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ https://github.com/elastic/beats/compare/v6.4.0...6.x[Check the HEAD diff]

*Affecting all Beats*

- Dissect syntax change, use * instead of ? when working with field reference. {issue}8054[8054]

*Auditbeat*

*Filebeat*
Expand Down
1 change: 1 addition & 0 deletions libbeat/processors/dissect/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ var (
appendIndirectPrefix = "+&"
indirectAppendPrefix = "&+"
greedySuffix = "->"
pointerFieldPrefix = "*"

defaultJoinString = " "

Expand Down
7 changes: 6 additions & 1 deletion libbeat/processors/dissect/dissect.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ func (d *Dissector) resolve(s string, p positions) Map {
f.Apply(s[pos.start:pos.end], m)
}

for _, f := range d.parser.skipFields {
for _, f := range d.parser.referenceFields {
delete(m, f.Key())
}
return m
Expand All @@ -145,5 +145,10 @@ func New(tokenizer string) (*Dissector, error) {
if err != nil {
return nil, err
}

if err := validate(p); err != nil {
return nil, err
}

return &Dissector{parser: p, raw: tokenizer}, nil
}
26 changes: 26 additions & 0 deletions libbeat/processors/dissect/field.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,8 @@ func (f skipField) IsSaveable() bool {
// message: hello world
// result:
// hello: world
//
// Deprecated: see pointerField
type namedSkipField struct {
baseField
}
Expand All @@ -120,6 +122,20 @@ func (f namedSkipField) IsSaveable() bool {
return false
}

// pointerField will extract the content between the delimiters and we can reference it during when
// extracing other values.
type pointerField struct {
baseField
}

func (f pointerField) Apply(b string, m Map) {
m[f.Key()] = b
}

func (f pointerField) IsSaveable() bool {
return false
}

// IndirectField is a value that will be extracted and saved in a previously defined namedSkipField.
// the field is defined with the following syntax: `%{&key}`.
//
Expand Down Expand Up @@ -192,6 +208,10 @@ func newField(id int, rawKey string, previous delimiter) (field, error) {
return newNamedSkipField(id, key[1:]), nil
}

if strings.HasPrefix(key, pointerFieldPrefix) {
return newPointerField(id, key[1:]), nil
}

if strings.HasPrefix(key, appendFieldPrefix) {
return newAppendField(id, key[1:], ordinal, greedy, previous), nil
}
Expand All @@ -213,6 +233,12 @@ func newNamedSkipField(id int, key string) namedSkipField {
}
}

func newPointerField(id int, key string) pointerField {
return pointerField{
baseField{id: id, key: key},
}
}

func newAppendField(id int, key string, ordinal int, greedy bool, previous delimiter) appendField {
return appendField{
baseField: baseField{
Expand Down
35 changes: 27 additions & 8 deletions libbeat/processors/dissect/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,18 @@ import (
// parser extracts the useful information from the raw tokenizer string, fields, delimiters and
// skip fields.
type parser struct {
delimiters []delimiter
fields []field
skipFields []field
delimiters []delimiter
fields []field
referenceFields []field
}

var isIndirectField = func(field field) bool {
switch field.(type) {
case indirectField:
return true
default:
return false
}
}

func newParser(tokenizer string) (*parser, error) {
Expand Down Expand Up @@ -74,16 +83,26 @@ func newParser(tokenizer string) (*parser, error) {
})

// List of fields needed for indirection but don't need to appear in the final event.
var skipFields []field
var referenceFields []field
for _, f := range fields {
if !f.IsSaveable() {
skipFields = append(skipFields, f)
referenceFields = append(referenceFields, f)
}
}

return &parser{
delimiters: delimiters,
fields: fields,
skipFields: skipFields,
delimiters: delimiters,
fields: fields,
referenceFields: referenceFields,
}, nil
}

func filterFieldsWith(fields []field, predicate func(field) bool) []field {
var filtered []field
for _, field := range fields {
if predicate(field) {
filtered = append(filtered, field)
}
}
return filtered
}
3 changes: 1 addition & 2 deletions libbeat/processors/dissect/testdata/dissect_tests.json
Original file line number Diff line number Diff line change
Expand Up @@ -130,11 +130,10 @@
},
{
"name": "indirect field",
"tok": "%{key} %{\u0026key}",
"tok": "%{?key} %{\u0026key}",
"msg": "hello world",
"expected": {
"hello": "world",
"key": "hello"
},
"skip": false,
"fail": false
Expand Down
42 changes: 42 additions & 0 deletions libbeat/processors/dissect/validate.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package dissect

import (
"fmt"
)

func validate(p *parser) error {
indirectFields := filterFieldsWith(p.fields, isIndirectField)

for _, field := range indirectFields {
found := false
for _, reference := range p.referenceFields {
if reference.Key() == field.Key() {
found = true
break
}
}

if found == false {
return fmt.Errorf("missing reference for key '%s'", field.Key())
}
}

return nil
}
54 changes: 54 additions & 0 deletions libbeat/processors/dissect/validate_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package dissect

import (
"testing"

"github.com/stretchr/testify/assert"
)

func TestValidate(t *testing.T) {
tests := []struct {
name string
p *parser
expectError bool
}{
{
name: "when we find reference field for all indirect field",
p: &parser{
fields: []field{newIndirectField(1, "hello"), newNormalField(0, "hola", 1, false)},
referenceFields: []field{newPointerField(2, "hello")},
},
expectError: false,
},
{
name: "when we cannot find all the reference field for all indirect field",
p: &parser{
fields: []field{newIndirectField(1, "hello"), newNormalField(0, "hola", 1, false)},
referenceFields: []field{newPointerField(2, "okhello")},
},
expectError: true,
},
}

for _, test := range tests {
err := validate(test.p)
assert.Equal(t, test.expectError, err != nil)
}
}

0 comments on commit 6112fdb

Please sign in to comment.