forked from influxdata/telegraf
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Semigroupoid multiline (influxdata#8167) (influxdata#8190)
Co-authored-by: javicrespo <[email protected]> Co-authored-by: jcrespo <[email protected]> Co-authored-by: semigroupoid <[email protected]> (cherry picked from commit 382dac7)
- Loading branch information
Showing
7 changed files
with
980 additions
and
124 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,135 @@ | ||
package tail | ||
|
||
import ( | ||
"bytes" | ||
"fmt" | ||
"regexp" | ||
"strings" | ||
"time" | ||
|
||
"github.com/influxdata/telegraf/internal" | ||
) | ||
|
||
// Indicates relation to the multiline event: previous or next | ||
type MultilineMatchWhichLine int | ||
|
||
type Multiline struct { | ||
config *MultilineConfig | ||
enabled bool | ||
patternRegexp *regexp.Regexp | ||
} | ||
|
||
type MultilineConfig struct { | ||
Pattern string | ||
MatchWhichLine MultilineMatchWhichLine `toml:"match_which_line"` | ||
InvertMatch bool | ||
Timeout *internal.Duration | ||
} | ||
|
||
const ( | ||
// Previous => Append current line to previous line | ||
Previous MultilineMatchWhichLine = iota | ||
// Next => Next line will be appended to current line | ||
Next | ||
) | ||
|
||
func (m *MultilineConfig) NewMultiline() (*Multiline, error) { | ||
enabled := false | ||
var r *regexp.Regexp | ||
var err error | ||
|
||
if m.Pattern != "" { | ||
enabled = true | ||
if r, err = regexp.Compile(m.Pattern); err != nil { | ||
return nil, err | ||
} | ||
if m.Timeout == nil || m.Timeout.Duration.Nanoseconds() == int64(0) { | ||
m.Timeout = &internal.Duration{Duration: 5 * time.Second} | ||
} | ||
} | ||
|
||
return &Multiline{ | ||
config: m, | ||
enabled: enabled, | ||
patternRegexp: r}, nil | ||
} | ||
|
||
func (m *Multiline) IsEnabled() bool { | ||
return m.enabled | ||
} | ||
|
||
func (m *Multiline) ProcessLine(text string, buffer *bytes.Buffer) string { | ||
if m.matchString(text) { | ||
buffer.WriteString(text) | ||
return "" | ||
} | ||
|
||
if m.config.MatchWhichLine == Previous { | ||
previousText := buffer.String() | ||
buffer.Reset() | ||
buffer.WriteString(text) | ||
text = previousText | ||
} else { | ||
// Next | ||
if buffer.Len() > 0 { | ||
buffer.WriteString(text) | ||
text = buffer.String() | ||
buffer.Reset() | ||
} | ||
} | ||
|
||
return text | ||
} | ||
|
||
func (m *Multiline) Flush(buffer *bytes.Buffer) string { | ||
if buffer.Len() == 0 { | ||
return "" | ||
} | ||
text := buffer.String() | ||
buffer.Reset() | ||
return text | ||
} | ||
|
||
func (m *Multiline) matchString(text string) bool { | ||
return m.patternRegexp.MatchString(text) != m.config.InvertMatch | ||
} | ||
|
||
func (w MultilineMatchWhichLine) String() string { | ||
switch w { | ||
case Previous: | ||
return "previous" | ||
case Next: | ||
return "next" | ||
} | ||
return "" | ||
} | ||
|
||
// UnmarshalTOML implements ability to unmarshal MultilineMatchWhichLine from TOML files. | ||
func (w *MultilineMatchWhichLine) UnmarshalTOML(data []byte) (err error) { | ||
return w.UnmarshalText(data) | ||
} | ||
|
||
// UnmarshalText implements encoding.TextUnmarshaler | ||
func (w *MultilineMatchWhichLine) UnmarshalText(data []byte) (err error) { | ||
s := string(data) | ||
switch strings.ToUpper(s) { | ||
case `PREVIOUS`, `"PREVIOUS"`, `'PREVIOUS'`: | ||
*w = Previous | ||
return | ||
|
||
case `NEXT`, `"NEXT"`, `'NEXT'`: | ||
*w = Next | ||
return | ||
} | ||
*w = -1 | ||
return fmt.Errorf("E! [inputs.tail] unknown multiline MatchWhichLine") | ||
} | ||
|
||
// MarshalText implements encoding.TextMarshaler | ||
func (w MultilineMatchWhichLine) MarshalText() ([]byte, error) { | ||
s := w.String() | ||
if s != "" { | ||
return []byte(s), nil | ||
} | ||
return nil, fmt.Errorf("E! [inputs.tail] unknown multiline MatchWhichLine") | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,235 @@ | ||
package tail | ||
|
||
import ( | ||
"bytes" | ||
"testing" | ||
"time" | ||
|
||
"github.com/influxdata/telegraf/internal" | ||
"github.com/stretchr/testify/assert" | ||
) | ||
|
||
func TestMultilineConfigOK(t *testing.T) { | ||
c := &MultilineConfig{ | ||
Pattern: ".*", | ||
MatchWhichLine: Previous, | ||
} | ||
|
||
_, err := c.NewMultiline() | ||
|
||
assert.NoError(t, err, "Configuration was OK.") | ||
} | ||
|
||
func TestMultilineConfigError(t *testing.T) { | ||
c := &MultilineConfig{ | ||
Pattern: "\xA0", | ||
MatchWhichLine: Previous, | ||
} | ||
|
||
_, err := c.NewMultiline() | ||
|
||
assert.Error(t, err, "The pattern was invalid") | ||
} | ||
|
||
func TestMultilineConfigTimeoutSpecified(t *testing.T) { | ||
duration, _ := time.ParseDuration("10s") | ||
c := &MultilineConfig{ | ||
Pattern: ".*", | ||
MatchWhichLine: Previous, | ||
Timeout: &internal.Duration{Duration: duration}, | ||
} | ||
m, err := c.NewMultiline() | ||
assert.NoError(t, err, "Configuration was OK.") | ||
|
||
assert.Equal(t, duration, m.config.Timeout.Duration) | ||
} | ||
|
||
func TestMultilineConfigDefaultTimeout(t *testing.T) { | ||
duration, _ := time.ParseDuration("5s") | ||
c := &MultilineConfig{ | ||
Pattern: ".*", | ||
MatchWhichLine: Previous, | ||
} | ||
m, err := c.NewMultiline() | ||
assert.NoError(t, err, "Configuration was OK.") | ||
|
||
assert.Equal(t, duration, m.config.Timeout.Duration) | ||
} | ||
|
||
func TestMultilineIsEnabled(t *testing.T) { | ||
c := &MultilineConfig{ | ||
Pattern: ".*", | ||
MatchWhichLine: Previous, | ||
} | ||
m, err := c.NewMultiline() | ||
assert.NoError(t, err, "Configuration was OK.") | ||
|
||
isEnabled := m.IsEnabled() | ||
|
||
assert.True(t, isEnabled, "Should have been enabled") | ||
} | ||
|
||
func TestMultilineIsDisabled(t *testing.T) { | ||
c := &MultilineConfig{ | ||
MatchWhichLine: Previous, | ||
} | ||
m, err := c.NewMultiline() | ||
assert.NoError(t, err, "Configuration was OK.") | ||
|
||
isEnabled := m.IsEnabled() | ||
|
||
assert.False(t, isEnabled, "Should have been disabled") | ||
} | ||
|
||
func TestMultilineFlushEmpty(t *testing.T) { | ||
c := &MultilineConfig{ | ||
Pattern: "^=>", | ||
MatchWhichLine: Previous, | ||
} | ||
m, err := c.NewMultiline() | ||
assert.NoError(t, err, "Configuration was OK.") | ||
var buffer bytes.Buffer | ||
|
||
text := m.Flush(&buffer) | ||
|
||
assert.Empty(t, text) | ||
} | ||
|
||
func TestMultilineFlush(t *testing.T) { | ||
c := &MultilineConfig{ | ||
Pattern: "^=>", | ||
MatchWhichLine: Previous, | ||
} | ||
m, err := c.NewMultiline() | ||
assert.NoError(t, err, "Configuration was OK.") | ||
var buffer bytes.Buffer | ||
buffer.WriteString("foo") | ||
|
||
text := m.Flush(&buffer) | ||
|
||
assert.Equal(t, "foo", text) | ||
assert.Zero(t, buffer.Len()) | ||
} | ||
|
||
func TestMultiLineProcessLinePrevious(t *testing.T) { | ||
c := &MultilineConfig{ | ||
Pattern: "^=>", | ||
MatchWhichLine: Previous, | ||
} | ||
m, err := c.NewMultiline() | ||
assert.NoError(t, err, "Configuration was OK.") | ||
var buffer bytes.Buffer | ||
|
||
text := m.ProcessLine("1", &buffer) | ||
assert.Empty(t, text) | ||
assert.NotZero(t, buffer.Len()) | ||
|
||
text = m.ProcessLine("=>2", &buffer) | ||
assert.Empty(t, text) | ||
assert.NotZero(t, buffer.Len()) | ||
|
||
text = m.ProcessLine("=>3", &buffer) | ||
assert.Empty(t, text) | ||
assert.NotZero(t, buffer.Len()) | ||
|
||
text = m.ProcessLine("4", &buffer) | ||
assert.Equal(t, "1=>2=>3", text) | ||
assert.NotZero(t, buffer.Len()) | ||
|
||
text = m.ProcessLine("5", &buffer) | ||
assert.Equal(t, "4", text) | ||
assert.Equal(t, "5", buffer.String()) | ||
} | ||
|
||
func TestMultiLineProcessLineNext(t *testing.T) { | ||
c := &MultilineConfig{ | ||
Pattern: "=>$", | ||
MatchWhichLine: Next, | ||
} | ||
m, err := c.NewMultiline() | ||
assert.NoError(t, err, "Configuration was OK.") | ||
var buffer bytes.Buffer | ||
|
||
text := m.ProcessLine("1=>", &buffer) | ||
assert.Empty(t, text) | ||
assert.NotZero(t, buffer.Len()) | ||
|
||
text = m.ProcessLine("2=>", &buffer) | ||
assert.Empty(t, text) | ||
assert.NotZero(t, buffer.Len()) | ||
|
||
text = m.ProcessLine("3=>", &buffer) | ||
assert.Empty(t, text) | ||
assert.NotZero(t, buffer.Len()) | ||
|
||
text = m.ProcessLine("4", &buffer) | ||
assert.Equal(t, "1=>2=>3=>4", text) | ||
assert.Zero(t, buffer.Len()) | ||
|
||
text = m.ProcessLine("5", &buffer) | ||
assert.Equal(t, "5", text) | ||
assert.Zero(t, buffer.Len()) | ||
} | ||
|
||
func TestMultiLineMatchStringWithInvertMatchFalse(t *testing.T) { | ||
c := &MultilineConfig{ | ||
Pattern: "=>$", | ||
MatchWhichLine: Next, | ||
InvertMatch: false, | ||
} | ||
m, err := c.NewMultiline() | ||
assert.NoError(t, err, "Configuration was OK.") | ||
|
||
matches1 := m.matchString("t=>") | ||
matches2 := m.matchString("t") | ||
|
||
assert.True(t, matches1) | ||
assert.False(t, matches2) | ||
} | ||
|
||
func TestMultiLineMatchStringWithInvertTrue(t *testing.T) { | ||
c := &MultilineConfig{ | ||
Pattern: "=>$", | ||
MatchWhichLine: Next, | ||
InvertMatch: true, | ||
} | ||
m, err := c.NewMultiline() | ||
assert.NoError(t, err, "Configuration was OK.") | ||
|
||
matches1 := m.matchString("t=>") | ||
matches2 := m.matchString("t") | ||
|
||
assert.False(t, matches1) | ||
assert.True(t, matches2) | ||
} | ||
|
||
func TestMultilineWhat(t *testing.T) { | ||
var w1 MultilineMatchWhichLine | ||
w1.UnmarshalTOML([]byte(`"previous"`)) | ||
assert.Equal(t, Previous, w1) | ||
|
||
var w2 MultilineMatchWhichLine | ||
w2.UnmarshalTOML([]byte(`previous`)) | ||
assert.Equal(t, Previous, w2) | ||
|
||
var w3 MultilineMatchWhichLine | ||
w3.UnmarshalTOML([]byte(`'previous'`)) | ||
assert.Equal(t, Previous, w3) | ||
|
||
var w4 MultilineMatchWhichLine | ||
w4.UnmarshalTOML([]byte(`"next"`)) | ||
assert.Equal(t, Next, w4) | ||
|
||
var w5 MultilineMatchWhichLine | ||
w5.UnmarshalTOML([]byte(`next`)) | ||
assert.Equal(t, Next, w5) | ||
|
||
var w6 MultilineMatchWhichLine | ||
w6.UnmarshalTOML([]byte(`'next'`)) | ||
assert.Equal(t, Next, w6) | ||
|
||
var w7 MultilineMatchWhichLine | ||
err := w7.UnmarshalTOML([]byte(`nope`)) | ||
assert.Equal(t, MultilineMatchWhichLine(-1), w7) | ||
assert.Error(t, err) | ||
} |
Oops, something went wrong.