Skip to content

Commit

Permalink
Update: Syslog address option must contain protocol now
Browse files Browse the repository at this point in the history
  • Loading branch information
leodido committed May 24, 2018
1 parent dd6c6c0 commit 6ca6e91
Show file tree
Hide file tree
Showing 5 changed files with 95 additions and 47 deletions.
14 changes: 4 additions & 10 deletions plugins/inputs/syslog/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,17 +10,11 @@ This plugin listens for syslog messages following RFC5424 format. When received

```toml
[[inputs.syslog]]
## Specify an ip or hostname with port - eg., localhost:6514, 10.0.0.1:6514
## Address and port to host the syslog receiver.
## If no server is specified, then localhost is used as the host.
## Specify an ip or hostname with port - eg., tcp://localhost:6514, tcp://10.0.0.1:6514
## Protocol, address and port to host the syslog receiver.
## If no host is specified, then localhost is used.
## If no port is specified, 6514 is used (RFC5425#section-4.1).
server = ":6514"

## Protocol (default = tcp)
## Should be one of the following values:
## tcp, tcp4, tcp6, unix, unixpacket, udp, udp4, udp6, ip, ip4, ip6, unixgram.
## Otherwise forced to the default.
# protocol = "tcp"
server = "tcp://:6514"

## TLS Config
# tls_allowed_cacerts = ["/etc/telegraf/ca.pem"]
Expand Down
7 changes: 1 addition & 6 deletions plugins/inputs/syslog/rfc5425_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,6 @@ import (
"github.com/stretchr/testify/require"
)

const (
address = ":6514"
)

var (
pki = testutil.NewPKI("../../../testutil/pki")
)
Expand Down Expand Up @@ -352,8 +348,7 @@ func newTCPSyslogReceiver(keepAlive *internal.Duration, maxConn int, bestEffort
Duration: defaultReadTimeout,
}
s := &Syslog{
Protocol: "tcp",
Address: address,
Address: "tcp://" + address,
now: func() time.Time {
return defaultTime
},
Expand Down
6 changes: 2 additions & 4 deletions plugins/inputs/syslog/rfc5426_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,8 +205,7 @@ func getTestCasesForRFC5426() []testCase5426 {

func newUDPSyslogReceiver(bestEffort bool) *Syslog {
return &Syslog{
Protocol: "udp",
Address: address,
Address: "udp://" + address,
now: func() time.Time {
return defaultTime
},
Expand All @@ -220,7 +219,6 @@ func testRFC5426(t *testing.T, bestEffort bool) {
t.Run(tc.name, func(t *testing.T) {
// Create receiver
receiver := newUDPSyslogReceiver(bestEffort)
require.Equal(t, receiver.Protocol, "udp")
acc := &testutil.Accumulator{}
require.NoError(t, receiver.Start(acc))
defer receiver.Stop()
Expand All @@ -231,8 +229,8 @@ func testRFC5426(t *testing.T, bestEffort bool) {

// Connect
conn, err := net.Dial("udp", address)
defer conn.Close()
require.NotNil(t, conn)
defer conn.Close()
require.Nil(t, err)

// Write
Expand Down
74 changes: 51 additions & 23 deletions plugins/inputs/syslog/syslog.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"io"
"net"
"net/url"
"os"
"strings"
"sync"
Expand All @@ -23,9 +24,8 @@ const ipMaxPacketSize = 64 * 1024

// Syslog is a syslog plugin
type Syslog struct {
Address string `toml:"server"`
Protocol string
tlsConfig.ServerConfig
Address string `toml:"server"`
KeepAlivePeriod *internal.Duration
ReadTimeout *internal.Duration
MaxConnections int
Expand All @@ -38,7 +38,7 @@ type Syslog struct {
wg sync.WaitGroup
io.Closer

isTCP bool
isStream bool
tcpListener net.Listener
tlsConfig *tls.Config
connections map[string]net.Conn
Expand All @@ -48,17 +48,11 @@ type Syslog struct {
}

var sampleConfig = `
## Specify an ip or hostname with port - eg., localhost:6514, 10.0.0.1:6514
## Address and port to host the syslog receiver.
## If no server is specified, then localhost is used as the host.
## Specify an ip or hostname with port - eg., tcp://localhost:6514, tcp://10.0.0.1:6514
## Protocol, address and port to host the syslog receiver.
## If no host is specified, then localhost is used.
## If no port is specified, 6514 is used (RFC5425#section-4.1).
server = ":6514"
## Protocol (default = tcp)
## Should be one of the following values:
## tcp, tcp4, tcp6, unix, unixpacket, udp, udp4, udp6, ip, ip4, ip6, unixgram.
## Otherwise forced to the default.
# protocol = "tcp"
server = "tcp://:6514"
## TLS Config
# tls_allowed_cacerts = ["/etc/telegraf/ca.pem"]
Expand Down Expand Up @@ -112,22 +106,27 @@ func (s *Syslog) Start(acc telegraf.Accumulator) error {
s.mu.Lock()
defer s.mu.Unlock()

switch s.Protocol {
scheme, host, err := getAddressParts(s.Address)
if err != nil {
return err
}
s.Address = host

switch scheme {
case "tcp", "tcp4", "tcp6", "unix", "unixpacket":
s.isTCP = true
s.isStream = true
case "udp", "udp4", "udp6", "ip", "ip4", "ip6", "unixgram":
s.isTCP = false
s.isStream = false
default:
s.Protocol = "tcp"
s.isTCP = true
return fmt.Errorf("unknown protocol '%s' in '%s'", scheme, s.Address)
}

if s.Protocol == "unix" || s.Protocol == "unixpacket" || s.Protocol == "unixgram" {
if scheme == "unix" || scheme == "unixpacket" || scheme == "unixgram" {
os.Remove(s.Address)
}

if s.isTCP {
l, err := net.Listen(s.Protocol, s.Address)
if s.isStream {
l, err := net.Listen(scheme, s.Address)
if err != nil {
return err
}
Expand All @@ -141,7 +140,7 @@ func (s *Syslog) Start(acc telegraf.Accumulator) error {
s.wg.Add(1)
go s.listenStream(acc)
} else {
l, err := net.ListenPacket(s.Protocol, s.Address)
l, err := net.ListenPacket(scheme, s.Address)
if err != nil {
return err
}
Expand All @@ -152,7 +151,7 @@ func (s *Syslog) Start(acc telegraf.Accumulator) error {
go s.listenPacket(acc)
}

if s.Protocol == "unix" || s.Protocol == "unixpacket" || s.Protocol == "unixgram" {
if scheme == "unix" || scheme == "unixpacket" || scheme == "unixgram" {
s.Closer = unixCloser{path: s.Address, closer: s.Closer}
}

Expand All @@ -170,6 +169,35 @@ func (s *Syslog) Stop() {
s.wg.Wait()
}

// getAddressParts returns the address scheme and host
// it also sets defaults for them when missing
// when the input address does not specify the protocol it returns an error
func getAddressParts(a string) (string, string, error) {
parts := strings.SplitN(a, "://", 2)
if len(parts) != 2 {
return "", "", fmt.Errorf("missing protocol within address '%s'", a)
}

u, _ := url.Parse(a)
switch u.Scheme {
case "unix", "unixpacket", "unixgram":
return parts[0], parts[1], nil
}

var host string
if u.Hostname() != "" {
host = u.Hostname()
}
host += ":"
if u.Port() == "" {
host += "6514"
} else {
host += u.Port()
}

return u.Scheme, host, nil
}

func (s *Syslog) listenPacket(acc telegraf.Accumulator) {
defer s.wg.Done()
b := make([]byte, ipMaxPacketSize)
Expand Down
41 changes: 37 additions & 4 deletions plugins/inputs/syslog/syslog_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,10 @@ import (
"github.com/stretchr/testify/require"
)

const (
address = ":6514"
)

var defaultTime = time.Unix(0, 0)
var maxP = uint8(191)
var maxV = uint16(999)
Expand All @@ -19,9 +23,38 @@ var maxPID = "abcdefghilmnopqrstuvzabcdefghilmnopqrstuvzabcdefghilmnopqrstuvzabc
var maxMID = "abcdefghilmnopqrstuvzabcdefghilm"
var message7681 = strings.Repeat("l", 7681)

func TestListenError(t *testing.T) {
receiver := &Syslog{
Address: "wrong address",
func TestAddress(t *testing.T) {
var err error
var rec *Syslog

rec = &Syslog{
Address: "localhost:6514",
}
err = rec.Start(&testutil.Accumulator{})
require.EqualError(t, err, "missing protocol within address 'localhost:6514'")
require.Error(t, err)

rec = &Syslog{
Address: "unsupported://example.com:6514",
}
err = rec.Start(&testutil.Accumulator{})
require.EqualError(t, err, "unknown protocol 'unsupported' in 'example.com:6514'")
require.Error(t, err)

rec = &Syslog{
Address: "unixgram:///tmp/telegraf.sock",
}
err = rec.Start(&testutil.Accumulator{})
require.NoError(t, err)
require.Equal(t, "/tmp/telegraf.sock", rec.Address)
rec.Stop()

// Default port is 6514
rec = &Syslog{
Address: "tcp://localhost",
}
require.Error(t, receiver.Start(&testutil.Accumulator{}))
err = rec.Start(&testutil.Accumulator{})
require.NoError(t, err)
require.Equal(t, "localhost:6514", rec.Address)
rec.Stop()
}

0 comments on commit 6ca6e91

Please sign in to comment.