Skip to content

Commit

Permalink
Semigroupoid multiline (influxdata#8167) (influxdata#8190)
Browse files Browse the repository at this point in the history
Co-authored-by: javicrespo <[email protected]>
Co-authored-by: jcrespo <[email protected]>
Co-authored-by: semigroupoid <[email protected]>
  • Loading branch information
4 people authored Sep 28, 2020
1 parent 1ee1649 commit d1a30a8
Show file tree
Hide file tree
Showing 7 changed files with 687 additions and 11 deletions.
17 changes: 17 additions & 0 deletions plugins/inputs/tail/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,23 @@ The plugin expects messages in one of the
## more about them here:
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
data_format = "influx"

## multiline parser/codec
## https://www.elastic.co/guide/en/logstash/2.4/plugins-filters-multiline.html
#[inputs.tail.multiline]
## The pattern should be a regexp which matches what you believe to be an indicator that the line is part of an event consisting of multiple lines of log data.
#pattern = "^\s"

## The field's value must be previous or next and indicates the relation to the
## multi-line event.
#match_which_line = "previous"

## The invert_match can be true or false (defaults to false).
## If true, a message not matching the pattern will constitute a match of the multiline filter and the match_which_line will be applied (and vice versa).
#invert_match = false

## After the specified timeout, this plugin sends the multiline event even if no new pattern is found to start a new event. The default is 5s.
#timeout = 5s
```

### Metrics
Expand Down
135 changes: 135 additions & 0 deletions plugins/inputs/tail/multiline.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
package tail

import (
"bytes"
"fmt"
"regexp"
"strings"
"time"

"github.com/influxdata/telegraf/internal"
)

// MultilineMatchWhichLine indicates how a line that matches the multiline
// pattern relates to the multiline event: it belongs either to the previous
// line or to the next line.
type MultilineMatchWhichLine int

// Multiline is the runtime form of a MultilineConfig, as produced by
// MultilineConfig.NewMultiline.
type Multiline struct {
// config is the configuration this instance was created from.
config *MultilineConfig
// enabled is true when config.Pattern was non-empty at construction time.
enabled bool
// patternRegexp is config.Pattern compiled by NewMultiline; it stays nil
// when the pattern is empty.
patternRegexp *regexp.Regexp
}

// MultilineConfig holds the settings that control how consecutive lines are
// joined into a single multiline event.
type MultilineConfig struct {
// Pattern is the regular expression tested against every line. An empty
// Pattern leaves multiline handling disabled.
Pattern string
// MatchWhichLine tells whether a matching line relates to the previous
// or to the next line.
MatchWhichLine MultilineMatchWhichLine `toml:"match_which_line"`
// InvertMatch, when true, inverts the result of the pattern test: lines
// that do not match Pattern are treated as matches and vice versa.
InvertMatch bool
// Timeout is replaced with 5 seconds by NewMultiline when Pattern is set
// and Timeout is nil or zero.
Timeout *internal.Duration
}

// Values of MultilineMatchWhichLine. Previous is the zero value, so it is the
// one in effect when the setting is left unset.
const (
// Previous => Append current line to previous line
Previous MultilineMatchWhichLine = iota
// Next => Next line will be appended to current line
Next
)

// NewMultiline builds a Multiline from this configuration.
//
// With an empty Pattern the returned Multiline is disabled and carries no
// compiled regexp. Otherwise the pattern is compiled (a compile failure is
// returned as the error) and, as a side effect on the receiver, a nil or zero
// Timeout is replaced with the 5 second default.
func (m *MultilineConfig) NewMultiline() (*Multiline, error) {
	multiline := &Multiline{config: m}
	if m.Pattern == "" {
		// Nothing to match against: multiline handling stays off.
		return multiline, nil
	}

	compiled, err := regexp.Compile(m.Pattern)
	if err != nil {
		return nil, err
	}

	if m.Timeout == nil || m.Timeout.Duration == 0 {
		m.Timeout = &internal.Duration{Duration: 5 * time.Second}
	}

	multiline.enabled = true
	multiline.patternRegexp = compiled
	return multiline, nil
}

// IsEnabled reports whether multiline handling is active, i.e. whether the
// configuration this Multiline was built from had a non-empty Pattern.
func (m *Multiline) IsEnabled() bool {
return m.enabled
}

// ProcessLine feeds one line into the multiline state kept in buffer and
// returns the text that is ready to be emitted, or "" when nothing is ready.
//
// A line that matches (see matchString) is appended to buffer and "" is
// returned. For a non-matching line the outcome depends on MatchWhichLine:
//   - Previous: the buffered text is returned (possibly "") and the current
//     line becomes the new buffer content.
//   - otherwise (Next): if the buffer holds text, the line is appended, the
//     whole buffer is returned and the buffer is emptied; with an empty
//     buffer the line is returned unchanged.
//
// No separator is inserted between the joined lines.
func (m *Multiline) ProcessLine(text string, buffer *bytes.Buffer) string {
	if m.matchString(text) {
		buffer.WriteString(text)
		return ""
	}

	switch {
	case m.config.MatchWhichLine == Previous:
		// The buffered event is complete; the current line starts a new one.
		completed := buffer.String()
		buffer.Reset()
		buffer.WriteString(text)
		return completed
	case buffer.Len() == 0:
		// Next mode with nothing pending: the line stands on its own.
		return text
	default:
		// Next mode: the current line terminates the pending event.
		buffer.WriteString(text)
		completed := buffer.String()
		buffer.Reset()
		return completed
	}
}

// Flush returns whatever text is pending in buffer and empties it. An empty
// buffer yields "" and is left untouched.
func (m *Multiline) Flush(buffer *bytes.Buffer) string {
	if buffer.Len() > 0 {
		pending := buffer.String()
		buffer.Reset()
		return pending
	}
	return ""
}

// matchString reports whether text counts as a match: the result of testing
// text against the compiled pattern, negated when InvertMatch is set.
func (m *Multiline) matchString(text string) bool {
	matched := m.patternRegexp.MatchString(text)
	if m.config.InvertMatch {
		return !matched
	}
	return matched
}

// String returns "previous" or "next" for the corresponding constant and an
// empty string for any other value.
func (w MultilineMatchWhichLine) String() string {
	if w == Previous {
		return "previous"
	}
	if w == Next {
		return "next"
	}
	return ""
}

// UnmarshalTOML implements ability to unmarshal MultilineMatchWhichLine from TOML files.
// It hands data to UnmarshalText unchanged and returns its result.
func (w *MultilineMatchWhichLine) UnmarshalTOML(data []byte) (err error) {
return w.UnmarshalText(data)
}

// UnmarshalText implements encoding.TextUnmarshaler.
//
// The input is compared case-insensitively (after upper-casing) against
// "previous" and "next", each accepted bare, double-quoted or single-quoted.
// Any other input sets the receiver to -1 and returns an error.
func (w *MultilineMatchWhichLine) UnmarshalText(data []byte) (err error) {
	upper := strings.ToUpper(string(data))

	if upper == `PREVIOUS` || upper == `"PREVIOUS"` || upper == `'PREVIOUS'` {
		*w = Previous
		return nil
	}
	if upper == `NEXT` || upper == `"NEXT"` || upper == `'NEXT'` {
		*w = Next
		return nil
	}

	// Mark the value as invalid so it cannot be mistaken for Previous (zero).
	*w = -1
	return fmt.Errorf("E! [inputs.tail] unknown multiline MatchWhichLine")
}

// MarshalText implements encoding.TextMarshaler. It returns the String form
// of the value, or an error when String yields "" (an unknown value).
func (w MultilineMatchWhichLine) MarshalText() ([]byte, error) {
	name := w.String()
	if name == "" {
		return nil, fmt.Errorf("E! [inputs.tail] unknown multiline MatchWhichLine")
	}
	return []byte(name), nil
}
235 changes: 235 additions & 0 deletions plugins/inputs/tail/multiline_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,235 @@
package tail

import (
"bytes"
"testing"
"time"

"github.com/influxdata/telegraf/internal"
"github.com/stretchr/testify/assert"
)

// TestMultilineConfigOK checks that NewMultiline accepts a valid pattern.
func TestMultilineConfigOK(t *testing.T) {
	config := MultilineConfig{Pattern: ".*", MatchWhichLine: Previous}

	_, err := config.NewMultiline()

	assert.NoError(t, err, "Configuration was OK.")
}

// TestMultilineConfigError checks that NewMultiline reports an error for a
// pattern that cannot be compiled.
func TestMultilineConfigError(t *testing.T) {
	config := MultilineConfig{Pattern: "\xA0", MatchWhichLine: Previous}

	_, err := config.NewMultiline()

	assert.Error(t, err, "The pattern was invalid")
}

// TestMultilineConfigTimeoutSpecified checks that an explicitly configured
// timeout is kept by NewMultiline.
func TestMultilineConfigTimeoutSpecified(t *testing.T) {
	// A constant expression avoids parsing a string and discarding its error.
	duration := 10 * time.Second
	c := &MultilineConfig{
		Pattern:        ".*",
		MatchWhichLine: Previous,
		Timeout:        &internal.Duration{Duration: duration},
	}

	m, err := c.NewMultiline()
	if !assert.NoError(t, err, "Configuration was OK.") {
		// m is nil on error; stop before dereferencing it.
		return
	}

	assert.Equal(t, duration, m.config.Timeout.Duration)
}

// TestMultilineConfigDefaultTimeout checks that NewMultiline falls back to a
// 5 second timeout when none is configured.
func TestMultilineConfigDefaultTimeout(t *testing.T) {
	// A constant expression avoids parsing a string and discarding its error.
	duration := 5 * time.Second
	c := &MultilineConfig{
		Pattern:        ".*",
		MatchWhichLine: Previous,
	}

	m, err := c.NewMultiline()
	if !assert.NoError(t, err, "Configuration was OK.") {
		// m is nil on error; stop before dereferencing it.
		return
	}

	assert.Equal(t, duration, m.config.Timeout.Duration)
}

// TestMultilineIsEnabled checks that a configuration with a pattern yields an
// enabled Multiline.
func TestMultilineIsEnabled(t *testing.T) {
	c := &MultilineConfig{
		Pattern:        ".*",
		MatchWhichLine: Previous,
	}
	m, err := c.NewMultiline()
	if !assert.NoError(t, err, "Configuration was OK.") {
		// m is nil on error; calling IsEnabled on it would panic.
		return
	}

	isEnabled := m.IsEnabled()

	assert.True(t, isEnabled, "Should have been enabled")
}

// TestMultilineIsDisabled checks that a configuration without a pattern
// yields a disabled Multiline.
func TestMultilineIsDisabled(t *testing.T) {
	c := &MultilineConfig{
		MatchWhichLine: Previous,
	}
	m, err := c.NewMultiline()
	if !assert.NoError(t, err, "Configuration was OK.") {
		// m is nil on error; calling IsEnabled on it would panic.
		return
	}

	isEnabled := m.IsEnabled()

	assert.False(t, isEnabled, "Should have been disabled")
}

// TestMultilineFlushEmpty checks that flushing an empty buffer yields no text.
func TestMultilineFlushEmpty(t *testing.T) {
	config := MultilineConfig{Pattern: "^=>", MatchWhichLine: Previous}
	multiline, err := config.NewMultiline()
	assert.NoError(t, err, "Configuration was OK.")
	pending := new(bytes.Buffer)

	flushed := multiline.Flush(pending)

	assert.Empty(t, flushed)
}

// TestMultilineFlush checks that Flush returns the buffered text and leaves
// the buffer empty.
func TestMultilineFlush(t *testing.T) {
	config := MultilineConfig{Pattern: "^=>", MatchWhichLine: Previous}
	multiline, err := config.NewMultiline()
	assert.NoError(t, err, "Configuration was OK.")
	pending := new(bytes.Buffer)
	pending.WriteString("foo")

	flushed := multiline.Flush(pending)

	assert.Equal(t, "foo", flushed)
	assert.Zero(t, pending.Len())
}

// TestMultiLineProcessLinePrevious feeds a sequence of lines through
// ProcessLine in Previous mode and checks, after every line, both the text
// that is emitted and the exact content left in the buffer.
func TestMultiLineProcessLinePrevious(t *testing.T) {
	c := &MultilineConfig{
		Pattern:        "^=>",
		MatchWhichLine: Previous,
	}
	m, err := c.NewMultiline()
	if !assert.NoError(t, err, "Configuration was OK.") {
		// m is nil on error; stop before dereferencing it.
		return
	}
	var buffer bytes.Buffer

	steps := []struct {
		line         string // input line
		wantText     string // text ProcessLine must return
		wantBuffered string // buffer content after the call
	}{
		{line: "1", wantText: "", wantBuffered: "1"},
		{line: "=>2", wantText: "", wantBuffered: "1=>2"},
		{line: "=>3", wantText: "", wantBuffered: "1=>2=>3"},
		{line: "4", wantText: "1=>2=>3", wantBuffered: "4"},
		{line: "5", wantText: "4", wantBuffered: "5"},
	}

	for _, step := range steps {
		text := m.ProcessLine(step.line, &buffer)
		assert.Equal(t, step.wantText, text, "text returned for line %q", step.line)
		assert.Equal(t, step.wantBuffered, buffer.String(), "buffer after line %q", step.line)
	}
}

// TestMultiLineProcessLineNext feeds a sequence of lines through ProcessLine
// in Next mode and checks, after every line, both the text that is emitted
// and the exact content left in the buffer.
func TestMultiLineProcessLineNext(t *testing.T) {
	c := &MultilineConfig{
		Pattern:        "=>$",
		MatchWhichLine: Next,
	}
	m, err := c.NewMultiline()
	if !assert.NoError(t, err, "Configuration was OK.") {
		// m is nil on error; stop before dereferencing it.
		return
	}
	var buffer bytes.Buffer

	steps := []struct {
		line         string // input line
		wantText     string // text ProcessLine must return
		wantBuffered string // buffer content after the call
	}{
		{line: "1=>", wantText: "", wantBuffered: "1=>"},
		{line: "2=>", wantText: "", wantBuffered: "1=>2=>"},
		{line: "3=>", wantText: "", wantBuffered: "1=>2=>3=>"},
		{line: "4", wantText: "1=>2=>3=>4", wantBuffered: ""},
		{line: "5", wantText: "5", wantBuffered: ""},
	}

	for _, step := range steps {
		text := m.ProcessLine(step.line, &buffer)
		assert.Equal(t, step.wantText, text, "text returned for line %q", step.line)
		assert.Equal(t, step.wantBuffered, buffer.String(), "buffer after line %q", step.line)
	}
}

// TestMultiLineMatchStringWithInvertMatchFalse checks that, without
// inversion, matchString is true exactly for lines matching the pattern.
func TestMultiLineMatchStringWithInvertMatchFalse(t *testing.T) {
	c := &MultilineConfig{
		Pattern:        "=>$",
		MatchWhichLine: Next,
		InvertMatch:    false,
	}
	m, err := c.NewMultiline()
	if !assert.NoError(t, err, "Configuration was OK.") {
		// m is nil on error; matchString would dereference it.
		return
	}

	matches1 := m.matchString("t=>")
	matches2 := m.matchString("t")

	assert.True(t, matches1)
	assert.False(t, matches2)
}

// TestMultiLineMatchStringWithInvertTrue checks that, with inversion,
// matchString is true exactly for lines that do not match the pattern.
func TestMultiLineMatchStringWithInvertTrue(t *testing.T) {
	c := &MultilineConfig{
		Pattern:        "=>$",
		MatchWhichLine: Next,
		InvertMatch:    true,
	}
	m, err := c.NewMultiline()
	if !assert.NoError(t, err, "Configuration was OK.") {
		// m is nil on error; matchString would dereference it.
		return
	}

	matches1 := m.matchString("t=>")
	matches2 := m.matchString("t")

	assert.False(t, matches1)
	assert.True(t, matches2)
}

// TestMultilineWhat checks UnmarshalTOML for every accepted spelling of
// "previous" and "next" and for an unknown value.
func TestMultilineWhat(t *testing.T) {
	valid := []struct {
		input string
		want  MultilineMatchWhichLine
	}{
		{input: `"previous"`, want: Previous},
		{input: `previous`, want: Previous},
		{input: `'previous'`, want: Previous},
		{input: `"next"`, want: Next},
		{input: `next`, want: Next},
		{input: `'next'`, want: Next},
	}

	for _, tc := range valid {
		// Start from the invalid marker: the zero value equals Previous, so a
		// zero-initialised variable would pass even if nothing were assigned.
		got := MultilineMatchWhichLine(-1)
		err := got.UnmarshalTOML([]byte(tc.input))
		assert.NoError(t, err, "input %s", tc.input)
		assert.Equal(t, tc.want, got, "input %s", tc.input)
	}

	var unknown MultilineMatchWhichLine
	err := unknown.UnmarshalTOML([]byte(`nope`))
	assert.Equal(t, MultilineMatchWhichLine(-1), unknown)
	assert.Error(t, err)
}
Loading

0 comments on commit d1a30a8

Please sign in to comment.