From c1b7de84b1e59d4d42550342f03d377895e07157 Mon Sep 17 00:00:00 2001 From: Leonardo Di Donato Date: Tue, 15 May 2018 18:22:00 +0100 Subject: [PATCH 01/16] New: Syslog input plugin Introducing an input service plugin acting like a syslog receiver as specified by RFC5425 (TLS/TCP) or RFC5426 (UDP). --- plugins/inputs/syslog/syslog.go | 378 ++++++++++++++++++++++++++++++++ 1 file changed, 378 insertions(+) create mode 100644 plugins/inputs/syslog/syslog.go diff --git a/plugins/inputs/syslog/syslog.go b/plugins/inputs/syslog/syslog.go new file mode 100644 index 0000000000000..c6e44c0292a2e --- /dev/null +++ b/plugins/inputs/syslog/syslog.go @@ -0,0 +1,378 @@ +package syslog + +import ( + "crypto/tls" + "fmt" + "io" + "log" + "net" + "os" + "strconv" + "strings" + "sync" + "time" + + "github.com/influxdata/go-syslog/rfc5424" + "github.com/influxdata/go-syslog/rfc5425" + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/internal" + tlsConfig "github.com/influxdata/telegraf/internal/tls" + "github.com/influxdata/telegraf/plugins/inputs" +) + +const defaultReadTimeout = time.Millisecond * 500 +const ipMaxPacketSize = 64 * 1024 + +// Syslog is a syslog plugin +type Syslog struct { + Address string `toml:"server"` + Protocol string + tlsConfig.ServerConfig + KeepAlivePeriod *internal.Duration + ReadTimeout *internal.Duration + MaxConnections int + BestEffort bool + + now func() time.Time + + mu sync.Mutex + wg sync.WaitGroup + io.Closer + + isTCP bool + tcpListener net.Listener + tlsConfig *tls.Config + connections map[string]net.Conn + connectionsMu sync.Mutex + + udpListener net.PacketConn +} + +var sampleConfig = ` + ## Specify an ip or hostname with port - eg., localhost:6514, 10.0.0.1:6514 + ## Address and port to host the syslog receiver. + ## If no server is specified, then localhost is used as the host. + ## If no port is specified, 6514 is used (RFC5425#section-4.1). + server = ":6514" + + ## Protocol (default = tcp) + ## Should be one of the following values: + ## tcp, tcp4, tcp6, unix, unixpacket, udp, udp4, udp6, ip, ip4, ip6, unixgram. + ## Otherwise forced to the default. + # protocol = "tcp" + + ## TLS Config + # tls_allowed_cacerts = ["/etc/telegraf/ca.pem"] + # tls_cert = "/etc/telegraf/cert.pem" + # tls_key = "/etc/telegraf/key.pem" + + ## Period between keep alive probes. + ## 0 disables keep alive probes. + ## Defaults to the OS configuration. + ## Only applies to stream sockets (e.g. TCP). + # keep_alive_period = "5m" + + ## Maximum number of concurrent connections (default = 0). + ## 0 means unlimited. + ## Only applies to stream sockets (e.g. TCP). + # max_connections = 1024 + + ## Read timeout (default = 500ms). + ## 0 means unlimited. + ## Only applies to stream sockets (e.g. TCP). + read_timeout = 500ms + + ## Whether to parse in best effort mode or not (default = false). + ## By default best effort parsing is off. + # best_effort = false +` + +// SampleConfig returns sample configuration message +func (s *Syslog) SampleConfig() string { + return sampleConfig +} + +// Description returns the plugin description +func (s *Syslog) Description() string { + return "Influx syslog receiver as per RFC5425" +} + +// Gather ... +func (s *Syslog) Gather(_ telegraf.Accumulator) error { + return nil +} + +// Start starts the service. +func (s *Syslog) Start(acc telegraf.Accumulator) error { + s.mu.Lock() + defer s.mu.Unlock() + + // tags := map[string]string{ + // "address": s.Address, + // } + + switch s.Protocol { + case "tcp", "tcp4", "tcp6", "unix", "unixpacket": + s.isTCP = true + case "udp", "udp4", "udp6", "ip", "ip4", "ip6", "unixgram": + s.isTCP = false + default: + s.Protocol = "tcp" + s.isTCP = true + } + + if s.Protocol == "unix" || s.Protocol == "unixpacket" || s.Protocol == "unixgram" { + os.Remove(s.Address) + } + + if s.isTCP { + l, err := net.Listen(s.Protocol, s.Address) + if err != nil { + return err + } + s.Closer = l + s.tcpListener = l + if tlsConfig, _ := s.TLSConfig(); tlsConfig != nil { + s.tlsConfig = tlsConfig + } + + s.wg.Add(1) + go s.listenStream(acc) + } else { + l, err := net.ListenPacket(s.Protocol, s.Address) + if err != nil { + return err + } + s.Closer = l + s.udpListener = l + + s.wg.Add(1) + go s.listenPacket(acc) + } + + if s.Protocol == "unix" || s.Protocol == "unixpacket" || s.Protocol == "unixgram" { + s.Closer = unixCloser{path: s.Address, closer: s.Closer} + } + + log.Printf("I! Started syslog receiver at %s\n", s.Address) + return nil +} + +// Stop cleans up all resources +func (s *Syslog) Stop() { + s.mu.Lock() + defer s.mu.Unlock() + + if s.Closer != nil { + s.Close() + } + s.wg.Wait() + + log.Printf("I! Stopped syslog receiver at %s\n", s.Address) +} + +func (s *Syslog) listenPacket(acc telegraf.Accumulator) { + defer s.wg.Done() + b := make([]byte, ipMaxPacketSize) + for { + n, _, err := s.udpListener.ReadFrom(b) + if err != nil { + if !strings.HasSuffix(err.Error(), ": use of closed network connection") { + log.Println(err) + acc.AddError(err) + } + break + } + + if s.ReadTimeout != nil && s.ReadTimeout.Duration > 0 { + s.udpListener.SetReadDeadline(time.Now().Add(s.ReadTimeout.Duration)) + } + + p := rfc5424.NewParser() + mex, err := p.Parse(b[:n], &s.BestEffort) + if mex != nil { + acc.AddFields("syslog", fields(mex), tags(mex), s.now()) + } + if err != nil { + acc.AddError(err) + } + } +} + +func (s *Syslog) listenStream(acc telegraf.Accumulator) { + defer s.wg.Done() + + s.connections = map[string]net.Conn{} + + for { + conn, err := s.tcpListener.Accept() + if err != nil { + if !strings.HasSuffix(err.Error(), ": use of closed network connection") { + log.Println(err) + acc.AddError(err) + } + break + } + var tcpConn, _ = conn.(*net.TCPConn) + if s.tlsConfig != nil { + conn = tls.Server(conn, s.tlsConfig) + } + + s.connectionsMu.Lock() + if s.MaxConnections > 0 && len(s.connections) >= s.MaxConnections { + s.connectionsMu.Unlock() + conn.Close() + continue + } + s.connections[conn.RemoteAddr().String()] = conn + s.connectionsMu.Unlock() + + if err := s.setKeepAlive(tcpConn); err != nil { + acc.AddError(fmt.Errorf("unable to configure keep alive (%s): %s", s.Address, err)) + } + + go s.handle(conn, acc) + } + + s.connectionsMu.Lock() + for _, c := range s.connections { + c.Close() + } + s.connectionsMu.Unlock() +} + +func (s *Syslog) removeConnection(c net.Conn) { + s.connectionsMu.Lock() + delete(s.connections, c.RemoteAddr().String()) + s.connectionsMu.Unlock() +} + +func (s *Syslog) handle(conn net.Conn, acc telegraf.Accumulator) { + defer s.removeConnection(conn) + defer conn.Close() + + if s.ReadTimeout != nil && s.ReadTimeout.Duration > 0 { + conn.SetReadDeadline(time.Now().Add(s.ReadTimeout.Duration)) + } + + var p *rfc5425.Parser + if s.BestEffort { + p = rfc5425.NewParser(conn, rfc5425.WithBestEffort()) + } else { + p = rfc5425.NewParser(conn) + } + + p.ParseExecuting(func(r *rfc5425.Result) { + s.store(*r, acc) + }) +} + +func (s *Syslog) setKeepAlive(c *net.TCPConn) error { + if s.KeepAlivePeriod == nil { + return nil + } + + if s.KeepAlivePeriod.Duration == 0 { + return c.SetKeepAlive(false) + } + if err := c.SetKeepAlive(true); err != nil { + return err + } + return c.SetKeepAlivePeriod(s.KeepAlivePeriod.Duration) +} + +func (s *Syslog) store(res rfc5425.Result, acc telegraf.Accumulator) { + if res.Error != nil { + acc.AddError(res.Error) + } + if res.MessageError != nil { + acc.AddError(res.MessageError) + } + if res.Message != nil { + acc.AddFields("syslog", fields(res.Message), tags(res.Message), s.now()) + } +} + +func tags(msg *rfc5424.SyslogMessage) map[string]string { + ts := map[string]string{} + if lvl := msg.SeverityLevel(); lvl != nil { + ts["severity"] = strconv.Itoa(int(*msg.Severity())) + ts["severity_level"] = *lvl + } + + if f := msg.FacilityMessage(); f != nil { + ts["facility"] = strconv.Itoa(int(*msg.Facility())) + ts["facility_message"] = *f + } + + if msg.Hostname() != nil { + ts["hostname"] = *msg.Hostname() + } + + if msg.Appname() != nil { + ts["appname"] = *msg.Appname() + } + + return ts +} + +func fields(msg *rfc5424.SyslogMessage) map[string]interface{} { + flds := map[string]interface{}{ + "version": msg.Version(), + } + + if msg.Timestamp() != nil { + flds["timestamp"] = *msg.Timestamp() + } + + if msg.ProcID() != nil { + flds["procid"] = *msg.ProcID() + } + + if msg.MsgID() != nil { + flds["msgid"] = *msg.MsgID() + } + + if msg.Message() != nil { + flds["message"] = *msg.Message() + } + + if msg.StructuredData() != nil { + for sdid, sdparams := range *msg.StructuredData() { + if len(sdparams) == 0 { + // When SD-ID does not have params we indicate its presence with a bool + flds[sdid] = true + continue + } + for name, value := range sdparams { + // Using whitespace as separator since it is not allowed by the grammar within SDID + flds[sdid+" "+name] = value + } + } + } + + return flds +} + +type unixCloser struct { + path string + closer io.Closer +} + +func (uc unixCloser) Close() error { + err := uc.closer.Close() + os.Remove(uc.path) // ignore error + return err +} + +func init() { + receiver := &Syslog{ + Address: ":6514", + now: time.Now, + ReadTimeout: &internal.Duration{ + Duration: defaultReadTimeout, + }, + } + + inputs.Add("syslog", func() telegraf.Input { return receiver }) +} From c802954b909160575133e9f1e8abd589e57177b6 Mon Sep 17 00:00:00 2001 From: Leonardo Di Donato Date: Wed, 16 May 2018 19:02:00 +0100 Subject: [PATCH 02/16] New: Setup test suite utilities for syslog input plugin --- plugins/inputs/syslog/syslog_test.go | 67 ++++++++++++++++++++++++++++ 1 file changed, 67 insertions(+) create mode 100644 plugins/inputs/syslog/syslog_test.go diff --git a/plugins/inputs/syslog/syslog_test.go b/plugins/inputs/syslog/syslog_test.go new file mode 100644 index 0000000000000..3756691804565 --- /dev/null +++ b/plugins/inputs/syslog/syslog_test.go @@ -0,0 +1,67 @@ +package syslog + +import ( + "math/rand" + "testing" + "time" + + "github.com/influxdata/telegraf/testutil" + "github.com/stretchr/testify/require" +) + +var defaultTime = time.Unix(0, 0) + +var ( + maxP uint8 + maxV uint16 + maxTS string + maxH string + maxA string + maxPID string + maxMID string + message7681 string +) + +func TestListenError(t *testing.T) { + receiver := &Syslog{ + Address: "wrong address", + } + require.Error(t, receiver.Start(&testutil.Accumulator{})) +} + +func getRandomString(n int) string { + const ( + letterBytes = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ" + letterIdxBits = 6 // 6 bits to represent a letter index + letterIdxMask = 1<= 0; { + if remain == 0 { + cache, remain = src.Int63(), letterIdxMax + } + if idx := int(cache & letterIdxMask); idx < len(letterBytes) { + b[i] = letterBytes[idx] + i-- + } + cache >>= letterIdxBits + remain-- + } + + return string(b) +} + +func init() { + maxP = uint8(191) + maxV = uint16(999) + maxTS = "2017-12-31T23:59:59.999999+00:00" + maxH = "abcdefghilmnopqrstuvzabcdefghilmnopqrstuvzabcdefghilmnopqrstuvzabcdefghilmnopqrstuvzabcdefghilmnopqrstuvzabcdefghilmnopqrstuvzabcdefghilmnopqrstuvzabcdefghilmnopqrstuvzabcdefghilmnopqrstuvzabcdefghilmnopqrstuvzabcdefghilmnopqrstuvzabcdefghilmnopqrstuvzabc" + maxA = "abcdefghilmnopqrstuvzabcdefghilmnopqrstuvzabcdef" + maxPID = "abcdefghilmnopqrstuvzabcdefghilmnopqrstuvzabcdefghilmnopqrstuvzabcdefghilmnopqrstuvzabcdefghilmnopqrstuvzabcdefghilmnopqrstuvzab" + maxMID = "abcdefghilmnopqrstuvzabcdefghilm" + message7681 = getRandomString(7681) +} From 1507ea4fcc112a94ec8b9abadc3fbd6fe283af43 Mon Sep 17 00:00:00 2001 From: Leonardo Di Donato Date: Thu, 17 May 2018 15:42:00 +0100 Subject: [PATCH 03/16] Docs: README of syslog input plugin --- plugins/inputs/syslog/README.md | 110 ++++++++++++++++++++++++++++++++ 1 file changed, 110 insertions(+) create mode 100644 plugins/inputs/syslog/README.md diff --git a/plugins/inputs/syslog/README.md b/plugins/inputs/syslog/README.md new file mode 100644 index 0000000000000..5944039601763 --- /dev/null +++ b/plugins/inputs/syslog/README.md @@ -0,0 +1,110 @@ +# syslog input plugin + +Collects syslog messages as per RFC5425 or RFC5426. + +It can act as a syslog transport receiver over TLS (or TCP) - ie., RFC5425 - or over UDP - ie., RFC5426. + +This plugin listens for syslog messages following RFC5424 format. When received it parses them extracting metrics. + +### Configuration: + +#### TCP + +The minimal configuration is the following: + +```toml +[[inputs.syslog]] + address = ":6514" +``` + +This starts this plugins as a syslog receiver over TCP protocol on port 6514. + +#### TLS + +To configure it as a TLS syslog receiver as recommended by RFC5425 give it the following configuration: + +```toml +[[inputs.syslog]] + address = ":6514" + tls_cacert = "/etc/telegraf/ca.pem" + tls_cert = "/etc/telegraf/cert.pem" + tls_key = "/etc/telegraf/key.pem" +``` + +#### UDP + +To configure this plugin as per RFC5426 give it the following configuration: + +```toml +[[inputs.syslog]] + protocol = "udp" + address = ":6514" +``` + +#### Other configs + +Other available configurations are: + +- `keep_alive_period`, `max_connections` for stream sockets +- `best_effort` to tell the parser to work until it is able to do and extract partial but valid info + +### Metrics + +- syslog + - fields + - **version** (`uint16`) + - timestamp (`time.Time`) + - procid (`string`) + - msgid (`string`) + - _structureddata element id_ (`bool`) + - _structureddata element parameter name_ (`string`) + - tags + - **severity** (`string`) + - **severity_level** (`string`) + - **facility** (`string`) + - **facility_message** (`string`) + - hostname (`string`) + - appname (`string`) + +The name of fields in _italic_ corresponds to their runtime value. + +The fields/tags which name is in **bold** will always be present when a valid Syslog message has been received. + +### Syslog transport sender + +The following instructions illustrate how to configure a syslog transport sender as per RFC5425 - ie., using the octect framing technique. + +Install `rsyslog`. + +Give it a configuration - ie., `/etc/rsyslog.conf`. + +``` +$ModLoad imuxsock # provides support for local system logging +$ModLoad imklog # provides kernel logging support +$ModLoad immark # provides heart-beat logs +$FileOwner root +$FileGroup root +$FileCreateMode 0640 +$DirCreateMode 0755 +$Umask 0022 +$WorkDirectory /var/spool/rsyslog # default location for work (spool) files +$ActionQueueType LinkedList # use asynchronous processing +$ActionQueueFileName srvrfwd # set file name, also enables disk mode +$ActionResumeRetryCount -1 # infinite retries on insert failure +$ActionQueueSaveOnShutdown on # save in-memory data if rsyslog shuts down +$IncludeConfig /etc/rsyslog.d/*.conf +``` + +Specify you want the octet framing technique enabled and the format of each syslog message to follow the RFC5424. + +Create a file - eg., `/etc/rsyslog.d/50-default.conf` - containing: + +``` +*.* @@(o)127.0.0.1:6514;RSYSLOG_SyslogProtocol23Format +``` + +To complete the TLS setup please refer to [rsyslog docs](https://www.rsyslog.com/doc/v8-stable/tutorials/tls.html). + +Notice that this configuration tells `rsyslog` to broadcast messages to `127.0.0.1>6514`. + +So you have to configure this plugin accordingly. \ No newline at end of file From f17171e7404f83f47bf6c8bfee5fb60587f433cd Mon Sep 17 00:00:00 2001 From: Leonardo Di Donato Date: Sat, 19 May 2018 12:42:00 +0100 Subject: [PATCH 04/16] Update: Testing syslog input plugin listening on TLS/TCP --- plugins/inputs/syslog/rfc5425_test.go | 657 ++++++++++++++++++++++++++ 1 file changed, 657 insertions(+) create mode 100644 plugins/inputs/syslog/rfc5425_test.go diff --git a/plugins/inputs/syslog/rfc5425_test.go b/plugins/inputs/syslog/rfc5425_test.go new file mode 100644 index 0000000000000..761f3817492d3 --- /dev/null +++ b/plugins/inputs/syslog/rfc5425_test.go @@ -0,0 +1,657 @@ +package syslog + +import ( + "bytes" + "crypto/tls" + "crypto/x509" + "fmt" + "io" + "io/ioutil" + "log" + "net" + "sync" + "testing" + "time" + + "github.com/google/go-cmp/cmp" + "github.com/influxdata/telegraf/internal" + "github.com/influxdata/telegraf/testutil" + "github.com/stretchr/testify/require" +) + +const ( + serviceRootPEM = `-----BEGIN CERTIFICATE----- +MIIBxzCCATCgAwIBAgIJAJb7HqN2BzWWMA0GCSqGSIb3DQEBCwUAMBYxFDASBgNV +BAMMC1RlbGVncmFmIENBMB4XDTE3MTEwNDA0MzEwN1oXDTI3MTEwMjA0MzEwN1ow +FjEUMBIGA1UEAwwLVGVsZWdyYWYgQ0EwgZ8wDQYJKoZIhvcNAQEBBQADgY0AMIGJ +AoGBANbkUkK6JQC3rbLcXhLJTS9SX6uXyFwl7bUfpAN5Hm5EqfvG3PnLrogfTGLr +Tq5CRAu/gbbdcMoL9TLv/aaDVnrpV0FslKhqYmkOgT28bdmA7Qtr539aQpMKCfcW +WCnoMcBD5u5h9MsRqpdq+0Mjlsf1H2hSf07jHk5R1T4l8RMXAgMBAAGjHTAbMAwG +A1UdEwQFMAMBAf8wCwYDVR0PBAQDAgEGMA0GCSqGSIb3DQEBCwUAA4GBANSrwvpU +t8ihIhpHqgJZ34DM92CZZ3ZHmH/KyqlnuGzjjpnVZiXVrLDTOzrA0ziVhmefY29w +roHjENbFm54HW97ogxeURuO8HRHIVh2U0rkyVxOfGZiUdINHqsZdSnDY07bzCtSr +Z/KsfWXM5llD1Ig1FyBHpKjyUvfzr73sjm/4 +-----END CERTIFICATE-----` + serviceCertPEM = `-----BEGIN CERTIFICATE----- +MIIBzzCCATigAwIBAgIBATANBgkqhkiG9w0BAQsFADAWMRQwEgYDVQQDDAtUZWxl +Z3JhZiBDQTAeFw0xNzExMDQwNDMxMDdaFw0yNzExMDIwNDMxMDdaMBQxEjAQBgNV +BAMMCWxvY2FsaG9zdDCBnzANBgkqhkiG9w0BAQEFAAOBjQAwgYkCgYEAsJRss1af +XKrcIjQoAp2kdJIpT2Ya+MRQXJ18b0PP7szh2lisY11kd/HCkd4D4efuIkpszHaN +xwyTOZLOoplxp6fizzgOYjXsJ6SzbO1MQNmq8Ch/+uKiGgFwLX+YxOOsGSDIHNhF +vcBi93cQtCWPBFz6QRQf9yfIAA5KKxUfJcMCAwEAAaMvMC0wCQYDVR0TBAIwADAL +BgNVHQ8EBAMCBSAwEwYDVR0lBAwwCgYIKwYBBQUHAwEwDQYJKoZIhvcNAQELBQAD +gYEAiC3WI4y9vfYz53gw7FKnNK7BBdwRc43x7Pd+5J/cclWyUZPdmcj1UNmv/3rj +2qcMmX06UdgPoHppzNAJePvMVk0vjMBUe9MmYlafMz0h4ma/it5iuldXwmejFcdL +6wWQp7gVTileCEmq9sNvfQN1FmT3EWf4IMdO2MNat/1If0g= +-----END CERTIFICATE-----` + serviceKeyPEM = `-----BEGIN RSA PRIVATE KEY----- +MIICXQIBAAKBgQCwlGyzVp9cqtwiNCgCnaR0kilPZhr4xFBcnXxvQ8/uzOHaWKxj +XWR38cKR3gPh5+4iSmzMdo3HDJM5ks6imXGnp+LPOA5iNewnpLNs7UxA2arwKH/6 +4qIaAXAtf5jE46wZIMgc2EW9wGL3dxC0JY8EXPpBFB/3J8gADkorFR8lwwIDAQAB +AoGBAJaFHxfMmjHK77U0UnrQWFSKFy64cftmlL4t/Nl3q7L68PdIKULWZIMeEWZ4 +I0UZiFOwr4em83oejQ1ByGSwekEuiWaKUI85IaHfcbt+ogp9hY/XbOEo56OPQUAd +bEZv1JqJOqta9Ug1/E1P9LjEEyZ5F5ubx7813rxAE31qKtKJAkEA1zaMlCWIr+Rj +hGvzv5rlHH3wbOB4kQFXO4nqj3J/ttzR5QiJW24STMDcbNngFlVcDVju56LrNTiD +dPh9qvl7nwJBANILguR4u33OMksEZTYB7nQZSurqXsq6382zH7pTl29ANQTROHaM +PKC8dnDWq8RGTqKuvWblIzzGIKqIMovZo10CQC96T0UXirITFolOL3XjvAuvFO1Q +EAkdXJs77805m0dCK+P1IChVfiAEpBw3bKJArpAbQIlFfdI953JUp5SieU0CQEub +BSSEKMjh/cxu6peEHnb/262vayuCFKkQPu1sxWewLuVrAe36EKCy9dcsDmv5+rgo +Odjdxc9Madm4aKlaT6kCQQCpAgeblDrrxTrNQ+Typzo37PlnQrvI+0EceAUuJ72G +P0a+YZUeHNRqT2pPN9lMTAZGGi3CtcF2XScbLNEBeXge +-----END RSA PRIVATE KEY-----` + clientRootPEM = serviceRootPEM + clientCertPEM = `-----BEGIN CERTIFICATE----- +MIIBzjCCATegAwIBAgIBAjANBgkqhkiG9w0BAQsFADAWMRQwEgYDVQQDDAtUZWxl +Z3JhZiBDQTAeFw0xNzExMDQwNDMxMDdaFw0yNzExMDIwNDMxMDdaMBMxETAPBgNV +BAMMCHRlbGVncmFmMIGfMA0GCSqGSIb3DQEBAQUAA4GNADCBiQKBgQDP2IMqyOqI +sJjwBprrz8WPzmlrpyYikQ4XSCSJB3DSTIO+igqMpBUTj3vLlOzsHfVVot1WRqc6 +3esM4JE92rc6S73xi4g8L/r8cPIHW4hvFJdMti4UkJBWim8ArSbFqnZjcR19G3tG +LUOiXAUG3nWzMzoEsPruvV1dkKRbJVE4MwIDAQABoy8wLTAJBgNVHRMEAjAAMAsG +A1UdDwQEAwIHgDATBgNVHSUEDDAKBggrBgEFBQcDAjANBgkqhkiG9w0BAQsFAAOB +gQCHxMk38XNxL9nPFBYo3JqITJCFswu6/NLHwDBXCuZKl53rUuFWduiO+1OuScKQ +sQ79W0jHsWRKGOUFrF5/Gdnh8AlkVaITVlcmhdAOFCEbeGpeEvLuuK6grckPitxy +bRF5oM4TCLKKAha60Ir41rk2bomZM9+NZu+Bm+csDqCoxQ== +-----END CERTIFICATE-----` + clientKeyPEM = `-----BEGIN RSA PRIVATE KEY----- +MIICXAIBAAKBgQDP2IMqyOqIsJjwBprrz8WPzmlrpyYikQ4XSCSJB3DSTIO+igqM +pBUTj3vLlOzsHfVVot1WRqc63esM4JE92rc6S73xi4g8L/r8cPIHW4hvFJdMti4U +kJBWim8ArSbFqnZjcR19G3tGLUOiXAUG3nWzMzoEsPruvV1dkKRbJVE4MwIDAQAB +AoGAFzb/r4+xYoMXEfgq5ZvXXTCY5cVNpR6+jCsqqYODPnn9XRLeCsdo8z5bfWms +7NKLzHzca/6IPzL6Rf3vOxFq1YyIZfYVHH+d63/9blAm3Iajjp1W2yW5aj9BJjTb +nm6F0RfuW/SjrZ9IXxTZhSpCklPmUzVZpzvwV3KGeVTVCEECQQDoavCeOwLuqDpt +0aM9GMFUpOU7kLPDuicSwCDaTae4kN2rS17Zki41YXe8A8+509IEN7mK09Vq9HxY +SX6EmV1FAkEA5O9QcCHEa8P12EmUC8oqD2bjq6o7JjUIRlKinwZTlooMJYZw98gA +FVSngTUvLVCVIvSdjldXPOGgfYiccTZrFwJAfHS3gKOtAEuJbkEyHodhD4h1UB4+ +hPLr9Xh4ny2yQH0ilpV3px5GLEOTMFUCKUoqTiPg8VxaDjn5U/WXED5n2QJAR4J1 +NsFlcGACj+/TvacFYlA6N2nyFeokzoqLX28Ddxdh2erXqJ4hYIhT1ik9tkLggs2z +1T1084BquCuO6lIcOwJBALX4xChoMUF9k0IxSQzlz//seQYDkQNsE7y9IgAOXkzp +RaR4pzgPbnKj7atG+2dBnffWfE+1Mcy0INDAO6WxPg0= +-----END RSA PRIVATE KEY-----` + address = ":6514" +) + +var ( + initServiceCertFiles sync.Once + serviceCAFile string + serviceCertFile string + serviceKeyFile string +) + +type testCase5425 struct { + name string + data []byte + wantBestEffort []testutil.Metric + wantStrict []testutil.Metric + werr int // how many errors we expect in the strict mode? +} + +func getTestCasesForRFC5425() []testCase5425 { + testCases := []testCase5425{ + { + name: "1st/avg/ok", + data: []byte(`188 <29>1 2016-02-21T04:32:57+00:00 web1 someservice 2341 2 [origin][meta sequence="14125553" service="someservice"] "GET /v1/ok HTTP/1.1" 200 145 "-" "hacheck 0.9.0" 24306 127.0.0.1:40124 575`), + wantStrict: []testutil.Metric{ + testutil.Metric{ + Measurement: "syslog", + Fields: map[string]interface{}{ + "version": uint16(1), + "timestamp": time.Unix(1456029177, 0).UTC(), + "procid": "2341", + "msgid": "2", + "message": `"GET /v1/ok HTTP/1.1" 200 145 "-" "hacheck 0.9.0" 24306 127.0.0.1:40124 575`, + "origin": true, + "meta sequence": "14125553", + "meta service": "someservice", + }, + Tags: map[string]string{ + "severity": "5", + "severity_level": "notice", + "facility": "3", + "facility_message": "system daemons", + "hostname": "web1", + "appname": "someservice", + }, + Time: defaultTime, + }, + }, + wantBestEffort: []testutil.Metric{ + testutil.Metric{ + Measurement: "syslog", + Fields: map[string]interface{}{ + "version": uint16(1), + "timestamp": time.Unix(1456029177, 0).UTC(), + "procid": "2341", + "msgid": "2", + "message": `"GET /v1/ok HTTP/1.1" 200 145 "-" "hacheck 0.9.0" 24306 127.0.0.1:40124 575`, + "origin": true, + "meta sequence": "14125553", + "meta service": "someservice", + }, + Tags: map[string]string{ + "severity": "5", + "severity_level": "notice", + "facility": "3", + "facility_message": "system daemons", + "hostname": "web1", + "appname": "someservice", + }, + Time: defaultTime, + }, + }, + }, + { + name: "1st/min/ok//2nd/min/ok", + data: []byte("16 <1>2 - - - - - -17 <4>11 - - - - - -"), + wantStrict: []testutil.Metric{ + testutil.Metric{ + Measurement: "syslog", + Fields: map[string]interface{}{ + "version": uint16(2), + }, + Tags: map[string]string{ + "severity": "1", + "severity_level": "alert", + "facility": "0", + "facility_message": "kernel messages", + }, + Time: defaultTime, + }, + testutil.Metric{ + Measurement: "syslog", + Fields: map[string]interface{}{ + "version": uint16(11), + }, + Tags: map[string]string{ + "severity": "4", + "severity_level": "warning", + "facility": "0", + "facility_message": "kernel messages", + }, + Time: defaultTime, + }, + }, + wantBestEffort: []testutil.Metric{ + testutil.Metric{ + Measurement: "syslog", + Fields: map[string]interface{}{ + "version": uint16(2), + }, + Tags: map[string]string{ + "severity": "1", + "severity_level": "alert", + "facility": "0", + "facility_message": "kernel messages", + }, + Time: defaultTime, + }, + testutil.Metric{ + Measurement: "syslog", + Fields: map[string]interface{}{ + "version": uint16(11), + }, + Tags: map[string]string{ + "severity": "4", + "severity_level": "warning", + "facility": "0", + "facility_message": "kernel messages", + }, + Time: defaultTime, + }, + }, + }, + { + name: "1st/utf8/ok", + data: []byte("23 <1>1 - - - - - - hellø"), + wantStrict: []testutil.Metric{ + testutil.Metric{ + Measurement: "syslog", + Fields: map[string]interface{}{ + "version": uint16(1), + "message": "hellø", + }, + Tags: map[string]string{ + "severity": "1", + "severity_level": "alert", + "facility": "0", + "facility_message": "kernel messages", + }, + Time: defaultTime, + }, + }, + wantBestEffort: []testutil.Metric{ + testutil.Metric{ + Measurement: "syslog", + Fields: map[string]interface{}{ + "version": uint16(1), + "message": "hellø", + }, + Tags: map[string]string{ + "severity": "1", + "severity_level": "alert", + "facility": "0", + "facility_message": "kernel messages", + }, + Time: defaultTime, + }, + }, + }, + { + name: "1st/nl/ok", // newline + data: []byte("28 <1>3 - - - - - - hello\nworld"), + wantStrict: []testutil.Metric{ + testutil.Metric{ + Measurement: "syslog", + Fields: map[string]interface{}{ + "version": uint16(3), + "message": "hello\nworld", + }, + Tags: map[string]string{ + "severity": "1", + "severity_level": "alert", + "facility": "0", + "facility_message": "kernel messages", + }, + Time: defaultTime, + }, + }, + wantBestEffort: []testutil.Metric{ + testutil.Metric{ + Measurement: "syslog", + Fields: map[string]interface{}{ + "version": uint16(3), + "message": "hello\nworld", + }, + Tags: map[string]string{ + "severity": "1", + "severity_level": "alert", + "facility": "0", + "facility_message": "kernel messages", + }, + Time: defaultTime, + }, + }, + }, + { + name: "1st/uf/ko", // underflow (msglen less than provided octets) + data: []byte("16 <1>2"), + wantStrict: nil, + wantBestEffort: []testutil.Metric{ + testutil.Metric{ + Measurement: "syslog", + Fields: map[string]interface{}{ + "version": uint16(2), + }, + Tags: map[string]string{ + "severity": "1", + "severity_level": "alert", + "facility": "0", + "facility_message": "kernel messages", + }, + Time: defaultTime, + }, + }, + werr: 1, + }, + { + name: "1st/min/ok", + data: []byte("16 <1>1 - - - - - -"), + wantStrict: []testutil.Metric{ + testutil.Metric{ + Measurement: "syslog", + Fields: map[string]interface{}{ + "version": uint16(1), + }, + Tags: map[string]string{ + "severity": "1", + "severity_level": "alert", + "facility": "0", + "facility_message": "kernel messages", + }, + Time: defaultTime, + }, + }, + wantBestEffort: []testutil.Metric{ + testutil.Metric{ + Measurement: "syslog", + Fields: map[string]interface{}{ + "version": uint16(1), + }, + Tags: map[string]string{ + "severity": "1", + "severity_level": "alert", + "facility": "0", + "facility_message": "kernel messages", + }, + Time: defaultTime, + }, + }, + }, + { + name: "1st/uf/mf", // The first "underflow" message breaks also the second one + data: []byte("16 <1>217 <11>1 - - - - - -"), + wantStrict: nil, + wantBestEffort: []testutil.Metric{ + testutil.Metric{ + Measurement: "syslog", + Fields: map[string]interface{}{ + "version": uint16(217), + }, + Tags: map[string]string{ + "severity": "1", + "severity_level": "alert", + "facility": "0", + "facility_message": "kernel messages", + }, + Time: defaultTime, + }, + }, + werr: 1, + }, + // { + // name: "1st/of/ko", // overflow (msglen greather then max allowed octets) + // data: []byte(fmt.Sprintf("8193 <%d>%d %s %s %s %s %s 12 %s", maxP, maxV, maxTS, maxH, maxA, maxPID, maxMID, message7681)), + // want: []testutil.Metric{}, + // }, + { + name: "1st/max/ok", + data: []byte(fmt.Sprintf("8192 <%d>%d %s %s %s %s %s - %s", maxP, maxV, maxTS, maxH, maxA, maxPID, maxMID, message7681)), + wantStrict: []testutil.Metric{ + testutil.Metric{ + Measurement: "syslog", + Fields: map[string]interface{}{ + "version": maxV, + "timestamp": time.Unix(1514764799, 999999000).UTC(), + "message": message7681, + "procid": maxPID, + "msgid": maxMID, + }, + Tags: map[string]string{ + "severity": "7", + "severity_level": "debug", + "facility": "23", + "facility_message": "local use 7 (local7)", + "hostname": maxH, + "appname": maxA, + }, + Time: defaultTime, + }, + }, + wantBestEffort: []testutil.Metric{ + testutil.Metric{ + Measurement: "syslog", + Fields: map[string]interface{}{ + "version": maxV, + "timestamp": time.Unix(1514764799, 999999000).UTC(), + "message": message7681, + "procid": maxPID, + "msgid": maxMID, + }, + Tags: map[string]string{ + "severity": "7", + "severity_level": "debug", + "facility": "23", + "facility_message": "local use 7 (local7)", + "hostname": maxH, + "appname": maxA, + }, + Time: defaultTime, + }, + }, + }, + } + + return testCases +} + +func newTCPSyslogReceiver(keepAlive *internal.Duration, maxConn int, bestEffort bool) *Syslog { + d := &internal.Duration{ + Duration: defaultReadTimeout, + } + s := &Syslog{ + Protocol: "tcp", + Address: address, + now: func() time.Time { + return defaultTime + }, + ReadTimeout: d, + BestEffort: bestEffort, + } + if keepAlive != nil { + s.KeepAlivePeriod = keepAlive + } + if maxConn > 0 { + s.MaxConnections = maxConn + } + + return s +} + +func newTLSSyslogReceiver(keepAlive *internal.Duration, maxConn int, bestEffort bool) *Syslog { + initServiceCertFiles.Do(func() { + scaf, err := ioutil.TempFile("", "serviceCAFile.crt") + if err != nil { + panic(err) + } + defer scaf.Close() + _, err = io.Copy(scaf, bytes.NewReader([]byte(serviceRootPEM))) + serviceCAFile = scaf.Name() + + scf, err := ioutil.TempFile("", "serviceCertFile.crt") + if err != nil { + panic(err) + } + defer scf.Close() + _, err = io.Copy(scf, bytes.NewReader([]byte(serviceCertPEM))) + serviceCertFile = scf.Name() + + skf, err := ioutil.TempFile("", "serviceKeyFile.crt") + if err != nil { + panic(err) + } + defer skf.Close() + _, err = io.Copy(skf, bytes.NewReader([]byte(serviceKeyPEM))) + serviceKeyFile = skf.Name() + }) + + receiver := newTCPSyslogReceiver(keepAlive, maxConn, bestEffort) + receiver.TLSAllowedCACerts = append(receiver.TLSAllowedCACerts, serviceCAFile) + receiver.TLSCert = serviceCertFile + receiver.TLSKey = serviceKeyFile + + return receiver +} + +func getTLSSyslogSender() net.Conn { + cas := x509.NewCertPool() + cas.AppendCertsFromPEM([]byte(serviceRootPEM)) + clientCert, err := tls.X509KeyPair([]byte(clientCertPEM), []byte(clientKeyPEM)) + if err != nil { + panic(err) + } + + config := &tls.Config{ + RootCAs: cas, + Certificates: []tls.Certificate{clientCert}, + MinVersion: tls.VersionTLS12, + MaxVersion: tls.VersionTLS12, + Renegotiation: tls.RenegotiateNever, + InsecureSkipVerify: false, + ServerName: "localhost", + } + + c, err := tls.Dial("tcp", address, config) + if err != nil { + log.Println(err) + panic(err) + } + + return c +} + +func testStrict(t *testing.T, acc *testutil.Accumulator, tls bool) { + for _, tc := range getTestCasesForRFC5425() { + t.Run(tc.name, func(t *testing.T) { + // Connect + var conn net.Conn + var err error + if tls { + conn = getTLSSyslogSender() + + } else { + conn, err = net.Dial("tcp", address) + defer conn.Close() + } + require.NotNil(t, conn) + require.Nil(t, err) + + // Clear + acc.ClearMetrics() + acc.Errors = make([]error, 0) + // Write + conn.Write(tc.data) + // Wait that the the number of data points is accumulated + // Since the receiver is running concurrently + if tc.wantStrict != nil { + acc.Wait(len(tc.wantStrict)) + } + // Wait the parsing error + acc.WaitError(tc.werr) + // Verify + if len(acc.Errors) != tc.werr { + t.Fatalf("Got unexpected errors. want error = %v, errors = %v\n", tc.werr, acc.Errors) + } + var got []testutil.Metric + for _, metric := range acc.Metrics { + got = append(got, *metric) + } + if !cmp.Equal(tc.wantStrict, got) { + t.Fatalf("Got (+) / Want (-)\n %s", cmp.Diff(tc.wantStrict, got)) + } + }) + } +} + +func testBestEffort(t *testing.T, acc *testutil.Accumulator, tls bool) { + for _, tc := range getTestCasesForRFC5425() { + t.Run(tc.name, func(t *testing.T) { + // Connect + var conn net.Conn + var err error + if tls { + conn = getTLSSyslogSender() + require.NotNil(t, conn) + } else { + conn, err = net.Dial("tcp", address) + require.NoError(t, err) + defer conn.Close() + } + + // Clear + acc.ClearMetrics() + acc.Errors = make([]error, 0) + // Write + conn.Write(tc.data) + // Wait that the the number of data points is accumulated + // Since the receiver is running concurrently + if tc.wantBestEffort != nil { + acc.Wait(len(tc.wantBestEffort)) + } + var got []testutil.Metric + for _, metric := range acc.Metrics { + got = append(got, *metric) + } + if !cmp.Equal(tc.wantBestEffort, got) { + t.Fatalf("Got (+) / Want (-)\n %s", cmp.Diff(tc.wantBestEffort, got)) + } + }) + } +} + +func TestTCPInStrictMode(t *testing.T) { + receiver := newTCPSyslogReceiver(nil, 0, false) + + acc := &testutil.Accumulator{} + require.NoError(t, receiver.Start(acc)) + defer receiver.Stop() + + testStrict(t, acc, false) +} + +func TestTCPInBestEffort(t *testing.T) { + receiver := newTCPSyslogReceiver(nil, 0, true) + + acc := &testutil.Accumulator{} + require.NoError(t, receiver.Start(acc)) + defer receiver.Stop() + + testBestEffort(t, acc, false) +} + +func TestTLSInStrictMode(t *testing.T) { + receiver := newTLSSyslogReceiver(nil, 0, false) + + acc := &testutil.Accumulator{} + require.NoError(t, receiver.Start(acc)) + defer receiver.Stop() + + testStrict(t, acc, true) +} + +func TestTLSInBestEffortOn(t *testing.T) { + receiver := newTLSSyslogReceiver(nil, 0, true) + require.True(t, receiver.BestEffort) + + acc := &testutil.Accumulator{} + require.NoError(t, receiver.Start(acc)) + defer receiver.Stop() + + testBestEffort(t, acc, true) +} + +func TestTLSWithKeepAliveInStrictMode(t *testing.T) { + keepAlivePeriod := &internal.Duration{ + Duration: time.Minute, + } + receiver := newTLSSyslogReceiver(keepAlivePeriod, 0, false) + require.Equal(t, receiver.KeepAlivePeriod, keepAlivePeriod) + + acc := &testutil.Accumulator{} + require.NoError(t, receiver.Start(acc)) + defer receiver.Stop() + + testStrict(t, acc, true) +} + +func TestTLSWithZeroKeepAliveInStrictMode(t *testing.T) { + keepAlivePeriod := &internal.Duration{ + Duration: 0, + } + receiver := newTLSSyslogReceiver(keepAlivePeriod, 0, false) + require.Equal(t, receiver.KeepAlivePeriod, keepAlivePeriod) + + acc := &testutil.Accumulator{} + require.NoError(t, receiver.Start(acc)) + defer receiver.Stop() + + testStrict(t, acc, true) +} From 1bb39078c91ee50ed6b3bac97044168e83fcdeeb Mon Sep 17 00:00:00 2001 From: Leonardo Di Donato Date: Sun, 20 May 2018 18:09:00 +0100 Subject: [PATCH 05/16] Update: Testing syslog input plugin listening on UDP --- plugins/inputs/syslog/rfc5426_test.go | 276 ++++++++++++++++++++++++++ 1 file changed, 276 insertions(+) create mode 100644 plugins/inputs/syslog/rfc5426_test.go diff --git a/plugins/inputs/syslog/rfc5426_test.go b/plugins/inputs/syslog/rfc5426_test.go new file mode 100644 index 0000000000000..b88c56b98c509 --- /dev/null +++ b/plugins/inputs/syslog/rfc5426_test.go @@ -0,0 +1,276 @@ +package syslog + +import ( + "fmt" + "net" + "testing" + "time" + + "github.com/google/go-cmp/cmp" + "github.com/influxdata/telegraf/testutil" + "github.com/stretchr/testify/require" +) + +type testCase5426 struct { + name string + data []byte + wantBestEffort *testutil.Metric + wantStrict *testutil.Metric + werr bool +} + +func getTestCasesForRFC5426() []testCase5426 { + testCases := []testCase5426{ + { + name: "empty", + data: []byte(""), + werr: true, + }, + { + name: "complete", + data: []byte("<1>1 - - - - - - A"), + wantBestEffort: &testutil.Metric{ + Measurement: "syslog", + Fields: map[string]interface{}{ + "version": uint16(1), + "message": "A", + }, + Tags: map[string]string{ + "severity": "1", + "severity_level": "alert", + "facility": "0", + "facility_message": "kernel messages", + }, + Time: defaultTime, + }, + wantStrict: &testutil.Metric{ + Measurement: "syslog", + Fields: map[string]interface{}{ + "version": uint16(1), + "message": "A", + }, + Tags: map[string]string{ + "severity": "1", + "severity_level": "alert", + "facility": "0", + "facility_message": "kernel messages", + }, + Time: defaultTime, + }, + }, + { + name: "one/per/packet", + data: []byte("<1>3 - - - - - - A<1>4 - - - - - - B"), + wantBestEffort: &testutil.Metric{ + Measurement: "syslog", + Fields: map[string]interface{}{ + "version": uint16(3), + "message": "A<1>4 - - - - - - B", + }, + Tags: map[string]string{ + "severity": "1", + "severity_level": "alert", + "facility": "0", + "facility_message": "kernel messages", + }, + Time: defaultTime, + }, + wantStrict: &testutil.Metric{ + Measurement: "syslog", + Fields: map[string]interface{}{ + "version": uint16(3), + "message": "A<1>4 - - - - - - B", + }, + Tags: map[string]string{ + "severity": "1", + "severity_level": "alert", + "facility": "0", + "facility_message": "kernel messages", + }, + Time: defaultTime, + }, + }, + { + name: "average", + data: []byte(`<29>1 2016-02-21T04:32:57+00:00 web1 someservice 2341 2 [origin][meta sequence="14125553" service="someservice"] "GET /v1/ok HTTP/1.1" 200 145 "-" "hacheck 0.9.0" 24306 127.0.0.1:40124 575`), + wantBestEffort: &testutil.Metric{ + Measurement: "syslog", + Fields: map[string]interface{}{ + "version": uint16(1), + "timestamp": time.Unix(1456029177, 0).UTC(), + "procid": "2341", + "msgid": "2", + "message": `"GET /v1/ok HTTP/1.1" 200 145 "-" "hacheck 0.9.0" 24306 127.0.0.1:40124 575`, + "origin": true, + "meta sequence": "14125553", + "meta service": "someservice", + }, + Tags: map[string]string{ + "severity": "5", + "severity_level": "notice", + "facility": "3", + "facility_message": "system daemons", + "hostname": "web1", + "appname": "someservice", + }, + Time: defaultTime, + }, + wantStrict: &testutil.Metric{ + Measurement: "syslog", + Fields: map[string]interface{}{ + "version": uint16(1), + "timestamp": time.Unix(1456029177, 0).UTC(), + "procid": "2341", + "msgid": "2", + "message": `"GET /v1/ok HTTP/1.1" 200 145 "-" "hacheck 0.9.0" 24306 127.0.0.1:40124 575`, + "origin": true, + "meta sequence": "14125553", + "meta service": "someservice", + }, + Tags: map[string]string{ + "severity": "5", + "severity_level": "notice", + "facility": "3", + "facility_message": "system daemons", + "hostname": "web1", + "appname": "someservice", + }, + Time: defaultTime, + }, + }, + { + name: "max", + data: []byte(fmt.Sprintf("<%d>%d %s %s %s %s %s - %s", maxP, maxV, maxTS, maxH, maxA, maxPID, maxMID, message7681)), + wantBestEffort: &testutil.Metric{ + Measurement: "syslog", + Fields: map[string]interface{}{ + "version": maxV, + "timestamp": time.Unix(1514764799, 999999000).UTC(), + "message": message7681, + "procid": maxPID, + "msgid": maxMID, + }, + Tags: map[string]string{ + "severity": "7", + "severity_level": "debug", + "facility": "23", + "facility_message": "local use 7 (local7)", + "hostname": maxH, + "appname": maxA, + }, + Time: defaultTime, + }, + wantStrict: &testutil.Metric{ + Measurement: "syslog", + Fields: map[string]interface{}{ + "version": maxV, + "timestamp": time.Unix(1514764799, 999999000).UTC(), + "message": message7681, + "procid": maxPID, + "msgid": maxMID, + }, + Tags: map[string]string{ + "severity": "7", + "severity_level": "debug", + "facility": "23", + "facility_message": "local use 7 (local7)", + "hostname": maxH, + "appname": maxA, + }, + Time: defaultTime, + }, + }, + { + name: "minimal/incomplete", + data: []byte("<1>2"), + wantBestEffort: &testutil.Metric{ + Measurement: "syslog", + Fields: map[string]interface{}{ + "version": uint16(2), + }, + Tags: map[string]string{ + "severity": "1", + "severity_level": "alert", + "facility": "0", + "facility_message": "kernel messages", + }, + Time: defaultTime, + }, + werr: true, + }, + } + + return testCases +} + +func newUDPSyslogReceiver(bestEffort bool) *Syslog { + return &Syslog{ + Protocol: "udp", + Address: address, + now: func() time.Time { + return defaultTime + }, + BestEffort: bestEffort, + } +} + +func testRFC5426(t *testing.T, acc *testutil.Accumulator, bestEffort bool) { + for _, tc := range getTestCasesForRFC5426() { + t.Run(tc.name, func(t *testing.T) { + // Clear + acc.ClearMetrics() + // Connect + conn, err := net.Dial("udp", address) + defer conn.Close() + require.NotNil(t, conn) + require.Nil(t, err) + // Write + _, e := conn.Write(tc.data) + require.Nil(t, e) + // Waiting ... + if tc.wantStrict == nil && tc.werr || bestEffort && tc.werr { + acc.WaitError(1) + } + if tc.wantBestEffort != nil && bestEffort || tc.wantStrict != nil && !bestEffort { + acc.Wait(1) // RFC5426 mandates a syslog message per UDP packet + } + // Compare + var got *testutil.Metric + var want *testutil.Metric + if len(acc.Metrics) > 0 { + got = acc.Metrics[0] + } + if bestEffort { + want = tc.wantBestEffort + } else { + want = tc.wantStrict + } + if !cmp.Equal(want, got) { + t.Fatalf("Got (+) / Want (-)\n %s", cmp.Diff(want, got)) + } + }) + } +} + +func TestUDPInBestEffortMode(t *testing.T) { + bestEffort := true + receiver := newUDPSyslogReceiver(bestEffort) + require.Equal(t, receiver.Protocol, "udp") + + acc := &testutil.Accumulator{} + require.NoError(t, receiver.Start(acc)) + defer receiver.Stop() + + testRFC5426(t, acc, bestEffort) +} + +func TestUDPInStrictMode(t *testing.T) { + receiver := newUDPSyslogReceiver(false) + require.Equal(t, receiver.Protocol, "udp") + + acc := &testutil.Accumulator{} + require.NoError(t, receiver.Start(acc)) + defer receiver.Stop() + + testRFC5426(t, acc, false) +} From e4f70f4d44e05ab0fbe41ac81d276d0bba7c201e Mon Sep 17 00:00:00 2001 From: Leonardo Di Donato Date: Mon, 14 May 2018 17:19:00 +0100 Subject: [PATCH 06/16] Deps: Syslog library --- Godeps | 1 + 1 file changed, 1 insertion(+) diff --git a/Godeps b/Godeps index f44100c911cbb..82c56a0e1e1fd 100644 --- a/Godeps +++ b/Godeps @@ -32,6 +32,7 @@ github.com/go-redis/redis 73b70592cdaa9e6abdfcfbf97b4a90d80728c836 github.com/go-sql-driver/mysql 2e00b5cd70399450106cec6431c2e2ce3cae5034 github.com/hailocab/go-hostpool e80d13ce29ede4452c43dea11e79b9bc8a15b478 github.com/hashicorp/consul 5174058f0d2bda63fa5198ab96c33d9a909c58ed +github.com/influxdata/go-syslog dcd9920f1eea047ffa10928fd2b7fbad6c7abe83 github.com/influxdata/tail c43482518d410361b6c383d7aebce33d0471d7bc github.com/influxdata/toml 5d1d907f22ead1cd47adde17ceec5bda9cacaf8f github.com/influxdata/wlog 7c63b0a71ef8300adc255344d275e10e5c3a71ec From a805e38fb862189f03aeae736bf8682379440680 Mon Sep 17 00:00:00 2001 From: Leonardo Di Donato Date: Mon, 21 May 2018 11:17:00 +0100 Subject: [PATCH 07/16] Update: List of input plugins --- plugins/inputs/all/all.go | 1 + 1 file changed, 1 insertion(+) diff --git a/plugins/inputs/all/all.go b/plugins/inputs/all/all.go index 80db99bfbc13e..45ef5e717a8cb 100644 --- a/plugins/inputs/all/all.go +++ b/plugins/inputs/all/all.go @@ -96,6 +96,7 @@ import ( _ "github.com/influxdata/telegraf/plugins/inputs/solr" _ "github.com/influxdata/telegraf/plugins/inputs/sqlserver" _ "github.com/influxdata/telegraf/plugins/inputs/statsd" + _ "github.com/influxdata/telegraf/plugins/inputs/syslog" _ "github.com/influxdata/telegraf/plugins/inputs/sysstat" _ "github.com/influxdata/telegraf/plugins/inputs/system" _ "github.com/influxdata/telegraf/plugins/inputs/tail" From b2647ed11aa949e91f0ec2235c957f422300aa94 Mon Sep 17 00:00:00 2001 From: Leonardo Di Donato Date: Wed, 23 May 2018 15:58:41 +0200 Subject: [PATCH 08/16] Update: Refinements on the syslog input plugin --- plugins/inputs/syslog/README.md | 7 +- plugins/inputs/syslog/syslog.go | 99 +++++++++++++--------------- plugins/inputs/syslog/syslog_test.go | 30 +++------ 3 files changed, 59 insertions(+), 77 deletions(-) diff --git a/plugins/inputs/syslog/README.md b/plugins/inputs/syslog/README.md index 5944039601763..aa56122fc074b 100644 --- a/plugins/inputs/syslog/README.md +++ b/plugins/inputs/syslog/README.md @@ -46,7 +46,8 @@ To configure this plugin as per RFC5426 give it the following configuration: Other available configurations are: - `keep_alive_period`, `max_connections` for stream sockets -- `best_effort` to tell the parser to work until it is able to do and extract partial but valid info +- `read_timeout` +- `best_effort` to tell the parser to work until it is able to do and extract partial but valid info (more [here](https://github.com/influxdata/go-syslog#best-effort-mode)) ### Metrics @@ -70,9 +71,9 @@ The name of fields in _italic_ corresponds to their runtime value. The fields/tags which name is in **bold** will always be present when a valid Syslog message has been received. -### Syslog transport sender +### RSYSLOG integration -The following instructions illustrate how to configure a syslog transport sender as per RFC5425 - ie., using the octect framing technique. +The following instructions illustrate how to configure a syslog transport sender as per RFC5425 - ie., using the octect framing technique - via RSYSLOG. Install `rsyslog`. diff --git a/plugins/inputs/syslog/syslog.go b/plugins/inputs/syslog/syslog.go index c6e44c0292a2e..0e0ffb8144408 100644 --- a/plugins/inputs/syslog/syslog.go +++ b/plugins/inputs/syslog/syslog.go @@ -49,42 +49,42 @@ type Syslog struct { } var sampleConfig = ` - ## Specify an ip or hostname with port - eg., localhost:6514, 10.0.0.1:6514 - ## Address and port to host the syslog receiver. - ## If no server is specified, then localhost is used as the host. - ## If no port is specified, 6514 is used (RFC5425#section-4.1). - server = ":6514" - - ## Protocol (default = tcp) - ## Should be one of the following values: - ## tcp, tcp4, tcp6, unix, unixpacket, udp, udp4, udp6, ip, ip4, ip6, unixgram. - ## Otherwise forced to the default. - # protocol = "tcp" - - ## TLS Config - # tls_allowed_cacerts = ["/etc/telegraf/ca.pem"] - # tls_cert = "/etc/telegraf/cert.pem" - # tls_key = "/etc/telegraf/key.pem" - - ## Period between keep alive probes. - ## 0 disables keep alive probes. - ## Defaults to the OS configuration. - ## Only applies to stream sockets (e.g. TCP). - # keep_alive_period = "5m" - - ## Maximum number of concurrent connections (default = 0). - ## 0 means unlimited. - ## Only applies to stream sockets (e.g. TCP). - # max_connections = 1024 - - ## Read timeout (default = 500ms). - ## 0 means unlimited. - ## Only applies to stream sockets (e.g. TCP). - read_timeout = 500ms - - ## Whether to parse in best effort mode or not (default = false). - ## By default best effort parsing is off. - # best_effort = false + ## Specify an ip or hostname with port - eg., localhost:6514, 10.0.0.1:6514 + ## Address and port to host the syslog receiver. + ## If no server is specified, then localhost is used as the host. + ## If no port is specified, 6514 is used (RFC5425#section-4.1). + server = ":6514" + + ## Protocol (default = tcp) + ## Should be one of the following values: + ## tcp, tcp4, tcp6, unix, unixpacket, udp, udp4, udp6, ip, ip4, ip6, unixgram. + ## Otherwise forced to the default. + # protocol = "tcp" + + ## TLS Config + # tls_allowed_cacerts = ["/etc/telegraf/ca.pem"] + # tls_cert = "/etc/telegraf/cert.pem" + # tls_key = "/etc/telegraf/key.pem" + + ## Period between keep alive probes. + ## 0 disables keep alive probes. + ## Defaults to the OS configuration. + ## Only applies to stream sockets (e.g. TCP). + # keep_alive_period = "5m" + + ## Maximum number of concurrent connections (default = 0). + ## 0 means unlimited. + ## Only applies to stream sockets (e.g. TCP). + # max_connections = 1024 + + ## Read timeout (default = 500ms). + ## 0 means unlimited. + ## Only applies to stream sockets (e.g. TCP). + # read_timeout = 500ms + + ## Whether to parse in best effort mode or not (default = false). + ## By default best effort parsing is off. + # best_effort = false ` // SampleConfig returns sample configuration message @@ -94,7 +94,7 @@ func (s *Syslog) SampleConfig() string { // Description returns the plugin description func (s *Syslog) Description() string { - return "Influx syslog receiver as per RFC5425" + return "Accepts syslog messages per RFC5425" } // Gather ... @@ -107,10 +107,6 @@ func (s *Syslog) Start(acc telegraf.Accumulator) error { s.mu.Lock() defer s.mu.Unlock() - // tags := map[string]string{ - // "address": s.Address, - // } - switch s.Protocol { case "tcp", "tcp4", "tcp6", "unix", "unixpacket": s.isTCP = true @@ -132,8 +128,9 @@ func (s *Syslog) Start(acc telegraf.Accumulator) error { } s.Closer = l s.tcpListener = l - if tlsConfig, _ := s.TLSConfig(); tlsConfig != nil { - s.tlsConfig = tlsConfig + s.tlsConfig, err = s.TLSConfig() + if err != nil { + return err } s.wg.Add(1) @@ -167,8 +164,6 @@ func (s *Syslog) Stop() { s.Close() } s.wg.Wait() - - log.Printf("I! Stopped syslog receiver at %s\n", s.Address) } func (s *Syslog) listenPacket(acc telegraf.Accumulator) { @@ -178,7 +173,6 @@ func (s *Syslog) listenPacket(acc telegraf.Accumulator) { n, _, err := s.udpListener.ReadFrom(b) if err != nil { if !strings.HasSuffix(err.Error(), ": use of closed network connection") { - log.Println(err) acc.AddError(err) } break @@ -189,9 +183,9 @@ func (s *Syslog) listenPacket(acc telegraf.Accumulator) { } p := rfc5424.NewParser() - mex, err := p.Parse(b[:n], &s.BestEffort) - if mex != nil { - acc.AddFields("syslog", fields(mex), tags(mex), s.now()) + message, err := p.Parse(b[:n], &s.BestEffort) + if message != nil { + acc.AddFields("syslog", fields(message), tags(message), s.now()) } if err != nil { acc.AddError(err) @@ -208,7 +202,6 @@ func (s *Syslog) listenStream(acc telegraf.Accumulator) { conn, err := s.tcpListener.Accept() if err != nil { if !strings.HasSuffix(err.Error(), ": use of closed network connection") { - log.Println(err) acc.AddError(err) } break @@ -248,8 +241,10 @@ func (s *Syslog) removeConnection(c net.Conn) { } func (s *Syslog) handle(conn net.Conn, acc telegraf.Accumulator) { - defer s.removeConnection(conn) - defer conn.Close() + defer func() { + s.removeConnection(conn) + conn.Close() + }() if s.ReadTimeout != nil && s.ReadTimeout.Duration > 0 { conn.SetReadDeadline(time.Now().Add(s.ReadTimeout.Duration)) diff --git a/plugins/inputs/syslog/syslog_test.go b/plugins/inputs/syslog/syslog_test.go index 3756691804565..fa3624af3916c 100644 --- a/plugins/inputs/syslog/syslog_test.go +++ b/plugins/inputs/syslog/syslog_test.go @@ -10,17 +10,14 @@ import ( ) var defaultTime = time.Unix(0, 0) - -var ( - maxP uint8 - maxV uint16 - maxTS string - maxH string - maxA string - maxPID string - maxMID string - message7681 string -) +var maxP = uint8(191) +var maxV = uint16(999) +var maxTS = "2017-12-31T23:59:59.999999+00:00" +var maxH = "abcdefghilmnopqrstuvzabcdefghilmnopqrstuvzabcdefghilmnopqrstuvzabcdefghilmnopqrstuvzabcdefghilmnopqrstuvzabcdefghilmnopqrstuvzabcdefghilmnopqrstuvzabcdefghilmnopqrstuvzabcdefghilmnopqrstuvzabcdefghilmnopqrstuvzabcdefghilmnopqrstuvzabcdefghilmnopqrstuvzabc" +var maxA = "abcdefghilmnopqrstuvzabcdefghilmnopqrstuvzabcdef" +var maxPID = "abcdefghilmnopqrstuvzabcdefghilmnopqrstuvzabcdefghilmnopqrstuvzabcdefghilmnopqrstuvzabcdefghilmnopqrstuvzabcdefghilmnopqrstuvzab" +var maxMID = "abcdefghilmnopqrstuvzabcdefghilm" +var message7681 = getRandomString(7681) func TestListenError(t *testing.T) { receiver := &Syslog{ @@ -54,14 +51,3 @@ func getRandomString(n int) string { return string(b) } - -func init() { - maxP = uint8(191) - maxV = uint16(999) - maxTS = "2017-12-31T23:59:59.999999+00:00" - maxH = "abcdefghilmnopqrstuvzabcdefghilmnopqrstuvzabcdefghilmnopqrstuvzabcdefghilmnopqrstuvzabcdefghilmnopqrstuvzabcdefghilmnopqrstuvzabcdefghilmnopqrstuvzabcdefghilmnopqrstuvzabcdefghilmnopqrstuvzabcdefghilmnopqrstuvzabcdefghilmnopqrstuvzabcdefghilmnopqrstuvzabc" - maxA = "abcdefghilmnopqrstuvzabcdefghilmnopqrstuvzabcdef" - maxPID = "abcdefghilmnopqrstuvzabcdefghilmnopqrstuvzabcdefghilmnopqrstuvzabcdefghilmnopqrstuvzabcdefghilmnopqrstuvzabcdefghilmnopqrstuvzab" - maxMID = "abcdefghilmnopqrstuvzabcdefghilm" - message7681 = getRandomString(7681) -} From 1c27f075c81d4f7be2de159c3e82a7cb8d58ac2e Mon Sep 17 00:00:00 2001 From: Leonardo Di Donato Date: Wed, 23 May 2018 21:03:16 +0200 Subject: [PATCH 09/16] Update: Improvements on test suite of syslog input plugin --- plugins/inputs/syslog/README.md | 23 --- plugins/inputs/syslog/rfc5425_test.go | 263 ++++++-------------------- plugins/inputs/syslog/rfc5426_test.go | 37 ++-- plugins/inputs/syslog/syslog.go | 4 +- plugins/inputs/syslog/syslog_test.go | 30 +-- 5 files changed, 77 insertions(+), 280 deletions(-) diff --git a/plugins/inputs/syslog/README.md b/plugins/inputs/syslog/README.md index aa56122fc074b..759538b237e07 100644 --- a/plugins/inputs/syslog/README.md +++ b/plugins/inputs/syslog/README.md @@ -8,19 +8,6 @@ This plugin listens for syslog messages following RFC5424 format. When received ### Configuration: -#### TCP - -The minimal configuration is the following: - -```toml -[[inputs.syslog]] - address = ":6514" -``` - -This starts this plugins as a syslog receiver over TCP protocol on port 6514. - -#### TLS - To configure it as a TLS syslog receiver as recommended by RFC5425 give it the following configuration: ```toml @@ -31,16 +18,6 @@ To configure it as a TLS syslog receiver as recommended by RFC5425 give it the f tls_key = "/etc/telegraf/key.pem" ``` -#### UDP - -To configure this plugin as per RFC5426 give it the following configuration: - -```toml -[[inputs.syslog]] - protocol = "udp" - address = ":6514" -``` - #### Other configs Other available configurations are: diff --git a/plugins/inputs/syslog/rfc5425_test.go b/plugins/inputs/syslog/rfc5425_test.go index 761f3817492d3..c344a996db501 100644 --- a/plugins/inputs/syslog/rfc5425_test.go +++ b/plugins/inputs/syslog/rfc5425_test.go @@ -1,15 +1,9 @@ package syslog import ( - "bytes" "crypto/tls" - "crypto/x509" "fmt" - "io" - "io/ioutil" - "log" "net" - "sync" "testing" "time" @@ -20,81 +14,11 @@ import ( ) const ( - serviceRootPEM = `-----BEGIN CERTIFICATE----- -MIIBxzCCATCgAwIBAgIJAJb7HqN2BzWWMA0GCSqGSIb3DQEBCwUAMBYxFDASBgNV -BAMMC1RlbGVncmFmIENBMB4XDTE3MTEwNDA0MzEwN1oXDTI3MTEwMjA0MzEwN1ow -FjEUMBIGA1UEAwwLVGVsZWdyYWYgQ0EwgZ8wDQYJKoZIhvcNAQEBBQADgY0AMIGJ -AoGBANbkUkK6JQC3rbLcXhLJTS9SX6uXyFwl7bUfpAN5Hm5EqfvG3PnLrogfTGLr -Tq5CRAu/gbbdcMoL9TLv/aaDVnrpV0FslKhqYmkOgT28bdmA7Qtr539aQpMKCfcW -WCnoMcBD5u5h9MsRqpdq+0Mjlsf1H2hSf07jHk5R1T4l8RMXAgMBAAGjHTAbMAwG -A1UdEwQFMAMBAf8wCwYDVR0PBAQDAgEGMA0GCSqGSIb3DQEBCwUAA4GBANSrwvpU -t8ihIhpHqgJZ34DM92CZZ3ZHmH/KyqlnuGzjjpnVZiXVrLDTOzrA0ziVhmefY29w -roHjENbFm54HW97ogxeURuO8HRHIVh2U0rkyVxOfGZiUdINHqsZdSnDY07bzCtSr -Z/KsfWXM5llD1Ig1FyBHpKjyUvfzr73sjm/4 ------END CERTIFICATE-----` - serviceCertPEM = `-----BEGIN CERTIFICATE----- -MIIBzzCCATigAwIBAgIBATANBgkqhkiG9w0BAQsFADAWMRQwEgYDVQQDDAtUZWxl -Z3JhZiBDQTAeFw0xNzExMDQwNDMxMDdaFw0yNzExMDIwNDMxMDdaMBQxEjAQBgNV -BAMMCWxvY2FsaG9zdDCBnzANBgkqhkiG9w0BAQEFAAOBjQAwgYkCgYEAsJRss1af -XKrcIjQoAp2kdJIpT2Ya+MRQXJ18b0PP7szh2lisY11kd/HCkd4D4efuIkpszHaN -xwyTOZLOoplxp6fizzgOYjXsJ6SzbO1MQNmq8Ch/+uKiGgFwLX+YxOOsGSDIHNhF -vcBi93cQtCWPBFz6QRQf9yfIAA5KKxUfJcMCAwEAAaMvMC0wCQYDVR0TBAIwADAL -BgNVHQ8EBAMCBSAwEwYDVR0lBAwwCgYIKwYBBQUHAwEwDQYJKoZIhvcNAQELBQAD -gYEAiC3WI4y9vfYz53gw7FKnNK7BBdwRc43x7Pd+5J/cclWyUZPdmcj1UNmv/3rj -2qcMmX06UdgPoHppzNAJePvMVk0vjMBUe9MmYlafMz0h4ma/it5iuldXwmejFcdL -6wWQp7gVTileCEmq9sNvfQN1FmT3EWf4IMdO2MNat/1If0g= ------END CERTIFICATE-----` - serviceKeyPEM = `-----BEGIN RSA PRIVATE KEY----- -MIICXQIBAAKBgQCwlGyzVp9cqtwiNCgCnaR0kilPZhr4xFBcnXxvQ8/uzOHaWKxj -XWR38cKR3gPh5+4iSmzMdo3HDJM5ks6imXGnp+LPOA5iNewnpLNs7UxA2arwKH/6 -4qIaAXAtf5jE46wZIMgc2EW9wGL3dxC0JY8EXPpBFB/3J8gADkorFR8lwwIDAQAB -AoGBAJaFHxfMmjHK77U0UnrQWFSKFy64cftmlL4t/Nl3q7L68PdIKULWZIMeEWZ4 -I0UZiFOwr4em83oejQ1ByGSwekEuiWaKUI85IaHfcbt+ogp9hY/XbOEo56OPQUAd -bEZv1JqJOqta9Ug1/E1P9LjEEyZ5F5ubx7813rxAE31qKtKJAkEA1zaMlCWIr+Rj -hGvzv5rlHH3wbOB4kQFXO4nqj3J/ttzR5QiJW24STMDcbNngFlVcDVju56LrNTiD -dPh9qvl7nwJBANILguR4u33OMksEZTYB7nQZSurqXsq6382zH7pTl29ANQTROHaM -PKC8dnDWq8RGTqKuvWblIzzGIKqIMovZo10CQC96T0UXirITFolOL3XjvAuvFO1Q -EAkdXJs77805m0dCK+P1IChVfiAEpBw3bKJArpAbQIlFfdI953JUp5SieU0CQEub -BSSEKMjh/cxu6peEHnb/262vayuCFKkQPu1sxWewLuVrAe36EKCy9dcsDmv5+rgo -Odjdxc9Madm4aKlaT6kCQQCpAgeblDrrxTrNQ+Typzo37PlnQrvI+0EceAUuJ72G -P0a+YZUeHNRqT2pPN9lMTAZGGi3CtcF2XScbLNEBeXge ------END RSA PRIVATE KEY-----` - clientRootPEM = serviceRootPEM - clientCertPEM = `-----BEGIN CERTIFICATE----- -MIIBzjCCATegAwIBAgIBAjANBgkqhkiG9w0BAQsFADAWMRQwEgYDVQQDDAtUZWxl -Z3JhZiBDQTAeFw0xNzExMDQwNDMxMDdaFw0yNzExMDIwNDMxMDdaMBMxETAPBgNV -BAMMCHRlbGVncmFmMIGfMA0GCSqGSIb3DQEBAQUAA4GNADCBiQKBgQDP2IMqyOqI -sJjwBprrz8WPzmlrpyYikQ4XSCSJB3DSTIO+igqMpBUTj3vLlOzsHfVVot1WRqc6 -3esM4JE92rc6S73xi4g8L/r8cPIHW4hvFJdMti4UkJBWim8ArSbFqnZjcR19G3tG -LUOiXAUG3nWzMzoEsPruvV1dkKRbJVE4MwIDAQABoy8wLTAJBgNVHRMEAjAAMAsG -A1UdDwQEAwIHgDATBgNVHSUEDDAKBggrBgEFBQcDAjANBgkqhkiG9w0BAQsFAAOB -gQCHxMk38XNxL9nPFBYo3JqITJCFswu6/NLHwDBXCuZKl53rUuFWduiO+1OuScKQ -sQ79W0jHsWRKGOUFrF5/Gdnh8AlkVaITVlcmhdAOFCEbeGpeEvLuuK6grckPitxy -bRF5oM4TCLKKAha60Ir41rk2bomZM9+NZu+Bm+csDqCoxQ== ------END CERTIFICATE-----` - clientKeyPEM = `-----BEGIN RSA PRIVATE KEY----- -MIICXAIBAAKBgQDP2IMqyOqIsJjwBprrz8WPzmlrpyYikQ4XSCSJB3DSTIO+igqM -pBUTj3vLlOzsHfVVot1WRqc63esM4JE92rc6S73xi4g8L/r8cPIHW4hvFJdMti4U -kJBWim8ArSbFqnZjcR19G3tGLUOiXAUG3nWzMzoEsPruvV1dkKRbJVE4MwIDAQAB -AoGAFzb/r4+xYoMXEfgq5ZvXXTCY5cVNpR6+jCsqqYODPnn9XRLeCsdo8z5bfWms -7NKLzHzca/6IPzL6Rf3vOxFq1YyIZfYVHH+d63/9blAm3Iajjp1W2yW5aj9BJjTb -nm6F0RfuW/SjrZ9IXxTZhSpCklPmUzVZpzvwV3KGeVTVCEECQQDoavCeOwLuqDpt -0aM9GMFUpOU7kLPDuicSwCDaTae4kN2rS17Zki41YXe8A8+509IEN7mK09Vq9HxY -SX6EmV1FAkEA5O9QcCHEa8P12EmUC8oqD2bjq6o7JjUIRlKinwZTlooMJYZw98gA -FVSngTUvLVCVIvSdjldXPOGgfYiccTZrFwJAfHS3gKOtAEuJbkEyHodhD4h1UB4+ -hPLr9Xh4ny2yQH0ilpV3px5GLEOTMFUCKUoqTiPg8VxaDjn5U/WXED5n2QJAR4J1 -NsFlcGACj+/TvacFYlA6N2nyFeokzoqLX28Ddxdh2erXqJ4hYIhT1ik9tkLggs2z -1T1084BquCuO6lIcOwJBALX4xChoMUF9k0IxSQzlz//seQYDkQNsE7y9IgAOXkzp -RaR4pzgPbnKj7atG+2dBnffWfE+1Mcy0INDAO6WxPg0= ------END RSA PRIVATE KEY-----` address = ":6514" ) var ( - initServiceCertFiles sync.Once - serviceCAFile string - serviceCertFile string - serviceKeyFile string + pki = testutil.NewPKI("../../../testutil/pki") ) type testCase5425 struct { @@ -446,89 +370,42 @@ func newTCPSyslogReceiver(keepAlive *internal.Duration, maxConn int, bestEffort return s } -func newTLSSyslogReceiver(keepAlive *internal.Duration, maxConn int, bestEffort bool) *Syslog { - initServiceCertFiles.Do(func() { - scaf, err := ioutil.TempFile("", "serviceCAFile.crt") - if err != nil { - panic(err) - } - defer scaf.Close() - _, err = io.Copy(scaf, bytes.NewReader([]byte(serviceRootPEM))) - serviceCAFile = scaf.Name() - - scf, err := ioutil.TempFile("", "serviceCertFile.crt") - if err != nil { - panic(err) - } - defer scf.Close() - _, err = io.Copy(scf, bytes.NewReader([]byte(serviceCertPEM))) - serviceCertFile = scf.Name() - - skf, err := ioutil.TempFile("", "serviceKeyFile.crt") - if err != nil { - panic(err) - } - defer skf.Close() - _, err = io.Copy(skf, bytes.NewReader([]byte(serviceKeyPEM))) - serviceKeyFile = skf.Name() - }) - - receiver := newTCPSyslogReceiver(keepAlive, maxConn, bestEffort) - receiver.TLSAllowedCACerts = append(receiver.TLSAllowedCACerts, serviceCAFile) - receiver.TLSCert = serviceCertFile - receiver.TLSKey = serviceKeyFile - - return receiver -} - -func getTLSSyslogSender() net.Conn { - cas := x509.NewCertPool() - cas.AppendCertsFromPEM([]byte(serviceRootPEM)) - clientCert, err := tls.X509KeyPair([]byte(clientCertPEM), []byte(clientKeyPEM)) - if err != nil { - panic(err) - } - - config := &tls.Config{ - RootCAs: cas, - Certificates: []tls.Certificate{clientCert}, - MinVersion: tls.VersionTLS12, - MaxVersion: tls.VersionTLS12, - Renegotiation: tls.RenegotiateNever, - InsecureSkipVerify: false, - ServerName: "localhost", - } - - c, err := tls.Dial("tcp", address, config) - if err != nil { - log.Println(err) - panic(err) - } - - return c -} - -func testStrict(t *testing.T, acc *testutil.Accumulator, tls bool) { +func testStrictRFC5425(t *testing.T, wantTLS bool, keepAlive *internal.Duration) { for _, tc := range getTestCasesForRFC5425() { t.Run(tc.name, func(t *testing.T) { + // Creation of a strict mode receiver + receiver := newTCPSyslogReceiver(keepAlive, 0, false) + require.NotNil(t, receiver) + if wantTLS { + receiver.ServerConfig = *pki.TLSServerConfig() + } + require.Equal(t, receiver.KeepAlivePeriod, keepAlive) + acc := &testutil.Accumulator{} + require.NoError(t, receiver.Start(acc)) + defer receiver.Stop() + // Connect var conn net.Conn var err error - if tls { - conn = getTLSSyslogSender() - + if wantTLS { + config, e := pki.TLSClientConfig().TLSConfig() + require.NoError(t, e) + config.ServerName = "localhost" + conn, err = tls.Dial("tcp", address, config) } else { conn, err = net.Dial("tcp", address) defer conn.Close() } require.NotNil(t, conn) - require.Nil(t, err) + require.NoError(t, err) // Clear acc.ClearMetrics() acc.Errors = make([]error, 0) + // Write conn.Write(tc.data) + // Wait that the the number of data points is accumulated // Since the receiver is running concurrently if tc.wantStrict != nil { @@ -536,6 +413,7 @@ func testStrict(t *testing.T, acc *testutil.Accumulator, tls bool) { } // Wait the parsing error acc.WaitError(tc.werr) + // Verify if len(acc.Errors) != tc.werr { t.Fatalf("Got unexpected errors. want error = %v, errors = %v\n", tc.werr, acc.Errors) @@ -551,31 +429,49 @@ func testStrict(t *testing.T, acc *testutil.Accumulator, tls bool) { } } -func testBestEffort(t *testing.T, acc *testutil.Accumulator, tls bool) { +func testBestEffortRFC5425(t *testing.T, wantTLS bool, keepAlive *internal.Duration) { for _, tc := range getTestCasesForRFC5425() { t.Run(tc.name, func(t *testing.T) { + // Creation of a best effort mode receiver + receiver := newTCPSyslogReceiver(keepAlive, 0, true) + require.NotNil(t, receiver) + if wantTLS { + receiver.ServerConfig = *pki.TLSServerConfig() + } + require.Equal(t, receiver.KeepAlivePeriod, keepAlive) + acc := &testutil.Accumulator{} + require.NoError(t, receiver.Start(acc)) + defer receiver.Stop() + // Connect var conn net.Conn var err error - if tls { - conn = getTLSSyslogSender() - require.NotNil(t, conn) + if wantTLS { + config, e := pki.TLSClientConfig().TLSConfig() + require.NoError(t, e) + config.ServerName = "localhost" + conn, err = tls.Dial("tcp", address, config) } else { conn, err = net.Dial("tcp", address) - require.NoError(t, err) defer conn.Close() } + require.NotNil(t, conn) + require.NoError(t, err) // Clear acc.ClearMetrics() acc.Errors = make([]error, 0) + // Write conn.Write(tc.data) + // Wait that the the number of data points is accumulated // Since the receiver is running concurrently if tc.wantBestEffort != nil { acc.Wait(len(tc.wantBestEffort)) } + + // Verify var got []testutil.Metric for _, metric := range acc.Metrics { got = append(got, *metric) @@ -587,71 +483,26 @@ func testBestEffort(t *testing.T, acc *testutil.Accumulator, tls bool) { } } -func TestTCPInStrictMode(t *testing.T) { - receiver := newTCPSyslogReceiver(nil, 0, false) - - acc := &testutil.Accumulator{} - require.NoError(t, receiver.Start(acc)) - defer receiver.Stop() - - testStrict(t, acc, false) +func TestStrict_tcp(t *testing.T) { + testStrictRFC5425(t, false, nil) } -func TestTCPInBestEffort(t *testing.T) { - receiver := newTCPSyslogReceiver(nil, 0, true) - - acc := &testutil.Accumulator{} - require.NoError(t, receiver.Start(acc)) - defer receiver.Stop() - - testBestEffort(t, acc, false) +func TestBestEffort_tcp(t *testing.T) { + testBestEffortRFC5425(t, false, nil) } -func TestTLSInStrictMode(t *testing.T) { - receiver := newTLSSyslogReceiver(nil, 0, false) - - acc := &testutil.Accumulator{} - require.NoError(t, receiver.Start(acc)) - defer receiver.Stop() - - testStrict(t, acc, true) +func TestStrict_tcp_tls(t *testing.T) { + testStrictRFC5425(t, true, nil) } -func TestTLSInBestEffortOn(t *testing.T) { - receiver := newTLSSyslogReceiver(nil, 0, true) - require.True(t, receiver.BestEffort) - - acc := &testutil.Accumulator{} - require.NoError(t, receiver.Start(acc)) - defer receiver.Stop() - - testBestEffort(t, acc, true) +func TestBestEffort_tcp_tls(t *testing.T) { + testBestEffortRFC5425(t, true, nil) } -func TestTLSWithKeepAliveInStrictMode(t *testing.T) { - keepAlivePeriod := &internal.Duration{ - Duration: time.Minute, - } - receiver := newTLSSyslogReceiver(keepAlivePeriod, 0, false) - require.Equal(t, receiver.KeepAlivePeriod, keepAlivePeriod) - - acc := &testutil.Accumulator{} - require.NoError(t, receiver.Start(acc)) - defer receiver.Stop() - - testStrict(t, acc, true) +func TestStrictWithKeepAlive_tcp_tls(t *testing.T) { + testStrictRFC5425(t, true, &internal.Duration{Duration: time.Minute}) } -func TestTLSWithZeroKeepAliveInStrictMode(t *testing.T) { - keepAlivePeriod := &internal.Duration{ - Duration: 0, - } - receiver := newTLSSyslogReceiver(keepAlivePeriod, 0, false) - require.Equal(t, receiver.KeepAlivePeriod, keepAlivePeriod) - - acc := &testutil.Accumulator{} - require.NoError(t, receiver.Start(acc)) - defer receiver.Stop() - - testStrict(t, acc, true) +func TestStrictWithZeroKeepAlive_tcp_tls(t *testing.T) { + testStrictRFC5425(t, true, &internal.Duration{Duration: 0}) } diff --git a/plugins/inputs/syslog/rfc5426_test.go b/plugins/inputs/syslog/rfc5426_test.go index b88c56b98c509..65023e66ffa15 100644 --- a/plugins/inputs/syslog/rfc5426_test.go +++ b/plugins/inputs/syslog/rfc5426_test.go @@ -214,19 +214,30 @@ func newUDPSyslogReceiver(bestEffort bool) *Syslog { } } -func testRFC5426(t *testing.T, acc *testutil.Accumulator, bestEffort bool) { +func testRFC5426(t *testing.T, bestEffort bool) { for _, tc := range getTestCasesForRFC5426() { t.Run(tc.name, func(t *testing.T) { + // Create receiver + receiver := newUDPSyslogReceiver(bestEffort) + require.Equal(t, receiver.Protocol, "udp") + acc := &testutil.Accumulator{} + require.NoError(t, receiver.Start(acc)) + defer receiver.Stop() + // Clear acc.ClearMetrics() + acc.Errors = make([]error, 0) + // Connect conn, err := net.Dial("udp", address) defer conn.Close() require.NotNil(t, conn) require.Nil(t, err) + // Write _, e := conn.Write(tc.data) require.Nil(t, e) + // Waiting ... if tc.wantStrict == nil && tc.werr || bestEffort && tc.werr { acc.WaitError(1) @@ -234,6 +245,7 @@ func testRFC5426(t *testing.T, acc *testutil.Accumulator, bestEffort bool) { if tc.wantBestEffort != nil && bestEffort || tc.wantStrict != nil && !bestEffort { acc.Wait(1) // RFC5426 mandates a syslog message per UDP packet } + // Compare var got *testutil.Metric var want *testutil.Metric @@ -252,25 +264,10 @@ func testRFC5426(t *testing.T, acc *testutil.Accumulator, bestEffort bool) { } } -func TestUDPInBestEffortMode(t *testing.T) { - bestEffort := true - receiver := newUDPSyslogReceiver(bestEffort) - require.Equal(t, receiver.Protocol, "udp") - - acc := &testutil.Accumulator{} - require.NoError(t, receiver.Start(acc)) - defer receiver.Stop() - - testRFC5426(t, acc, bestEffort) +func TestBestEffort_udp(t *testing.T) { + testRFC5426(t, true) } -func TestUDPInStrictMode(t *testing.T) { - receiver := newUDPSyslogReceiver(false) - require.Equal(t, receiver.Protocol, "udp") - - acc := &testutil.Accumulator{} - require.NoError(t, receiver.Start(acc)) - defer receiver.Stop() - - testRFC5426(t, acc, false) +func TestStrict_udp(t *testing.T) { + testRFC5426(t, false) } diff --git a/plugins/inputs/syslog/syslog.go b/plugins/inputs/syslog/syslog.go index 0e0ffb8144408..ec145bc148bc7 100644 --- a/plugins/inputs/syslog/syslog.go +++ b/plugins/inputs/syslog/syslog.go @@ -4,7 +4,6 @@ import ( "crypto/tls" "fmt" "io" - "log" "net" "os" "strconv" @@ -151,7 +150,6 @@ func (s *Syslog) Start(acc telegraf.Accumulator) error { s.Closer = unixCloser{path: s.Address, closer: s.Closer} } - log.Printf("I! Started syslog receiver at %s\n", s.Address) return nil } @@ -169,6 +167,7 @@ func (s *Syslog) Stop() { func (s *Syslog) listenPacket(acc telegraf.Accumulator) { defer s.wg.Done() b := make([]byte, ipMaxPacketSize) + p := rfc5424.NewParser() for { n, _, err := s.udpListener.ReadFrom(b) if err != nil { @@ -182,7 +181,6 @@ func (s *Syslog) listenPacket(acc telegraf.Accumulator) { s.udpListener.SetReadDeadline(time.Now().Add(s.ReadTimeout.Duration)) } - p := rfc5424.NewParser() message, err := p.Parse(b[:n], &s.BestEffort) if message != nil { acc.AddFields("syslog", fields(message), tags(message), s.now()) diff --git a/plugins/inputs/syslog/syslog_test.go b/plugins/inputs/syslog/syslog_test.go index fa3624af3916c..4d3501bca6a72 100644 --- a/plugins/inputs/syslog/syslog_test.go +++ b/plugins/inputs/syslog/syslog_test.go @@ -1,7 +1,7 @@ package syslog import ( - "math/rand" + "strings" "testing" "time" @@ -17,7 +17,7 @@ var maxH = "abcdefghilmnopqrstuvzabcdefghilmnopqrstuvzabcdefghilmnopqrstuvzabcde var maxA = "abcdefghilmnopqrstuvzabcdefghilmnopqrstuvzabcdef" var maxPID = "abcdefghilmnopqrstuvzabcdefghilmnopqrstuvzabcdefghilmnopqrstuvzabcdefghilmnopqrstuvzabcdefghilmnopqrstuvzabcdefghilmnopqrstuvzab" var maxMID = "abcdefghilmnopqrstuvzabcdefghilm" -var message7681 = getRandomString(7681) +var message7681 = strings.Repeat("l", 7681) func TestListenError(t *testing.T) { receiver := &Syslog{ @@ -25,29 +25,3 @@ func TestListenError(t *testing.T) { } require.Error(t, receiver.Start(&testutil.Accumulator{})) } - -func getRandomString(n int) string { - const ( - letterBytes = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ" - letterIdxBits = 6 // 6 bits to represent a letter index - letterIdxMask = 1<= 0; { - if remain == 0 { - cache, remain = src.Int63(), letterIdxMax - } - if idx := int(cache & letterIdxMask); idx < len(letterBytes) { - b[i] = letterBytes[idx] - i-- - } - cache >>= letterIdxBits - remain-- - } - - return string(b) -} From 09f869b2a6f51e0e76b23de36745181c8a101318 Mon Sep 17 00:00:00 2001 From: Leonardo Di Donato Date: Wed, 23 May 2018 23:58:58 +0200 Subject: [PATCH 10/16] Update: Syslog input handling of severity and facility values --- Godeps | 2 +- plugins/inputs/syslog/README.md | 42 +++++- plugins/inputs/syslog/rfc5425_test.go | 196 +++++++++++++------------- plugins/inputs/syslog/rfc5426_test.go | 126 ++++++++--------- plugins/inputs/syslog/syslog.go | 24 ++-- 5 files changed, 209 insertions(+), 181 deletions(-) diff --git a/Godeps b/Godeps index 82c56a0e1e1fd..70e6e7966c059 100644 --- a/Godeps +++ b/Godeps @@ -32,7 +32,7 @@ github.com/go-redis/redis 73b70592cdaa9e6abdfcfbf97b4a90d80728c836 github.com/go-sql-driver/mysql 2e00b5cd70399450106cec6431c2e2ce3cae5034 github.com/hailocab/go-hostpool e80d13ce29ede4452c43dea11e79b9bc8a15b478 github.com/hashicorp/consul 5174058f0d2bda63fa5198ab96c33d9a909c58ed -github.com/influxdata/go-syslog dcd9920f1eea047ffa10928fd2b7fbad6c7abe83 +github.com/influxdata/go-syslog 84f3b60009444d298f97454feb1f20cf91d1fa6e github.com/influxdata/tail c43482518d410361b6c383d7aebce33d0471d7bc github.com/influxdata/toml 5d1d907f22ead1cd47adde17ceec5bda9cacaf8f github.com/influxdata/wlog 7c63b0a71ef8300adc255344d275e10e5c3a71ec diff --git a/plugins/inputs/syslog/README.md b/plugins/inputs/syslog/README.md index 759538b237e07..c24b1bcf3d761 100644 --- a/plugins/inputs/syslog/README.md +++ b/plugins/inputs/syslog/README.md @@ -8,14 +8,44 @@ This plugin listens for syslog messages following RFC5424 format. When received ### Configuration: -To configure it as a TLS syslog receiver as recommended by RFC5425 give it the following configuration: - ```toml [[inputs.syslog]] - address = ":6514" - tls_cacert = "/etc/telegraf/ca.pem" - tls_cert = "/etc/telegraf/cert.pem" - tls_key = "/etc/telegraf/key.pem" + ## Specify an ip or hostname with port - eg., localhost:6514, 10.0.0.1:6514 + ## Address and port to host the syslog receiver. + ## If no server is specified, then localhost is used as the host. + ## If no port is specified, 6514 is used (RFC5425#section-4.1). + server = ":6514" + + ## Protocol (default = tcp) + ## Should be one of the following values: + ## tcp, tcp4, tcp6, unix, unixpacket, udp, udp4, udp6, ip, ip4, ip6, unixgram. + ## Otherwise forced to the default. + # protocol = "tcp" + + ## TLS Config + # tls_allowed_cacerts = ["/etc/telegraf/ca.pem"] + # tls_cert = "/etc/telegraf/cert.pem" + # tls_key = "/etc/telegraf/key.pem" + + ## Period between keep alive probes. + ## 0 disables keep alive probes. + ## Defaults to the OS configuration. + ## Only applies to stream sockets (e.g. TCP). + # keep_alive_period = "5m" + + ## Maximum number of concurrent connections (default = 0). + ## 0 means unlimited. + ## Only applies to stream sockets (e.g. TCP). + # max_connections = 1024 + + ## Read timeout (default = 500ms). + ## 0 means unlimited. + ## Only applies to stream sockets (e.g. TCP). + # read_timeout = 500ms + + ## Whether to parse in best effort mode or not (default = false). + ## By default best effort parsing is off. + # best_effort = false ``` #### Other configs diff --git a/plugins/inputs/syslog/rfc5425_test.go b/plugins/inputs/syslog/rfc5425_test.go index c344a996db501..5e2768d8ad26d 100644 --- a/plugins/inputs/syslog/rfc5425_test.go +++ b/plugins/inputs/syslog/rfc5425_test.go @@ -46,14 +46,14 @@ func getTestCasesForRFC5425() []testCase5425 { "origin": true, "meta sequence": "14125553", "meta service": "someservice", + "severity_code": 5, + "facility_code": 3, }, Tags: map[string]string{ - "severity": "5", - "severity_level": "notice", - "facility": "3", - "facility_message": "system daemons", - "hostname": "web1", - "appname": "someservice", + "severity": "notice", + "facility": "daemon", + "hostname": "web1", + "appname": "someservice", }, Time: defaultTime, }, @@ -70,14 +70,14 @@ func getTestCasesForRFC5425() []testCase5425 { "origin": true, "meta sequence": "14125553", "meta service": "someservice", + "severity_code": 5, + "facility_code": 3, }, Tags: map[string]string{ - "severity": "5", - "severity_level": "notice", - "facility": "3", - "facility_message": "system daemons", - "hostname": "web1", - "appname": "someservice", + "severity": "notice", + "facility": "daemon", + "hostname": "web1", + "appname": "someservice", }, Time: defaultTime, }, @@ -90,26 +90,26 @@ func getTestCasesForRFC5425() []testCase5425 { testutil.Metric{ Measurement: "syslog", Fields: map[string]interface{}{ - "version": uint16(2), + "version": uint16(2), + "severity_code": 1, + "facility_code": 0, }, Tags: map[string]string{ - "severity": "1", - "severity_level": "alert", - "facility": "0", - "facility_message": "kernel messages", + "severity": "alert", + "facility": "kern", }, Time: defaultTime, }, testutil.Metric{ Measurement: "syslog", Fields: map[string]interface{}{ - "version": uint16(11), + "version": uint16(11), + "severity_code": 4, + "facility_code": 0, }, Tags: map[string]string{ - "severity": "4", - "severity_level": "warning", - "facility": "0", - "facility_message": "kernel messages", + "severity": "warning", + "facility": "kern", }, Time: defaultTime, }, @@ -118,26 +118,26 @@ func getTestCasesForRFC5425() []testCase5425 { testutil.Metric{ Measurement: "syslog", Fields: map[string]interface{}{ - "version": uint16(2), + "version": uint16(2), + "severity_code": 1, + "facility_code": 0, }, Tags: map[string]string{ - "severity": "1", - "severity_level": "alert", - "facility": "0", - "facility_message": "kernel messages", + "severity": "alert", + "facility": "kern", }, Time: defaultTime, }, testutil.Metric{ Measurement: "syslog", Fields: map[string]interface{}{ - "version": uint16(11), + "version": uint16(11), + "severity_code": 4, + "facility_code": 0, }, Tags: map[string]string{ - "severity": "4", - "severity_level": "warning", - "facility": "0", - "facility_message": "kernel messages", + "severity": "warning", + "facility": "kern", }, Time: defaultTime, }, @@ -150,14 +150,14 @@ func getTestCasesForRFC5425() []testCase5425 { testutil.Metric{ Measurement: "syslog", Fields: map[string]interface{}{ - "version": uint16(1), - "message": "hellø", + "version": uint16(1), + "message": "hellø", + "severity_code": 1, + "facility_code": 0, }, Tags: map[string]string{ - "severity": "1", - "severity_level": "alert", - "facility": "0", - "facility_message": "kernel messages", + "severity": "alert", + "facility": "kern", }, Time: defaultTime, }, @@ -166,14 +166,14 @@ func getTestCasesForRFC5425() []testCase5425 { testutil.Metric{ Measurement: "syslog", Fields: map[string]interface{}{ - "version": uint16(1), - "message": "hellø", + "version": uint16(1), + "message": "hellø", + "severity_code": 1, + "facility_code": 0, }, Tags: map[string]string{ - "severity": "1", - "severity_level": "alert", - "facility": "0", - "facility_message": "kernel messages", + "severity": "alert", + "facility": "kern", }, Time: defaultTime, }, @@ -186,14 +186,14 @@ func getTestCasesForRFC5425() []testCase5425 { testutil.Metric{ Measurement: "syslog", Fields: map[string]interface{}{ - "version": uint16(3), - "message": "hello\nworld", + "version": uint16(3), + "message": "hello\nworld", + "severity_code": 1, + "facility_code": 0, }, Tags: map[string]string{ - "severity": "1", - "severity_level": "alert", - "facility": "0", - "facility_message": "kernel messages", + "severity": "alert", + "facility": "kern", }, Time: defaultTime, }, @@ -202,14 +202,14 @@ func getTestCasesForRFC5425() []testCase5425 { testutil.Metric{ Measurement: "syslog", Fields: map[string]interface{}{ - "version": uint16(3), - "message": "hello\nworld", + "version": uint16(3), + "message": "hello\nworld", + "severity_code": 1, + "facility_code": 0, }, Tags: map[string]string{ - "severity": "1", - "severity_level": "alert", - "facility": "0", - "facility_message": "kernel messages", + "severity": "alert", + "facility": "kern", }, Time: defaultTime, }, @@ -223,13 +223,13 @@ func getTestCasesForRFC5425() []testCase5425 { testutil.Metric{ Measurement: "syslog", Fields: map[string]interface{}{ - "version": uint16(2), + "version": uint16(2), + "severity_code": 1, + "facility_code": 0, }, Tags: map[string]string{ - "severity": "1", - "severity_level": "alert", - "facility": "0", - "facility_message": "kernel messages", + "severity": "alert", + "facility": "kern", }, Time: defaultTime, }, @@ -243,13 +243,13 @@ func getTestCasesForRFC5425() []testCase5425 { testutil.Metric{ Measurement: "syslog", Fields: map[string]interface{}{ - "version": uint16(1), + "version": uint16(1), + "severity_code": 1, + "facility_code": 0, }, Tags: map[string]string{ - "severity": "1", - "severity_level": "alert", - "facility": "0", - "facility_message": "kernel messages", + "severity": "alert", + "facility": "kern", }, Time: defaultTime, }, @@ -258,13 +258,13 @@ func getTestCasesForRFC5425() []testCase5425 { testutil.Metric{ Measurement: "syslog", Fields: map[string]interface{}{ - "version": uint16(1), + "version": uint16(1), + "severity_code": 1, + "facility_code": 0, }, Tags: map[string]string{ - "severity": "1", - "severity_level": "alert", - "facility": "0", - "facility_message": "kernel messages", + "severity": "alert", + "facility": "kern", }, Time: defaultTime, }, @@ -278,13 +278,13 @@ func getTestCasesForRFC5425() []testCase5425 { testutil.Metric{ Measurement: "syslog", Fields: map[string]interface{}{ - "version": uint16(217), + "version": uint16(217), + "severity_code": 1, + "facility_code": 0, }, Tags: map[string]string{ - "severity": "1", - "severity_level": "alert", - "facility": "0", - "facility_message": "kernel messages", + "severity": "alert", + "facility": "kern", }, Time: defaultTime, }, @@ -303,19 +303,19 @@ func getTestCasesForRFC5425() []testCase5425 { testutil.Metric{ Measurement: "syslog", Fields: map[string]interface{}{ - "version": maxV, - "timestamp": time.Unix(1514764799, 999999000).UTC(), - "message": message7681, - "procid": maxPID, - "msgid": maxMID, + "version": maxV, + "timestamp": time.Unix(1514764799, 999999000).UTC(), + "message": message7681, + "procid": maxPID, + "msgid": maxMID, + "facility_code": 23, + "severity_code": 7, }, Tags: map[string]string{ - "severity": "7", - "severity_level": "debug", - "facility": "23", - "facility_message": "local use 7 (local7)", - "hostname": maxH, - "appname": maxA, + "severity": "debug", + "facility": "local7", + "hostname": maxH, + "appname": maxA, }, Time: defaultTime, }, @@ -324,19 +324,19 @@ func getTestCasesForRFC5425() []testCase5425 { testutil.Metric{ Measurement: "syslog", Fields: map[string]interface{}{ - "version": maxV, - "timestamp": time.Unix(1514764799, 999999000).UTC(), - "message": message7681, - "procid": maxPID, - "msgid": maxMID, + "version": maxV, + "timestamp": time.Unix(1514764799, 999999000).UTC(), + "message": message7681, + "procid": maxPID, + "msgid": maxMID, + "facility_code": 23, + "severity_code": 7, }, Tags: map[string]string{ - "severity": "7", - "severity_level": "debug", - "facility": "23", - "facility_message": "local use 7 (local7)", - "hostname": maxH, - "appname": maxA, + "severity": "debug", + "facility": "local7", + "hostname": maxH, + "appname": maxA, }, Time: defaultTime, }, diff --git a/plugins/inputs/syslog/rfc5426_test.go b/plugins/inputs/syslog/rfc5426_test.go index 65023e66ffa15..f2be36d8d9db4 100644 --- a/plugins/inputs/syslog/rfc5426_test.go +++ b/plugins/inputs/syslog/rfc5426_test.go @@ -32,28 +32,28 @@ func getTestCasesForRFC5426() []testCase5426 { wantBestEffort: &testutil.Metric{ Measurement: "syslog", Fields: map[string]interface{}{ - "version": uint16(1), - "message": "A", + "version": uint16(1), + "message": "A", + "facility_code": 0, + "severity_code": 1, }, Tags: map[string]string{ - "severity": "1", - "severity_level": "alert", - "facility": "0", - "facility_message": "kernel messages", + "severity": "alert", + "facility": "kern", }, Time: defaultTime, }, wantStrict: &testutil.Metric{ Measurement: "syslog", Fields: map[string]interface{}{ - "version": uint16(1), - "message": "A", + "version": uint16(1), + "message": "A", + "facility_code": 0, + "severity_code": 1, }, Tags: map[string]string{ - "severity": "1", - "severity_level": "alert", - "facility": "0", - "facility_message": "kernel messages", + "severity": "alert", + "facility": "kern", }, Time: defaultTime, }, @@ -64,28 +64,28 @@ func getTestCasesForRFC5426() []testCase5426 { wantBestEffort: &testutil.Metric{ Measurement: "syslog", Fields: map[string]interface{}{ - "version": uint16(3), - "message": "A<1>4 - - - - - - B", + "version": uint16(3), + "message": "A<1>4 - - - - - - B", + "severity_code": 1, + "facility_code": 0, }, Tags: map[string]string{ - "severity": "1", - "severity_level": "alert", - "facility": "0", - "facility_message": "kernel messages", + "severity": "alert", + "facility": "kern", }, Time: defaultTime, }, wantStrict: &testutil.Metric{ Measurement: "syslog", Fields: map[string]interface{}{ - "version": uint16(3), - "message": "A<1>4 - - - - - - B", + "version": uint16(3), + "message": "A<1>4 - - - - - - B", + "severity_code": 1, + "facility_code": 0, }, Tags: map[string]string{ - "severity": "1", - "severity_level": "alert", - "facility": "0", - "facility_message": "kernel messages", + "severity": "alert", + "facility": "kern", }, Time: defaultTime, }, @@ -104,14 +104,14 @@ func getTestCasesForRFC5426() []testCase5426 { "origin": true, "meta sequence": "14125553", "meta service": "someservice", + "severity_code": 5, + "facility_code": 3, }, Tags: map[string]string{ - "severity": "5", - "severity_level": "notice", - "facility": "3", - "facility_message": "system daemons", - "hostname": "web1", - "appname": "someservice", + "severity": "notice", + "facility": "daemon", + "hostname": "web1", + "appname": "someservice", }, Time: defaultTime, }, @@ -126,14 +126,14 @@ func getTestCasesForRFC5426() []testCase5426 { "origin": true, "meta sequence": "14125553", "meta service": "someservice", + "severity_code": 5, + "facility_code": 3, }, Tags: map[string]string{ - "severity": "5", - "severity_level": "notice", - "facility": "3", - "facility_message": "system daemons", - "hostname": "web1", - "appname": "someservice", + "severity": "notice", + "facility": "daemon", + "hostname": "web1", + "appname": "someservice", }, Time: defaultTime, }, @@ -144,38 +144,38 @@ func getTestCasesForRFC5426() []testCase5426 { wantBestEffort: &testutil.Metric{ Measurement: "syslog", Fields: map[string]interface{}{ - "version": maxV, - "timestamp": time.Unix(1514764799, 999999000).UTC(), - "message": message7681, - "procid": maxPID, - "msgid": maxMID, + "version": maxV, + "timestamp": time.Unix(1514764799, 999999000).UTC(), + "message": message7681, + "procid": maxPID, + "msgid": maxMID, + "severity_code": 7, + "facility_code": 23, }, Tags: map[string]string{ - "severity": "7", - "severity_level": "debug", - "facility": "23", - "facility_message": "local use 7 (local7)", - "hostname": maxH, - "appname": maxA, + "severity": "debug", + "facility": "local7", + "hostname": maxH, + "appname": maxA, }, Time: defaultTime, }, wantStrict: &testutil.Metric{ Measurement: "syslog", Fields: map[string]interface{}{ - "version": maxV, - "timestamp": time.Unix(1514764799, 999999000).UTC(), - "message": message7681, - "procid": maxPID, - "msgid": maxMID, + "version": maxV, + "timestamp": time.Unix(1514764799, 999999000).UTC(), + "message": message7681, + "procid": maxPID, + "msgid": maxMID, + "severity_code": 7, + "facility_code": 23, }, Tags: map[string]string{ - "severity": "7", - "severity_level": "debug", - "facility": "23", - "facility_message": "local use 7 (local7)", - "hostname": maxH, - "appname": maxA, + "severity": "debug", + "facility": "local7", + "hostname": maxH, + "appname": maxA, }, Time: defaultTime, }, @@ -186,13 +186,13 @@ func getTestCasesForRFC5426() []testCase5426 { wantBestEffort: &testutil.Metric{ Measurement: "syslog", Fields: map[string]interface{}{ - "version": uint16(2), + "version": uint16(2), + "facility_code": 0, + "severity_code": 1, }, Tags: map[string]string{ - "severity": "1", - "severity_level": "alert", - "facility": "0", - "facility_message": "kernel messages", + "severity": "alert", + "facility": "kern", }, Time: defaultTime, }, diff --git a/plugins/inputs/syslog/syslog.go b/plugins/inputs/syslog/syslog.go index ec145bc148bc7..7d985c191ce8b 100644 --- a/plugins/inputs/syslog/syslog.go +++ b/plugins/inputs/syslog/syslog.go @@ -6,7 +6,6 @@ import ( "io" "net" "os" - "strconv" "strings" "sync" "time" @@ -183,7 +182,7 @@ func (s *Syslog) listenPacket(acc telegraf.Accumulator) { message, err := p.Parse(b[:n], &s.BestEffort) if message != nil { - acc.AddFields("syslog", fields(message), tags(message), s.now()) + acc.AddFields("syslog", fields(*message), tags(*message), s.now()) } if err != nil { acc.AddError(err) @@ -282,21 +281,17 @@ func (s *Syslog) store(res rfc5425.Result, acc telegraf.Accumulator) { acc.AddError(res.MessageError) } if res.Message != nil { - acc.AddFields("syslog", fields(res.Message), tags(res.Message), s.now()) + msg := *res.Message + acc.AddFields("syslog", fields(msg), tags(msg), s.now()) } } -func tags(msg *rfc5424.SyslogMessage) map[string]string { +func tags(msg rfc5424.SyslogMessage) map[string]string { ts := map[string]string{} - if lvl := msg.SeverityLevel(); lvl != nil { - ts["severity"] = strconv.Itoa(int(*msg.Severity())) - ts["severity_level"] = *lvl - } - if f := msg.FacilityMessage(); f != nil { - ts["facility"] = strconv.Itoa(int(*msg.Facility())) - ts["facility_message"] = *f - } + // Not checking assuming a minimally valid message + ts["severity"] = *msg.SeverityShortLevel() + ts["facility"] = *msg.FacilityLevel() if msg.Hostname() != nil { ts["hostname"] = *msg.Hostname() @@ -309,10 +304,13 @@ func tags(msg *rfc5424.SyslogMessage) map[string]string { return ts } -func fields(msg *rfc5424.SyslogMessage) map[string]interface{} { +func fields(msg rfc5424.SyslogMessage) map[string]interface{} { + // Not checking assuming a minimally valid message flds := map[string]interface{}{ "version": msg.Version(), } + flds["severity_code"] = int(*msg.Severity()) + flds["facility_code"] = int(*msg.Facility()) if msg.Timestamp() != nil { flds["timestamp"] = *msg.Timestamp() From dd6c6c05611a2901db8f610de952a68817b114c8 Mon Sep 17 00:00:00 2001 From: Leonardo Di Donato Date: Thu, 24 May 2018 13:10:14 +0200 Subject: [PATCH 11/16] New: Option to let the user choose how the syslog plugin have to prepend SDPARAMs --- plugins/inputs/syslog/README.md | 19 +++++++++++++------ plugins/inputs/syslog/rfc5425_test.go | 9 +++++---- plugins/inputs/syslog/rfc5426_test.go | 9 +++++---- plugins/inputs/syslog/syslog.go | 18 +++++++++++++----- 4 files changed, 36 insertions(+), 19 deletions(-) diff --git a/plugins/inputs/syslog/README.md b/plugins/inputs/syslog/README.md index c24b1bcf3d761..bc443dbcef8a6 100644 --- a/plugins/inputs/syslog/README.md +++ b/plugins/inputs/syslog/README.md @@ -6,7 +6,7 @@ It can act as a syslog transport receiver over TLS (or TCP) - ie., RFC5425 - or This plugin listens for syslog messages following RFC5424 format. When received it parses them extracting metrics. -### Configuration: +### Configuration ```toml [[inputs.syslog]] @@ -40,12 +40,18 @@ This plugin listens for syslog messages following RFC5424 format. When received ## Read timeout (default = 500ms). ## 0 means unlimited. - ## Only applies to stream sockets (e.g. TCP). # read_timeout = 500ms ## Whether to parse in best effort mode or not (default = false). ## By default best effort parsing is off. # best_effort = false + + ## Character to prepend to SD-PARAMs (default = "_"). + ## A syslog message can contain multiple parameters and multiple identifiers within structured data section. + ## Eg., [id1 name1="val1" name2="val2"][id2 name1="val1" nameA="valA"] + ## For each combination a field is created. + ## Its name is created concatenating identifier, sdparam_separator, and parameter name. + # sdparam_separator = "_" ``` #### Other configs @@ -55,22 +61,23 @@ Other available configurations are: - `keep_alive_period`, `max_connections` for stream sockets - `read_timeout` - `best_effort` to tell the parser to work until it is able to do and extract partial but valid info (more [here](https://github.com/influxdata/go-syslog#best-effort-mode)) +- `sdparam_separator` to choose how to separate structured data param name from its structured data identifier ### Metrics - syslog - fields - **version** (`uint16`) + - **severity_code** (`int`) + - **facility_code** (`int`) - timestamp (`time.Time`) - procid (`string`) - msgid (`string`) - - _structureddata element id_ (`bool`) - - _structureddata element parameter name_ (`string`) + - *sdid* (`bool`) + - *sdid . sdparam_separator . sdparam_name* (`string`) - tags - **severity** (`string`) - - **severity_level** (`string`) - **facility** (`string`) - - **facility_message** (`string`) - hostname (`string`) - appname (`string`) diff --git a/plugins/inputs/syslog/rfc5425_test.go b/plugins/inputs/syslog/rfc5425_test.go index 5e2768d8ad26d..c5893674c9d2d 100644 --- a/plugins/inputs/syslog/rfc5425_test.go +++ b/plugins/inputs/syslog/rfc5425_test.go @@ -44,8 +44,8 @@ func getTestCasesForRFC5425() []testCase5425 { "msgid": "2", "message": `"GET /v1/ok HTTP/1.1" 200 145 "-" "hacheck 0.9.0" 24306 127.0.0.1:40124 575`, "origin": true, - "meta sequence": "14125553", - "meta service": "someservice", + "meta_sequence": "14125553", + "meta_service": "someservice", "severity_code": 5, "facility_code": 3, }, @@ -68,8 +68,8 @@ func getTestCasesForRFC5425() []testCase5425 { "msgid": "2", "message": `"GET /v1/ok HTTP/1.1" 200 145 "-" "hacheck 0.9.0" 24306 127.0.0.1:40124 575`, "origin": true, - "meta sequence": "14125553", - "meta service": "someservice", + "meta_sequence": "14125553", + "meta_service": "someservice", "severity_code": 5, "facility_code": 3, }, @@ -359,6 +359,7 @@ func newTCPSyslogReceiver(keepAlive *internal.Duration, maxConn int, bestEffort }, ReadTimeout: d, BestEffort: bestEffort, + Separator: "_", } if keepAlive != nil { s.KeepAlivePeriod = keepAlive diff --git a/plugins/inputs/syslog/rfc5426_test.go b/plugins/inputs/syslog/rfc5426_test.go index f2be36d8d9db4..ce257f13ef308 100644 --- a/plugins/inputs/syslog/rfc5426_test.go +++ b/plugins/inputs/syslog/rfc5426_test.go @@ -102,8 +102,8 @@ func getTestCasesForRFC5426() []testCase5426 { "msgid": "2", "message": `"GET /v1/ok HTTP/1.1" 200 145 "-" "hacheck 0.9.0" 24306 127.0.0.1:40124 575`, "origin": true, - "meta sequence": "14125553", - "meta service": "someservice", + "meta_sequence": "14125553", + "meta_service": "someservice", "severity_code": 5, "facility_code": 3, }, @@ -124,8 +124,8 @@ func getTestCasesForRFC5426() []testCase5426 { "msgid": "2", "message": `"GET /v1/ok HTTP/1.1" 200 145 "-" "hacheck 0.9.0" 24306 127.0.0.1:40124 575`, "origin": true, - "meta sequence": "14125553", - "meta service": "someservice", + "meta_sequence": "14125553", + "meta_service": "someservice", "severity_code": 5, "facility_code": 3, }, @@ -211,6 +211,7 @@ func newUDPSyslogReceiver(bestEffort bool) *Syslog { return defaultTime }, BestEffort: bestEffort, + Separator: "_", } } diff --git a/plugins/inputs/syslog/syslog.go b/plugins/inputs/syslog/syslog.go index 7d985c191ce8b..a1da6a4794e3d 100644 --- a/plugins/inputs/syslog/syslog.go +++ b/plugins/inputs/syslog/syslog.go @@ -30,6 +30,7 @@ type Syslog struct { ReadTimeout *internal.Duration MaxConnections int BestEffort bool + Separator string `toml:"sdparam_separator"` now func() time.Time @@ -77,12 +78,18 @@ var sampleConfig = ` ## Read timeout (default = 500ms). ## 0 means unlimited. - ## Only applies to stream sockets (e.g. TCP). # read_timeout = 500ms ## Whether to parse in best effort mode or not (default = false). ## By default best effort parsing is off. # best_effort = false + + ## Character to prepend to SD-PARAMs (default = "_"). + ## A syslog message can contain multiple parameters and multiple identifiers within structured data section. + ## Eg., [id1 name1="val1" name2="val2"][id2 name1="val1" nameA="valA"] + ## For each combination a field is created. + ## Its name is created concatenating identifier, sdparam_separator, and parameter name. + # sdparam_separator = "_" ` // SampleConfig returns sample configuration message @@ -182,7 +189,7 @@ func (s *Syslog) listenPacket(acc telegraf.Accumulator) { message, err := p.Parse(b[:n], &s.BestEffort) if message != nil { - acc.AddFields("syslog", fields(*message), tags(*message), s.now()) + acc.AddFields("syslog", fields(*message, s), tags(*message), s.now()) } if err != nil { acc.AddError(err) @@ -282,7 +289,7 @@ func (s *Syslog) store(res rfc5425.Result, acc telegraf.Accumulator) { } if res.Message != nil { msg := *res.Message - acc.AddFields("syslog", fields(msg), tags(msg), s.now()) + acc.AddFields("syslog", fields(msg, s), tags(msg), s.now()) } } @@ -304,7 +311,7 @@ func tags(msg rfc5424.SyslogMessage) map[string]string { return ts } -func fields(msg rfc5424.SyslogMessage) map[string]interface{} { +func fields(msg rfc5424.SyslogMessage, s *Syslog) map[string]interface{} { // Not checking assuming a minimally valid message flds := map[string]interface{}{ "version": msg.Version(), @@ -337,7 +344,7 @@ func fields(msg rfc5424.SyslogMessage) map[string]interface{} { } for name, value := range sdparams { // Using whitespace as separator since it is not allowed by the grammar within SDID - flds[sdid+" "+name] = value + flds[sdid+s.Separator+name] = value } } } @@ -363,6 +370,7 @@ func init() { ReadTimeout: &internal.Duration{ Duration: defaultReadTimeout, }, + Separator: "_", } inputs.Add("syslog", func() telegraf.Input { return receiver }) From 6ca6e91843e2e27bd66100c066683826a3891351 Mon Sep 17 00:00:00 2001 From: Leonardo Di Donato Date: Thu, 24 May 2018 14:59:38 +0200 Subject: [PATCH 12/16] Update: Syslog address option must contain protocol now --- plugins/inputs/syslog/README.md | 14 ++--- plugins/inputs/syslog/rfc5425_test.go | 7 +-- plugins/inputs/syslog/rfc5426_test.go | 6 +-- plugins/inputs/syslog/syslog.go | 74 ++++++++++++++++++--------- plugins/inputs/syslog/syslog_test.go | 41 +++++++++++++-- 5 files changed, 95 insertions(+), 47 deletions(-) diff --git a/plugins/inputs/syslog/README.md b/plugins/inputs/syslog/README.md index bc443dbcef8a6..4a419e3270a68 100644 --- a/plugins/inputs/syslog/README.md +++ b/plugins/inputs/syslog/README.md @@ -10,17 +10,11 @@ This plugin listens for syslog messages following RFC5424 format. When received ```toml [[inputs.syslog]] - ## Specify an ip or hostname with port - eg., localhost:6514, 10.0.0.1:6514 - ## Address and port to host the syslog receiver. - ## If no server is specified, then localhost is used as the host. + ## Specify an ip or hostname with port - eg., tcp://localhost:6514, tcp://10.0.0.1:6514 + ## Protocol, address and port to host the syslog receiver. + ## If no host is specified, then localhost is used. ## If no port is specified, 6514 is used (RFC5425#section-4.1). - server = ":6514" - - ## Protocol (default = tcp) - ## Should be one of the following values: - ## tcp, tcp4, tcp6, unix, unixpacket, udp, udp4, udp6, ip, ip4, ip6, unixgram. - ## Otherwise forced to the default. - # protocol = "tcp" + server = "tcp://:6514" ## TLS Config # tls_allowed_cacerts = ["/etc/telegraf/ca.pem"] diff --git a/plugins/inputs/syslog/rfc5425_test.go b/plugins/inputs/syslog/rfc5425_test.go index c5893674c9d2d..9cea18bafe316 100644 --- a/plugins/inputs/syslog/rfc5425_test.go +++ b/plugins/inputs/syslog/rfc5425_test.go @@ -13,10 +13,6 @@ import ( "github.com/stretchr/testify/require" ) -const ( - address = ":6514" -) - var ( pki = testutil.NewPKI("../../../testutil/pki") ) @@ -352,8 +348,7 @@ func newTCPSyslogReceiver(keepAlive *internal.Duration, maxConn int, bestEffort Duration: defaultReadTimeout, } s := &Syslog{ - Protocol: "tcp", - Address: address, + Address: "tcp://" + address, now: func() time.Time { return defaultTime }, diff --git a/plugins/inputs/syslog/rfc5426_test.go b/plugins/inputs/syslog/rfc5426_test.go index ce257f13ef308..39ebcd69b7bdc 100644 --- a/plugins/inputs/syslog/rfc5426_test.go +++ b/plugins/inputs/syslog/rfc5426_test.go @@ -205,8 +205,7 @@ func getTestCasesForRFC5426() []testCase5426 { func newUDPSyslogReceiver(bestEffort bool) *Syslog { return &Syslog{ - Protocol: "udp", - Address: address, + Address: "udp://" + address, now: func() time.Time { return defaultTime }, @@ -220,7 +219,6 @@ func testRFC5426(t *testing.T, bestEffort bool) { t.Run(tc.name, func(t *testing.T) { // Create receiver receiver := newUDPSyslogReceiver(bestEffort) - require.Equal(t, receiver.Protocol, "udp") acc := &testutil.Accumulator{} require.NoError(t, receiver.Start(acc)) defer receiver.Stop() @@ -231,8 +229,8 @@ func testRFC5426(t *testing.T, bestEffort bool) { // Connect conn, err := net.Dial("udp", address) - defer conn.Close() require.NotNil(t, conn) + defer conn.Close() require.Nil(t, err) // Write diff --git a/plugins/inputs/syslog/syslog.go b/plugins/inputs/syslog/syslog.go index a1da6a4794e3d..def241a9920ea 100644 --- a/plugins/inputs/syslog/syslog.go +++ b/plugins/inputs/syslog/syslog.go @@ -5,6 +5,7 @@ import ( "fmt" "io" "net" + "net/url" "os" "strings" "sync" @@ -23,9 +24,8 @@ const ipMaxPacketSize = 64 * 1024 // Syslog is a syslog plugin type Syslog struct { - Address string `toml:"server"` - Protocol string tlsConfig.ServerConfig + Address string `toml:"server"` KeepAlivePeriod *internal.Duration ReadTimeout *internal.Duration MaxConnections int @@ -38,7 +38,7 @@ type Syslog struct { wg sync.WaitGroup io.Closer - isTCP bool + isStream bool tcpListener net.Listener tlsConfig *tls.Config connections map[string]net.Conn @@ -48,17 +48,11 @@ type Syslog struct { } var sampleConfig = ` - ## Specify an ip or hostname with port - eg., localhost:6514, 10.0.0.1:6514 - ## Address and port to host the syslog receiver. - ## If no server is specified, then localhost is used as the host. + ## Specify an ip or hostname with port - eg., tcp://localhost:6514, tcp://10.0.0.1:6514 + ## Protocol, address and port to host the syslog receiver. + ## If no host is specified, then localhost is used. ## If no port is specified, 6514 is used (RFC5425#section-4.1). - server = ":6514" - - ## Protocol (default = tcp) - ## Should be one of the following values: - ## tcp, tcp4, tcp6, unix, unixpacket, udp, udp4, udp6, ip, ip4, ip6, unixgram. - ## Otherwise forced to the default. - # protocol = "tcp" + server = "tcp://:6514" ## TLS Config # tls_allowed_cacerts = ["/etc/telegraf/ca.pem"] @@ -112,22 +106,27 @@ func (s *Syslog) Start(acc telegraf.Accumulator) error { s.mu.Lock() defer s.mu.Unlock() - switch s.Protocol { + scheme, host, err := getAddressParts(s.Address) + if err != nil { + return err + } + s.Address = host + + switch scheme { case "tcp", "tcp4", "tcp6", "unix", "unixpacket": - s.isTCP = true + s.isStream = true case "udp", "udp4", "udp6", "ip", "ip4", "ip6", "unixgram": - s.isTCP = false + s.isStream = false default: - s.Protocol = "tcp" - s.isTCP = true + return fmt.Errorf("unknown protocol '%s' in '%s'", scheme, s.Address) } - if s.Protocol == "unix" || s.Protocol == "unixpacket" || s.Protocol == "unixgram" { + if scheme == "unix" || scheme == "unixpacket" || scheme == "unixgram" { os.Remove(s.Address) } - if s.isTCP { - l, err := net.Listen(s.Protocol, s.Address) + if s.isStream { + l, err := net.Listen(scheme, s.Address) if err != nil { return err } @@ -141,7 +140,7 @@ func (s *Syslog) Start(acc telegraf.Accumulator) error { s.wg.Add(1) go s.listenStream(acc) } else { - l, err := net.ListenPacket(s.Protocol, s.Address) + l, err := net.ListenPacket(scheme, s.Address) if err != nil { return err } @@ -152,7 +151,7 @@ func (s *Syslog) Start(acc telegraf.Accumulator) error { go s.listenPacket(acc) } - if s.Protocol == "unix" || s.Protocol == "unixpacket" || s.Protocol == "unixgram" { + if scheme == "unix" || scheme == "unixpacket" || scheme == "unixgram" { s.Closer = unixCloser{path: s.Address, closer: s.Closer} } @@ -170,6 +169,35 @@ func (s *Syslog) Stop() { s.wg.Wait() } +// getAddressParts returns the address scheme and host +// it also sets defaults for them when missing +// when the input address does not specify the protocol it returns an error +func getAddressParts(a string) (string, string, error) { + parts := strings.SplitN(a, "://", 2) + if len(parts) != 2 { + return "", "", fmt.Errorf("missing protocol within address '%s'", a) + } + + u, _ := url.Parse(a) + switch u.Scheme { + case "unix", "unixpacket", "unixgram": + return parts[0], parts[1], nil + } + + var host string + if u.Hostname() != "" { + host = u.Hostname() + } + host += ":" + if u.Port() == "" { + host += "6514" + } else { + host += u.Port() + } + + return u.Scheme, host, nil +} + func (s *Syslog) listenPacket(acc telegraf.Accumulator) { defer s.wg.Done() b := make([]byte, ipMaxPacketSize) diff --git a/plugins/inputs/syslog/syslog_test.go b/plugins/inputs/syslog/syslog_test.go index 4d3501bca6a72..8fa0d9c943dcf 100644 --- a/plugins/inputs/syslog/syslog_test.go +++ b/plugins/inputs/syslog/syslog_test.go @@ -9,6 +9,10 @@ import ( "github.com/stretchr/testify/require" ) +const ( + address = ":6514" +) + var defaultTime = time.Unix(0, 0) var maxP = uint8(191) var maxV = uint16(999) @@ -19,9 +23,38 @@ var maxPID = "abcdefghilmnopqrstuvzabcdefghilmnopqrstuvzabcdefghilmnopqrstuvzabc var maxMID = "abcdefghilmnopqrstuvzabcdefghilm" var message7681 = strings.Repeat("l", 7681) -func TestListenError(t *testing.T) { - receiver := &Syslog{ - Address: "wrong address", +func TestAddress(t *testing.T) { + var err error + var rec *Syslog + + rec = &Syslog{ + Address: "localhost:6514", + } + err = rec.Start(&testutil.Accumulator{}) + require.EqualError(t, err, "missing protocol within address 'localhost:6514'") + require.Error(t, err) + + rec = &Syslog{ + Address: "unsupported://example.com:6514", + } + err = rec.Start(&testutil.Accumulator{}) + require.EqualError(t, err, "unknown protocol 'unsupported' in 'example.com:6514'") + require.Error(t, err) + + rec = &Syslog{ + Address: "unixgram:///tmp/telegraf.sock", + } + err = rec.Start(&testutil.Accumulator{}) + require.NoError(t, err) + require.Equal(t, "/tmp/telegraf.sock", rec.Address) + rec.Stop() + + // Default port is 6514 + rec = &Syslog{ + Address: "tcp://localhost", } - require.Error(t, receiver.Start(&testutil.Accumulator{})) + err = rec.Start(&testutil.Accumulator{}) + require.NoError(t, err) + require.Equal(t, "localhost:6514", rec.Address) + rec.Stop() } From 8da1f80b586c3d7660dc0d4a3fee310e82b1221d Mon Sep 17 00:00:00 2001 From: Leonardo Di Donato Date: Thu, 24 May 2018 20:52:26 +0200 Subject: [PATCH 13/16] New: Increment reception time when equal to the last one for syslog input plugin --- plugins/inputs/syslog/rfc5425_test.go | 4 +- plugins/inputs/syslog/rfc5426_test.go | 113 ++++++++++++++++++++++++++ plugins/inputs/syslog/syslog.go | 22 ++++- 3 files changed, 133 insertions(+), 6 deletions(-) diff --git a/plugins/inputs/syslog/rfc5425_test.go b/plugins/inputs/syslog/rfc5425_test.go index 9cea18bafe316..4c8d3dfd49734 100644 --- a/plugins/inputs/syslog/rfc5425_test.go +++ b/plugins/inputs/syslog/rfc5425_test.go @@ -107,7 +107,7 @@ func getTestCasesForRFC5425() []testCase5425 { "severity": "warning", "facility": "kern", }, - Time: defaultTime, + Time: defaultTime.Add(time.Nanosecond), }, }, wantBestEffort: []testutil.Metric{ @@ -135,7 +135,7 @@ func getTestCasesForRFC5425() []testCase5425 { "severity": "warning", "facility": "kern", }, - Time: defaultTime, + Time: defaultTime.Add(time.Nanosecond), }, }, }, diff --git a/plugins/inputs/syslog/rfc5426_test.go b/plugins/inputs/syslog/rfc5426_test.go index 39ebcd69b7bdc..f99bc1561014b 100644 --- a/plugins/inputs/syslog/rfc5426_test.go +++ b/plugins/inputs/syslog/rfc5426_test.go @@ -270,3 +270,116 @@ func TestBestEffort_udp(t *testing.T) { func TestStrict_udp(t *testing.T) { testRFC5426(t, false) } + +func TestTimeIncrement_udp(t *testing.T) { + i := 0 + getNow := func() time.Time { + if i%2 == 0 { + return time.Unix(1, 0) + } + return time.Unix(1, 1) + } + + // Create receiver + receiver := &Syslog{ + Address: "udp://" + address, + now: getNow, + BestEffort: false, + Separator: "_", + } + acc := &testutil.Accumulator{} + require.NoError(t, receiver.Start(acc)) + defer receiver.Stop() + + // Connect + conn, err := net.Dial("udp", address) + require.NotNil(t, conn) + defer conn.Close() + require.Nil(t, err) + + // Write + _, e := conn.Write([]byte("<1>1 - - - - - -")) + require.Nil(t, e) + + // Wait + acc.Wait(1) + + want := &testutil.Metric{ + Measurement: "syslog", + Fields: map[string]interface{}{ + "version": uint16(1), + "facility_code": 0, + "severity_code": 1, + }, + Tags: map[string]string{ + "severity": "alert", + "facility": "kern", + }, + Time: getNow(), + } + + if !cmp.Equal(want, acc.Metrics[0]) { + t.Fatalf("Got (+) / Want (-)\n %s", cmp.Diff(want, acc.Metrics[0])) + } + + // New one with different time + i++ + + // Clear + acc.ClearMetrics() + + // Write + _, e = conn.Write([]byte("<1>1 - - - - - -")) + require.Nil(t, e) + + // Wait + acc.Wait(1) + + want = &testutil.Metric{ + Measurement: "syslog", + Fields: map[string]interface{}{ + "version": uint16(1), + "facility_code": 0, + "severity_code": 1, + }, + Tags: map[string]string{ + "severity": "alert", + "facility": "kern", + }, + Time: getNow(), + } + + if !cmp.Equal(want, acc.Metrics[0]) { + t.Fatalf("Got (+) / Want (-)\n %s", cmp.Diff(want, acc.Metrics[0])) + } + + // New one with same time as previous one + + // Clear + acc.ClearMetrics() + + // Write + _, e = conn.Write([]byte("<1>1 - - - - - -")) + require.Nil(t, e) + + // Wait + acc.Wait(1) + + want = &testutil.Metric{ + Measurement: "syslog", + Fields: map[string]interface{}{ + "version": uint16(1), + "facility_code": 0, + "severity_code": 1, + }, + Tags: map[string]string{ + "severity": "alert", + "facility": "kern", + }, + Time: getNow().Add(time.Nanosecond), + } + + if !cmp.Equal(want, acc.Metrics[0]) { + t.Fatalf("Got (+) / Want (-)\n %s", cmp.Diff(want, acc.Metrics[0])) + } +} diff --git a/plugins/inputs/syslog/syslog.go b/plugins/inputs/syslog/syslog.go index def241a9920ea..f780cedac035a 100644 --- a/plugins/inputs/syslog/syslog.go +++ b/plugins/inputs/syslog/syslog.go @@ -32,7 +32,8 @@ type Syslog struct { BestEffort bool Separator string `toml:"sdparam_separator"` - now func() time.Time + now func() time.Time + lastTime time.Time mu sync.Mutex wg sync.WaitGroup @@ -217,7 +218,7 @@ func (s *Syslog) listenPacket(acc telegraf.Accumulator) { message, err := p.Parse(b[:n], &s.BestEffort) if message != nil { - acc.AddFields("syslog", fields(*message, s), tags(*message), s.now()) + acc.AddFields("syslog", fields(*message, s), tags(*message), s.time()) } if err != nil { acc.AddError(err) @@ -317,7 +318,7 @@ func (s *Syslog) store(res rfc5425.Result, acc telegraf.Accumulator) { } if res.Message != nil { msg := *res.Message - acc.AddFields("syslog", fields(msg, s), tags(msg), s.now()) + acc.AddFields("syslog", fields(msg, s), tags(msg), s.time()) } } @@ -391,10 +392,23 @@ func (uc unixCloser) Close() error { return err } +func (s *Syslog) time() time.Time { + t := s.now() + if t == s.lastTime { + t = t.Add(time.Nanosecond) + } + s.lastTime = t + return t +} + +func getNanoNow() time.Time { + return time.Unix(0, time.Now().UnixNano()) +} + func init() { receiver := &Syslog{ Address: ":6514", - now: time.Now, + now: getNanoNow, ReadTimeout: &internal.Duration{ Duration: defaultReadTimeout, }, From e0e9d40d8f28c8cd54cf3234a9ae251bee3d62a6 Mon Sep 17 00:00:00 2001 From: Leonardo Di Donato Date: Fri, 25 May 2018 12:29:25 +0200 Subject: [PATCH 14/16] Update: Syslog input plugin stores timestamp as integers in nanosec precision --- plugins/inputs/syslog/README.md | 2 +- plugins/inputs/syslog/rfc5425_test.go | 8 ++++---- plugins/inputs/syslog/rfc5426_test.go | 8 ++++---- plugins/inputs/syslog/syslog.go | 2 +- 4 files changed, 10 insertions(+), 10 deletions(-) diff --git a/plugins/inputs/syslog/README.md b/plugins/inputs/syslog/README.md index 4a419e3270a68..010dab60d19ab 100644 --- a/plugins/inputs/syslog/README.md +++ b/plugins/inputs/syslog/README.md @@ -64,7 +64,7 @@ Other available configurations are: - **version** (`uint16`) - **severity_code** (`int`) - **facility_code** (`int`) - - timestamp (`time.Time`) + - timestamp (`int`) - procid (`string`) - msgid (`string`) - *sdid* (`bool`) diff --git a/plugins/inputs/syslog/rfc5425_test.go b/plugins/inputs/syslog/rfc5425_test.go index 4c8d3dfd49734..4dca3ceaddba9 100644 --- a/plugins/inputs/syslog/rfc5425_test.go +++ b/plugins/inputs/syslog/rfc5425_test.go @@ -35,7 +35,7 @@ func getTestCasesForRFC5425() []testCase5425 { Measurement: "syslog", Fields: map[string]interface{}{ "version": uint16(1), - "timestamp": time.Unix(1456029177, 0).UTC(), + "timestamp": time.Unix(1456029177, 0).UnixNano(), "procid": "2341", "msgid": "2", "message": `"GET /v1/ok HTTP/1.1" 200 145 "-" "hacheck 0.9.0" 24306 127.0.0.1:40124 575`, @@ -59,7 +59,7 @@ func getTestCasesForRFC5425() []testCase5425 { Measurement: "syslog", Fields: map[string]interface{}{ "version": uint16(1), - "timestamp": time.Unix(1456029177, 0).UTC(), + "timestamp": time.Unix(1456029177, 0).UnixNano(), "procid": "2341", "msgid": "2", "message": `"GET /v1/ok HTTP/1.1" 200 145 "-" "hacheck 0.9.0" 24306 127.0.0.1:40124 575`, @@ -300,7 +300,7 @@ func getTestCasesForRFC5425() []testCase5425 { Measurement: "syslog", Fields: map[string]interface{}{ "version": maxV, - "timestamp": time.Unix(1514764799, 999999000).UTC(), + "timestamp": time.Unix(1514764799, 999999000).UnixNano(), "message": message7681, "procid": maxPID, "msgid": maxMID, @@ -321,7 +321,7 @@ func getTestCasesForRFC5425() []testCase5425 { Measurement: "syslog", Fields: map[string]interface{}{ "version": maxV, - "timestamp": time.Unix(1514764799, 999999000).UTC(), + "timestamp": time.Unix(1514764799, 999999000).UnixNano(), "message": message7681, "procid": maxPID, "msgid": maxMID, diff --git a/plugins/inputs/syslog/rfc5426_test.go b/plugins/inputs/syslog/rfc5426_test.go index f99bc1561014b..355979cff9bce 100644 --- a/plugins/inputs/syslog/rfc5426_test.go +++ b/plugins/inputs/syslog/rfc5426_test.go @@ -97,7 +97,7 @@ func getTestCasesForRFC5426() []testCase5426 { Measurement: "syslog", Fields: map[string]interface{}{ "version": uint16(1), - "timestamp": time.Unix(1456029177, 0).UTC(), + "timestamp": time.Unix(1456029177, 0).UnixNano(), "procid": "2341", "msgid": "2", "message": `"GET /v1/ok HTTP/1.1" 200 145 "-" "hacheck 0.9.0" 24306 127.0.0.1:40124 575`, @@ -119,7 +119,7 @@ func getTestCasesForRFC5426() []testCase5426 { Measurement: "syslog", Fields: map[string]interface{}{ "version": uint16(1), - "timestamp": time.Unix(1456029177, 0).UTC(), + "timestamp": time.Unix(1456029177, 0).UnixNano(), "procid": "2341", "msgid": "2", "message": `"GET /v1/ok HTTP/1.1" 200 145 "-" "hacheck 0.9.0" 24306 127.0.0.1:40124 575`, @@ -145,7 +145,7 @@ func getTestCasesForRFC5426() []testCase5426 { Measurement: "syslog", Fields: map[string]interface{}{ "version": maxV, - "timestamp": time.Unix(1514764799, 999999000).UTC(), + "timestamp": time.Unix(1514764799, 999999000).UnixNano(), "message": message7681, "procid": maxPID, "msgid": maxMID, @@ -164,7 +164,7 @@ func getTestCasesForRFC5426() []testCase5426 { Measurement: "syslog", Fields: map[string]interface{}{ "version": maxV, - "timestamp": time.Unix(1514764799, 999999000).UTC(), + "timestamp": time.Unix(1514764799, 999999000).UnixNano(), "message": message7681, "procid": maxPID, "msgid": maxMID, diff --git a/plugins/inputs/syslog/syslog.go b/plugins/inputs/syslog/syslog.go index f780cedac035a..21f6a770fa0b1 100644 --- a/plugins/inputs/syslog/syslog.go +++ b/plugins/inputs/syslog/syslog.go @@ -349,7 +349,7 @@ func fields(msg rfc5424.SyslogMessage, s *Syslog) map[string]interface{} { flds["facility_code"] = int(*msg.Facility()) if msg.Timestamp() != nil { - flds["timestamp"] = *msg.Timestamp() + flds["timestamp"] = (*msg.Timestamp()).UnixNano() } if msg.ProcID() != nil { From 716da7614c187b8b82b958f269ef25cb91327585 Mon Sep 17 00:00:00 2001 From: Leonardo Di Donato Date: Fri, 25 May 2018 12:49:14 +0200 Subject: [PATCH 15/16] New: Testing sockets for syslog input plugin --- plugins/inputs/syslog/rfc5425_test.go | 50 ++++++++++++++++++--------- plugins/inputs/syslog/rfc5426_test.go | 27 +++++++++++---- 2 files changed, 53 insertions(+), 24 deletions(-) diff --git a/plugins/inputs/syslog/rfc5425_test.go b/plugins/inputs/syslog/rfc5425_test.go index 4dca3ceaddba9..2d724efbec0f0 100644 --- a/plugins/inputs/syslog/rfc5425_test.go +++ b/plugins/inputs/syslog/rfc5425_test.go @@ -343,12 +343,12 @@ func getTestCasesForRFC5425() []testCase5425 { return testCases } -func newTCPSyslogReceiver(keepAlive *internal.Duration, maxConn int, bestEffort bool) *Syslog { +func newTCPSyslogReceiver(address string, keepAlive *internal.Duration, maxConn int, bestEffort bool) *Syslog { d := &internal.Duration{ - Duration: defaultReadTimeout, + Duration: 50 * time.Millisecond, } s := &Syslog{ - Address: "tcp://" + address, + Address: address, now: func() time.Time { return defaultTime }, @@ -366,11 +366,11 @@ func newTCPSyslogReceiver(keepAlive *internal.Duration, maxConn int, bestEffort return s } -func testStrictRFC5425(t *testing.T, wantTLS bool, keepAlive *internal.Duration) { +func testStrictRFC5425(t *testing.T, protocol string, address string, wantTLS bool, keepAlive *internal.Duration) { for _, tc := range getTestCasesForRFC5425() { t.Run(tc.name, func(t *testing.T) { // Creation of a strict mode receiver - receiver := newTCPSyslogReceiver(keepAlive, 0, false) + receiver := newTCPSyslogReceiver(protocol+"://"+address, keepAlive, 0, false) require.NotNil(t, receiver) if wantTLS { receiver.ServerConfig = *pki.TLSServerConfig() @@ -387,9 +387,9 @@ func testStrictRFC5425(t *testing.T, wantTLS bool, keepAlive *internal.Duration) config, e := pki.TLSClientConfig().TLSConfig() require.NoError(t, e) config.ServerName = "localhost" - conn, err = tls.Dial("tcp", address, config) + conn, err = tls.Dial(protocol, address, config) } else { - conn, err = net.Dial("tcp", address) + conn, err = net.Dial(protocol, address) defer conn.Close() } require.NotNil(t, conn) @@ -425,11 +425,11 @@ func testStrictRFC5425(t *testing.T, wantTLS bool, keepAlive *internal.Duration) } } -func testBestEffortRFC5425(t *testing.T, wantTLS bool, keepAlive *internal.Duration) { +func testBestEffortRFC5425(t *testing.T, protocol string, address string, wantTLS bool, keepAlive *internal.Duration) { for _, tc := range getTestCasesForRFC5425() { t.Run(tc.name, func(t *testing.T) { // Creation of a best effort mode receiver - receiver := newTCPSyslogReceiver(keepAlive, 0, true) + receiver := newTCPSyslogReceiver(protocol+"://"+address, keepAlive, 0, true) require.NotNil(t, receiver) if wantTLS { receiver.ServerConfig = *pki.TLSServerConfig() @@ -446,9 +446,9 @@ func testBestEffortRFC5425(t *testing.T, wantTLS bool, keepAlive *internal.Durat config, e := pki.TLSClientConfig().TLSConfig() require.NoError(t, e) config.ServerName = "localhost" - conn, err = tls.Dial("tcp", address, config) + conn, err = tls.Dial(protocol, address, config) } else { - conn, err = net.Dial("tcp", address) + conn, err = net.Dial(protocol, address) defer conn.Close() } require.NotNil(t, conn) @@ -480,25 +480,41 @@ func testBestEffortRFC5425(t *testing.T, wantTLS bool, keepAlive *internal.Durat } func TestStrict_tcp(t *testing.T) { - testStrictRFC5425(t, false, nil) + testStrictRFC5425(t, "tcp", address, false, nil) } func TestBestEffort_tcp(t *testing.T) { - testBestEffortRFC5425(t, false, nil) + testBestEffortRFC5425(t, "tcp", address, false, nil) } func TestStrict_tcp_tls(t *testing.T) { - testStrictRFC5425(t, true, nil) + testStrictRFC5425(t, "tcp", address, true, nil) } func TestBestEffort_tcp_tls(t *testing.T) { - testBestEffortRFC5425(t, true, nil) + testBestEffortRFC5425(t, "tcp", address, true, nil) } func TestStrictWithKeepAlive_tcp_tls(t *testing.T) { - testStrictRFC5425(t, true, &internal.Duration{Duration: time.Minute}) + testStrictRFC5425(t, "tcp", address, true, &internal.Duration{Duration: time.Minute}) } func TestStrictWithZeroKeepAlive_tcp_tls(t *testing.T) { - testStrictRFC5425(t, true, &internal.Duration{Duration: 0}) + testStrictRFC5425(t, "tcp", address, true, &internal.Duration{Duration: 0}) +} + +func TestStrict_unix(t *testing.T) { + testStrictRFC5425(t, "unix", "/tmp/telegraf_test.sock", false, nil) +} + +func TestBestEffort_unix(t *testing.T) { + testBestEffortRFC5425(t, "unix", "/tmp/telegraf_test.sock", false, nil) +} + +func TestStrict_unix_tls(t *testing.T) { + testStrictRFC5425(t, "unix", "/tmp/telegraf_test.sock", true, nil) +} + +func TestBestEffort_unix_tls(t *testing.T) { + testBestEffortRFC5425(t, "unix", "/tmp/telegraf_test.sock", true, nil) } diff --git a/plugins/inputs/syslog/rfc5426_test.go b/plugins/inputs/syslog/rfc5426_test.go index 355979cff9bce..af531c6d562ff 100644 --- a/plugins/inputs/syslog/rfc5426_test.go +++ b/plugins/inputs/syslog/rfc5426_test.go @@ -3,6 +3,7 @@ package syslog import ( "fmt" "net" + "os" "testing" "time" @@ -203,9 +204,9 @@ func getTestCasesForRFC5426() []testCase5426 { return testCases } -func newUDPSyslogReceiver(bestEffort bool) *Syslog { +func newUDPSyslogReceiver(address string, bestEffort bool) *Syslog { return &Syslog{ - Address: "udp://" + address, + Address: address, now: func() time.Time { return defaultTime }, @@ -214,11 +215,11 @@ func newUDPSyslogReceiver(bestEffort bool) *Syslog { } } -func testRFC5426(t *testing.T, bestEffort bool) { +func testRFC5426(t *testing.T, protocol string, address string, bestEffort bool) { for _, tc := range getTestCasesForRFC5426() { t.Run(tc.name, func(t *testing.T) { // Create receiver - receiver := newUDPSyslogReceiver(bestEffort) + receiver := newUDPSyslogReceiver(protocol+"://"+address, bestEffort) acc := &testutil.Accumulator{} require.NoError(t, receiver.Start(acc)) defer receiver.Stop() @@ -228,7 +229,7 @@ func testRFC5426(t *testing.T, bestEffort bool) { acc.Errors = make([]error, 0) // Connect - conn, err := net.Dial("udp", address) + conn, err := net.Dial(protocol, address) require.NotNil(t, conn) defer conn.Close() require.Nil(t, err) @@ -264,11 +265,23 @@ func testRFC5426(t *testing.T, bestEffort bool) { } func TestBestEffort_udp(t *testing.T) { - testRFC5426(t, true) + testRFC5426(t, "udp", address, true) } func TestStrict_udp(t *testing.T) { - testRFC5426(t, false) + testRFC5426(t, "udp", address, false) +} + +func TestBestEffort_unixgram(t *testing.T) { + sockname := "/tmp/telegraf_test.sock" + os.Create(sockname) + testRFC5426(t, "unixgram", sockname, true) +} + +func TestStrict_unixgram(t *testing.T) { + sockname := "/tmp/telegraf_test.sock" + os.Create(sockname) + testRFC5426(t, "unixgram", sockname, false) } func TestTimeIncrement_udp(t *testing.T) { From 625d18be7eaf4143286b83843172728534dc9801 Mon Sep 17 00:00:00 2001 From: Leonardo Di Donato Date: Fri, 25 May 2018 17:15:03 +0200 Subject: [PATCH 16/16] Update: Test avoiding race conditions for RFC5426 (syslog input plugin) --- plugins/inputs/syslog/rfc5425_test.go | 2 +- plugins/inputs/syslog/rfc5426_test.go | 8 +++++--- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/plugins/inputs/syslog/rfc5425_test.go b/plugins/inputs/syslog/rfc5425_test.go index 2d724efbec0f0..cac2bf7e176f0 100644 --- a/plugins/inputs/syslog/rfc5425_test.go +++ b/plugins/inputs/syslog/rfc5425_test.go @@ -345,7 +345,7 @@ func getTestCasesForRFC5425() []testCase5425 { func newTCPSyslogReceiver(address string, keepAlive *internal.Duration, maxConn int, bestEffort bool) *Syslog { d := &internal.Duration{ - Duration: 50 * time.Millisecond, + Duration: defaultReadTimeout, } s := &Syslog{ Address: address, diff --git a/plugins/inputs/syslog/rfc5426_test.go b/plugins/inputs/syslog/rfc5426_test.go index af531c6d562ff..becd1c4b5e6b5 100644 --- a/plugins/inputs/syslog/rfc5426_test.go +++ b/plugins/inputs/syslog/rfc5426_test.go @@ -4,6 +4,7 @@ import ( "fmt" "net" "os" + "sync/atomic" "testing" "time" @@ -285,9 +286,10 @@ func TestStrict_unixgram(t *testing.T) { } func TestTimeIncrement_udp(t *testing.T) { - i := 0 + var i int64 + atomic.StoreInt64(&i, 0) getNow := func() time.Time { - if i%2 == 0 { + if atomic.LoadInt64(&i)%2 == 0 { return time.Unix(1, 0) } return time.Unix(1, 1) @@ -336,7 +338,7 @@ func TestTimeIncrement_udp(t *testing.T) { } // New one with different time - i++ + atomic.StoreInt64(&i, atomic.LoadInt64(&i)+1) // Clear acc.ClearMetrics()