Skip to content

Commit

Permalink
Improve syslog connection handling
Browse files Browse the repository at this point in the history
Resolves #4335
  • Loading branch information
glinton committed Jul 2, 2018
1 parent 73e2e6a commit 839ca60
Showing 1 changed file with 46 additions and 12 deletions.
58 changes: 46 additions & 12 deletions plugins/inputs/syslog/syslog.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package syslog

import (
"bytes"
"crypto/tls"
"fmt"
"io"
Expand Down Expand Up @@ -279,20 +280,53 @@ func (s *Syslog) handle(conn net.Conn, acc telegraf.Accumulator) {
conn.Close()
}()

if s.ReadTimeout != nil && s.ReadTimeout.Duration > 0 {
conn.SetReadDeadline(time.Now().Add(s.ReadTimeout.Duration))
}
zero := time.Time{}
for {
// make a temporary bytes var to read from the connection
tmp := make([]byte, 128)
// make 0 length data bytes (since we'll be appending)
data := make([]byte, 0)

// loop through the connection stream, appending tmp to data
for {
if s.ReadTimeout != nil && s.ReadTimeout.Duration > 0 {
conn.SetReadDeadline(time.Now().Add(s.ReadTimeout.Duration))
}

var p *rfc5425.Parser
if s.BestEffort {
p = rfc5425.NewParser(conn, rfc5425.WithBestEffort())
} else {
p = rfc5425.NewParser(conn)
}
// read to the tmp var
n, err := conn.Read(tmp)
if err != nil {
// Ignore known/recoverable errors. In contrived tests:
// * i/o timeout error - no data to Read() before s.ReadTimeout.Duration expired
// * EOF error - connection open/close immediately
if er, ok := err.(net.Error); err != io.EOF && (ok && !er.Timeout()) {
s.store(rfc5425.Result{Error: fmt.Errorf("Failed reading from syslog client - %s", err.Error())}, acc)
}
return
}

// append read data to full data
data = append(data, tmp[:n]...)

p.ParseExecuting(func(r *rfc5425.Result) {
s.store(*r, acc)
})
// break if ends with '\n' (todo: need to ensure writing w/o "\n" works)
if tmp[n-1] == '\n' { //|| tmp[n-1] == 'EOF' {
break
}
}

conn.SetReadDeadline(zero)

var p *rfc5425.Parser
if s.BestEffort {
p = rfc5425.NewParser(bytes.NewReader(data), rfc5425.WithBestEffort())
} else {
p = rfc5425.NewParser(bytes.NewReader(data))
}

p.ParseExecuting(func(r *rfc5425.Result) {
s.store(*r, acc)
})
}
}

func (s *Syslog) setKeepAlive(c *net.TCPConn) error {
Expand Down

0 comments on commit 839ca60

Please sign in to comment.