Skip to content

Commit

Permalink
New: Increment reception time when equal to the last one for syslog i…
Browse files Browse the repository at this point in the history
…nput plugin

Assumes monotonicity of incoming messages.
  • Loading branch information
leodido committed May 24, 2018
1 parent 6ca6e91 commit e49fa4f
Show file tree
Hide file tree
Showing 3 changed files with 133 additions and 6 deletions.
4 changes: 2 additions & 2 deletions plugins/inputs/syslog/rfc5425_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ func getTestCasesForRFC5425() []testCase5425 {
"severity": "warning",
"facility": "kern",
},
Time: defaultTime,
Time: defaultTime.Add(time.Nanosecond),
},
},
wantBestEffort: []testutil.Metric{
Expand Down Expand Up @@ -135,7 +135,7 @@ func getTestCasesForRFC5425() []testCase5425 {
"severity": "warning",
"facility": "kern",
},
Time: defaultTime,
Time: defaultTime.Add(time.Nanosecond),
},
},
},
Expand Down
113 changes: 113 additions & 0 deletions plugins/inputs/syslog/rfc5426_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,3 +270,116 @@ func TestBestEffort_udp(t *testing.T) {
func TestStrict_udp(t *testing.T) {
testRFC5426(t, false)
}

func TestTimeIncrement_udp(t *testing.T) {
i := 0
getNow := func() time.Time {
if i%2 == 0 {
return time.Unix(1, 0)
}
return time.Unix(1, 1)
}

// Create receiver
receiver := &Syslog{
Address: "udp://" + address,
now: getNow,
BestEffort: false,
Separator: "_",
}
acc := &testutil.Accumulator{}
require.NoError(t, receiver.Start(acc))
defer receiver.Stop()

// Connect
conn, err := net.Dial("udp", address)
require.NotNil(t, conn)
defer conn.Close()
require.Nil(t, err)

// Write
_, e := conn.Write([]byte("<1>1 - - - - - -"))
require.Nil(t, e)

// Wait
acc.Wait(1)

want := &testutil.Metric{
Measurement: "syslog",
Fields: map[string]interface{}{
"version": uint16(1),
"facility_code": 0,
"severity_code": 1,
},
Tags: map[string]string{
"severity": "alert",
"facility": "kern",
},
Time: getNow(),
}

if !cmp.Equal(want, acc.Metrics[0]) {
t.Fatalf("Got (+) / Want (-)\n %s", cmp.Diff(want, acc.Metrics[0]))
}

// New one with different time
i++

// Clear
acc.ClearMetrics()

// Write
_, e = conn.Write([]byte("<1>1 - - - - - -"))
require.Nil(t, e)

// Wait
acc.Wait(1)

want = &testutil.Metric{
Measurement: "syslog",
Fields: map[string]interface{}{
"version": uint16(1),
"facility_code": 0,
"severity_code": 1,
},
Tags: map[string]string{
"severity": "alert",
"facility": "kern",
},
Time: getNow(),
}

if !cmp.Equal(want, acc.Metrics[0]) {
t.Fatalf("Got (+) / Want (-)\n %s", cmp.Diff(want, acc.Metrics[0]))
}

// New one with same time as previous one

// Clear
acc.ClearMetrics()

// Write
_, e = conn.Write([]byte("<1>1 - - - - - -"))
require.Nil(t, e)

// Wait
acc.Wait(1)

want = &testutil.Metric{
Measurement: "syslog",
Fields: map[string]interface{}{
"version": uint16(1),
"facility_code": 0,
"severity_code": 1,
},
Tags: map[string]string{
"severity": "alert",
"facility": "kern",
},
Time: getNow().Add(time.Nanosecond),
}

if !cmp.Equal(want, acc.Metrics[0]) {
t.Fatalf("Got (+) / Want (-)\n %s", cmp.Diff(want, acc.Metrics[0]))
}
}
22 changes: 18 additions & 4 deletions plugins/inputs/syslog/syslog.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ type Syslog struct {
BestEffort bool
Separator string `toml:"sdparam_separator"`

now func() time.Time
now func() time.Time
lastTime time.Time

mu sync.Mutex
wg sync.WaitGroup
Expand Down Expand Up @@ -217,7 +218,7 @@ func (s *Syslog) listenPacket(acc telegraf.Accumulator) {

message, err := p.Parse(b[:n], &s.BestEffort)
if message != nil {
acc.AddFields("syslog", fields(*message, s), tags(*message), s.now())
acc.AddFields("syslog", fields(*message, s), tags(*message), s.time())
}
if err != nil {
acc.AddError(err)
Expand Down Expand Up @@ -317,7 +318,7 @@ func (s *Syslog) store(res rfc5425.Result, acc telegraf.Accumulator) {
}
if res.Message != nil {
msg := *res.Message
acc.AddFields("syslog", fields(msg, s), tags(msg), s.now())
acc.AddFields("syslog", fields(msg, s), tags(msg), s.time())
}
}

Expand Down Expand Up @@ -391,10 +392,23 @@ func (uc unixCloser) Close() error {
return err
}

func (s *Syslog) time() time.Time {
t := s.now()
if t == s.lastTime {
t = t.Add(time.Nanosecond)
}
s.lastTime = t
return t
}

func getNanoNow() time.Time {
return time.Unix(0, time.Now().UnixNano())
}

func init() {
receiver := &Syslog{
Address: ":6514",
now: time.Now,
now: getNanoNow,
ReadTimeout: &internal.Duration{
Duration: defaultReadTimeout,
},
Expand Down

0 comments on commit e49fa4f

Please sign in to comment.