From 32edb4883e19861cc438c20a4597133ead4160b3 Mon Sep 17 00:00:00 2001 From: Leonardo Di Donato Date: Fri, 25 May 2018 20:40:12 +0200 Subject: [PATCH] Add syslog input plugin (#4181) --- Godeps | 1 + plugins/inputs/all/all.go | 1 + plugins/inputs/syslog/README.md | 119 ++++++ plugins/inputs/syslog/rfc5425_test.go | 520 ++++++++++++++++++++++++++ plugins/inputs/syslog/rfc5426_test.go | 400 ++++++++++++++++++++ plugins/inputs/syslog/syslog.go | 419 +++++++++++++++++++++ plugins/inputs/syslog/syslog_test.go | 60 +++ 7 files changed, 1520 insertions(+) create mode 100644 plugins/inputs/syslog/README.md create mode 100644 plugins/inputs/syslog/rfc5425_test.go create mode 100644 plugins/inputs/syslog/rfc5426_test.go create mode 100644 plugins/inputs/syslog/syslog.go create mode 100644 plugins/inputs/syslog/syslog_test.go diff --git a/Godeps b/Godeps index c33d4114e35e4..20e3428160fea 100644 --- a/Godeps +++ b/Godeps @@ -32,6 +32,7 @@ github.com/go-redis/redis 73b70592cdaa9e6abdfcfbf97b4a90d80728c836 github.com/go-sql-driver/mysql 2e00b5cd70399450106cec6431c2e2ce3cae5034 github.com/hailocab/go-hostpool e80d13ce29ede4452c43dea11e79b9bc8a15b478 github.com/hashicorp/consul 5174058f0d2bda63fa5198ab96c33d9a909c58ed +github.com/influxdata/go-syslog 84f3b60009444d298f97454feb1f20cf91d1fa6e github.com/influxdata/tail c43482518d410361b6c383d7aebce33d0471d7bc github.com/influxdata/toml 5d1d907f22ead1cd47adde17ceec5bda9cacaf8f github.com/influxdata/wlog 7c63b0a71ef8300adc255344d275e10e5c3a71ec diff --git a/plugins/inputs/all/all.go b/plugins/inputs/all/all.go index 239cf6e11103d..dd9a7b774884c 100644 --- a/plugins/inputs/all/all.go +++ b/plugins/inputs/all/all.go @@ -97,6 +97,7 @@ import ( _ "github.com/influxdata/telegraf/plugins/inputs/solr" _ "github.com/influxdata/telegraf/plugins/inputs/sqlserver" _ "github.com/influxdata/telegraf/plugins/inputs/statsd" + _ "github.com/influxdata/telegraf/plugins/inputs/syslog" _ "github.com/influxdata/telegraf/plugins/inputs/sysstat" _ 
"github.com/influxdata/telegraf/plugins/inputs/system" _ "github.com/influxdata/telegraf/plugins/inputs/tail" diff --git a/plugins/inputs/syslog/README.md b/plugins/inputs/syslog/README.md new file mode 100644 index 0000000000000..010dab60d19ab --- /dev/null +++ b/plugins/inputs/syslog/README.md @@ -0,0 +1,119 @@ +# syslog input plugin + +Collects syslog messages as per RFC5425 or RFC5426. + +It can act as a syslog transport receiver over TLS (or TCP) - ie., RFC5425 - or over UDP - ie., RFC5426. + +This plugin listens for syslog messages following RFC5424 format. When received it parses them extracting metrics. + +### Configuration + +```toml +[[inputs.syslog]] + ## Specify an ip or hostname with port - eg., tcp://localhost:6514, tcp://10.0.0.1:6514 + ## Protocol, address and port to host the syslog receiver. + ## If no host is specified, then localhost is used. + ## If no port is specified, 6514 is used (RFC5425#section-4.1). + server = "tcp://:6514" + + ## TLS Config + # tls_allowed_cacerts = ["/etc/telegraf/ca.pem"] + # tls_cert = "/etc/telegraf/cert.pem" + # tls_key = "/etc/telegraf/key.pem" + + ## Period between keep alive probes. + ## 0 disables keep alive probes. + ## Defaults to the OS configuration. + ## Only applies to stream sockets (e.g. TCP). + # keep_alive_period = "5m" + + ## Maximum number of concurrent connections (default = 0). + ## 0 means unlimited. + ## Only applies to stream sockets (e.g. TCP). + # max_connections = 1024 + + ## Read timeout (default = 500ms). + ## 0 means unlimited. + # read_timeout = 500ms + + ## Whether to parse in best effort mode or not (default = false). + ## By default best effort parsing is off. + # best_effort = false + + ## Character to prepend to SD-PARAMs (default = "_"). + ## A syslog message can contain multiple parameters and multiple identifiers within structured data section. + ## Eg., [id1 name1="val1" name2="val2"][id2 name1="val1" nameA="valA"] + ## For each combination a field is created. 
+ ## Its name is created concatenating identifier, sdparam_separator, and parameter name. + # sdparam_separator = "_" +``` + +#### Other configs + +Other available configurations are: + +- `keep_alive_period`, `max_connections` for stream sockets +- `read_timeout` +- `best_effort` to tell the parser to keep going for as long as it can and to extract partial but valid info (more [here](https://github.com/influxdata/go-syslog#best-effort-mode)) +- `sdparam_separator` to choose how to separate structured data param name from its structured data identifier + +### Metrics + +- syslog + - fields + - **version** (`uint16`) + - **severity_code** (`int`) + - **facility_code** (`int`) + - timestamp (`int`) + - procid (`string`) + - msgid (`string`) + - *sdid* (`bool`) + - *sdid . sdparam_separator . sdparam_name* (`string`) + - tags + - **severity** (`string`) + - **facility** (`string`) + - hostname (`string`) + - appname (`string`) + +The names of the fields in _italic_ correspond to their runtime values. + +The fields/tags whose names are in **bold** will always be present when a valid Syslog message has been received. + +### RSYSLOG integration + +The following instructions illustrate how to configure a syslog transport sender as per RFC5425 - i.e., using the octet framing technique - via RSYSLOG. + +Install `rsyslog`. + +Give it a configuration - i.e., `/etc/rsyslog.conf`. 
+ +``` +$ModLoad imuxsock # provides support for local system logging +$ModLoad imklog # provides kernel logging support +$ModLoad immark # provides heart-beat logs +$FileOwner root +$FileGroup root +$FileCreateMode 0640 +$DirCreateMode 0755 +$Umask 0022 +$WorkDirectory /var/spool/rsyslog # default location for work (spool) files +$ActionQueueType LinkedList # use asynchronous processing +$ActionQueueFileName srvrfwd # set file name, also enables disk mode +$ActionResumeRetryCount -1 # infinite retries on insert failure +$ActionQueueSaveOnShutdown on # save in-memory data if rsyslog shuts down +$IncludeConfig /etc/rsyslog.d/*.conf +``` + +Specify that you want the octet framing technique enabled and that each syslog message follows the RFC5424 format. + +Create a file - e.g., `/etc/rsyslog.d/50-default.conf` - containing: + +``` +*.* @@(o)127.0.0.1:6514;RSYSLOG_SyslogProtocol23Format +``` + +To complete the TLS setup please refer to [rsyslog docs](https://www.rsyslog.com/doc/v8-stable/tutorials/tls.html). + +Notice that this configuration tells `rsyslog` to forward messages to `127.0.0.1:6514`. + +So you have to configure this plugin accordingly. \ No newline at end of file diff --git a/plugins/inputs/syslog/rfc5425_test.go b/plugins/inputs/syslog/rfc5425_test.go new file mode 100644 index 0000000000000..cac2bf7e176f0 --- /dev/null +++ b/plugins/inputs/syslog/rfc5425_test.go @@ -0,0 +1,520 @@ +package syslog + +import ( + "crypto/tls" + "fmt" + "net" + "testing" + "time" + + "github.com/google/go-cmp/cmp" + "github.com/influxdata/telegraf/internal" + "github.com/influxdata/telegraf/testutil" + "github.com/stretchr/testify/require" +) + +var ( + pki = testutil.NewPKI("../../../testutil/pki") +) + +type testCase5425 struct { + name string + data []byte + wantBestEffort []testutil.Metric + wantStrict []testutil.Metric + werr int // how many errors we expect in the strict mode? 
+} + +func getTestCasesForRFC5425() []testCase5425 { + testCases := []testCase5425{ + { + name: "1st/avg/ok", + data: []byte(`188 <29>1 2016-02-21T04:32:57+00:00 web1 someservice 2341 2 [origin][meta sequence="14125553" service="someservice"] "GET /v1/ok HTTP/1.1" 200 145 "-" "hacheck 0.9.0" 24306 127.0.0.1:40124 575`), + wantStrict: []testutil.Metric{ + testutil.Metric{ + Measurement: "syslog", + Fields: map[string]interface{}{ + "version": uint16(1), + "timestamp": time.Unix(1456029177, 0).UnixNano(), + "procid": "2341", + "msgid": "2", + "message": `"GET /v1/ok HTTP/1.1" 200 145 "-" "hacheck 0.9.0" 24306 127.0.0.1:40124 575`, + "origin": true, + "meta_sequence": "14125553", + "meta_service": "someservice", + "severity_code": 5, + "facility_code": 3, + }, + Tags: map[string]string{ + "severity": "notice", + "facility": "daemon", + "hostname": "web1", + "appname": "someservice", + }, + Time: defaultTime, + }, + }, + wantBestEffort: []testutil.Metric{ + testutil.Metric{ + Measurement: "syslog", + Fields: map[string]interface{}{ + "version": uint16(1), + "timestamp": time.Unix(1456029177, 0).UnixNano(), + "procid": "2341", + "msgid": "2", + "message": `"GET /v1/ok HTTP/1.1" 200 145 "-" "hacheck 0.9.0" 24306 127.0.0.1:40124 575`, + "origin": true, + "meta_sequence": "14125553", + "meta_service": "someservice", + "severity_code": 5, + "facility_code": 3, + }, + Tags: map[string]string{ + "severity": "notice", + "facility": "daemon", + "hostname": "web1", + "appname": "someservice", + }, + Time: defaultTime, + }, + }, + }, + { + name: "1st/min/ok//2nd/min/ok", + data: []byte("16 <1>2 - - - - - -17 <4>11 - - - - - -"), + wantStrict: []testutil.Metric{ + testutil.Metric{ + Measurement: "syslog", + Fields: map[string]interface{}{ + "version": uint16(2), + "severity_code": 1, + "facility_code": 0, + }, + Tags: map[string]string{ + "severity": "alert", + "facility": "kern", + }, + Time: defaultTime, + }, + testutil.Metric{ + Measurement: "syslog", + Fields: 
map[string]interface{}{ + "version": uint16(11), + "severity_code": 4, + "facility_code": 0, + }, + Tags: map[string]string{ + "severity": "warning", + "facility": "kern", + }, + Time: defaultTime.Add(time.Nanosecond), + }, + }, + wantBestEffort: []testutil.Metric{ + testutil.Metric{ + Measurement: "syslog", + Fields: map[string]interface{}{ + "version": uint16(2), + "severity_code": 1, + "facility_code": 0, + }, + Tags: map[string]string{ + "severity": "alert", + "facility": "kern", + }, + Time: defaultTime, + }, + testutil.Metric{ + Measurement: "syslog", + Fields: map[string]interface{}{ + "version": uint16(11), + "severity_code": 4, + "facility_code": 0, + }, + Tags: map[string]string{ + "severity": "warning", + "facility": "kern", + }, + Time: defaultTime.Add(time.Nanosecond), + }, + }, + }, + { + name: "1st/utf8/ok", + data: []byte("23 <1>1 - - - - - - hellø"), + wantStrict: []testutil.Metric{ + testutil.Metric{ + Measurement: "syslog", + Fields: map[string]interface{}{ + "version": uint16(1), + "message": "hellø", + "severity_code": 1, + "facility_code": 0, + }, + Tags: map[string]string{ + "severity": "alert", + "facility": "kern", + }, + Time: defaultTime, + }, + }, + wantBestEffort: []testutil.Metric{ + testutil.Metric{ + Measurement: "syslog", + Fields: map[string]interface{}{ + "version": uint16(1), + "message": "hellø", + "severity_code": 1, + "facility_code": 0, + }, + Tags: map[string]string{ + "severity": "alert", + "facility": "kern", + }, + Time: defaultTime, + }, + }, + }, + { + name: "1st/nl/ok", // newline + data: []byte("28 <1>3 - - - - - - hello\nworld"), + wantStrict: []testutil.Metric{ + testutil.Metric{ + Measurement: "syslog", + Fields: map[string]interface{}{ + "version": uint16(3), + "message": "hello\nworld", + "severity_code": 1, + "facility_code": 0, + }, + Tags: map[string]string{ + "severity": "alert", + "facility": "kern", + }, + Time: defaultTime, + }, + }, + wantBestEffort: []testutil.Metric{ + testutil.Metric{ + Measurement: 
"syslog", + Fields: map[string]interface{}{ + "version": uint16(3), + "message": "hello\nworld", + "severity_code": 1, + "facility_code": 0, + }, + Tags: map[string]string{ + "severity": "alert", + "facility": "kern", + }, + Time: defaultTime, + }, + }, + }, + { + name: "1st/uf/ko", // underflow (msglen less than provided octets) + data: []byte("16 <1>2"), + wantStrict: nil, + wantBestEffort: []testutil.Metric{ + testutil.Metric{ + Measurement: "syslog", + Fields: map[string]interface{}{ + "version": uint16(2), + "severity_code": 1, + "facility_code": 0, + }, + Tags: map[string]string{ + "severity": "alert", + "facility": "kern", + }, + Time: defaultTime, + }, + }, + werr: 1, + }, + { + name: "1st/min/ok", + data: []byte("16 <1>1 - - - - - -"), + wantStrict: []testutil.Metric{ + testutil.Metric{ + Measurement: "syslog", + Fields: map[string]interface{}{ + "version": uint16(1), + "severity_code": 1, + "facility_code": 0, + }, + Tags: map[string]string{ + "severity": "alert", + "facility": "kern", + }, + Time: defaultTime, + }, + }, + wantBestEffort: []testutil.Metric{ + testutil.Metric{ + Measurement: "syslog", + Fields: map[string]interface{}{ + "version": uint16(1), + "severity_code": 1, + "facility_code": 0, + }, + Tags: map[string]string{ + "severity": "alert", + "facility": "kern", + }, + Time: defaultTime, + }, + }, + }, + { + name: "1st/uf/mf", // The first "underflow" message breaks also the second one + data: []byte("16 <1>217 <11>1 - - - - - -"), + wantStrict: nil, + wantBestEffort: []testutil.Metric{ + testutil.Metric{ + Measurement: "syslog", + Fields: map[string]interface{}{ + "version": uint16(217), + "severity_code": 1, + "facility_code": 0, + }, + Tags: map[string]string{ + "severity": "alert", + "facility": "kern", + }, + Time: defaultTime, + }, + }, + werr: 1, + }, + // { + // name: "1st/of/ko", // overflow (msglen greather then max allowed octets) + // data: []byte(fmt.Sprintf("8193 <%d>%d %s %s %s %s %s 12 %s", maxP, maxV, maxTS, maxH, maxA, 
maxPID, maxMID, message7681)), + // want: []testutil.Metric{}, + // }, + { + name: "1st/max/ok", + data: []byte(fmt.Sprintf("8192 <%d>%d %s %s %s %s %s - %s", maxP, maxV, maxTS, maxH, maxA, maxPID, maxMID, message7681)), + wantStrict: []testutil.Metric{ + testutil.Metric{ + Measurement: "syslog", + Fields: map[string]interface{}{ + "version": maxV, + "timestamp": time.Unix(1514764799, 999999000).UnixNano(), + "message": message7681, + "procid": maxPID, + "msgid": maxMID, + "facility_code": 23, + "severity_code": 7, + }, + Tags: map[string]string{ + "severity": "debug", + "facility": "local7", + "hostname": maxH, + "appname": maxA, + }, + Time: defaultTime, + }, + }, + wantBestEffort: []testutil.Metric{ + testutil.Metric{ + Measurement: "syslog", + Fields: map[string]interface{}{ + "version": maxV, + "timestamp": time.Unix(1514764799, 999999000).UnixNano(), + "message": message7681, + "procid": maxPID, + "msgid": maxMID, + "facility_code": 23, + "severity_code": 7, + }, + Tags: map[string]string{ + "severity": "debug", + "facility": "local7", + "hostname": maxH, + "appname": maxA, + }, + Time: defaultTime, + }, + }, + }, + } + + return testCases +} + +func newTCPSyslogReceiver(address string, keepAlive *internal.Duration, maxConn int, bestEffort bool) *Syslog { + d := &internal.Duration{ + Duration: defaultReadTimeout, + } + s := &Syslog{ + Address: address, + now: func() time.Time { + return defaultTime + }, + ReadTimeout: d, + BestEffort: bestEffort, + Separator: "_", + } + if keepAlive != nil { + s.KeepAlivePeriod = keepAlive + } + if maxConn > 0 { + s.MaxConnections = maxConn + } + + return s +} + +func testStrictRFC5425(t *testing.T, protocol string, address string, wantTLS bool, keepAlive *internal.Duration) { + for _, tc := range getTestCasesForRFC5425() { + t.Run(tc.name, func(t *testing.T) { + // Creation of a strict mode receiver + receiver := newTCPSyslogReceiver(protocol+"://"+address, keepAlive, 0, false) + require.NotNil(t, receiver) + if wantTLS { + 
receiver.ServerConfig = *pki.TLSServerConfig() + } + require.Equal(t, receiver.KeepAlivePeriod, keepAlive) + acc := &testutil.Accumulator{} + require.NoError(t, receiver.Start(acc)) + defer receiver.Stop() + + // Connect + var conn net.Conn + var err error + if wantTLS { + config, e := pki.TLSClientConfig().TLSConfig() + require.NoError(t, e) + config.ServerName = "localhost" + conn, err = tls.Dial(protocol, address, config) + } else { + conn, err = net.Dial(protocol, address) + defer conn.Close() + } + require.NotNil(t, conn) + require.NoError(t, err) + + // Clear + acc.ClearMetrics() + acc.Errors = make([]error, 0) + + // Write + conn.Write(tc.data) + + // Wait that the the number of data points is accumulated + // Since the receiver is running concurrently + if tc.wantStrict != nil { + acc.Wait(len(tc.wantStrict)) + } + // Wait the parsing error + acc.WaitError(tc.werr) + + // Verify + if len(acc.Errors) != tc.werr { + t.Fatalf("Got unexpected errors. want error = %v, errors = %v\n", tc.werr, acc.Errors) + } + var got []testutil.Metric + for _, metric := range acc.Metrics { + got = append(got, *metric) + } + if !cmp.Equal(tc.wantStrict, got) { + t.Fatalf("Got (+) / Want (-)\n %s", cmp.Diff(tc.wantStrict, got)) + } + }) + } +} + +func testBestEffortRFC5425(t *testing.T, protocol string, address string, wantTLS bool, keepAlive *internal.Duration) { + for _, tc := range getTestCasesForRFC5425() { + t.Run(tc.name, func(t *testing.T) { + // Creation of a best effort mode receiver + receiver := newTCPSyslogReceiver(protocol+"://"+address, keepAlive, 0, true) + require.NotNil(t, receiver) + if wantTLS { + receiver.ServerConfig = *pki.TLSServerConfig() + } + require.Equal(t, receiver.KeepAlivePeriod, keepAlive) + acc := &testutil.Accumulator{} + require.NoError(t, receiver.Start(acc)) + defer receiver.Stop() + + // Connect + var conn net.Conn + var err error + if wantTLS { + config, e := pki.TLSClientConfig().TLSConfig() + require.NoError(t, e) + config.ServerName = 
"localhost" + conn, err = tls.Dial(protocol, address, config) + } else { + conn, err = net.Dial(protocol, address) + defer conn.Close() + } + require.NotNil(t, conn) + require.NoError(t, err) + + // Clear + acc.ClearMetrics() + acc.Errors = make([]error, 0) + + // Write + conn.Write(tc.data) + + // Wait that the the number of data points is accumulated + // Since the receiver is running concurrently + if tc.wantBestEffort != nil { + acc.Wait(len(tc.wantBestEffort)) + } + + // Verify + var got []testutil.Metric + for _, metric := range acc.Metrics { + got = append(got, *metric) + } + if !cmp.Equal(tc.wantBestEffort, got) { + t.Fatalf("Got (+) / Want (-)\n %s", cmp.Diff(tc.wantBestEffort, got)) + } + }) + } +} + +func TestStrict_tcp(t *testing.T) { + testStrictRFC5425(t, "tcp", address, false, nil) +} + +func TestBestEffort_tcp(t *testing.T) { + testBestEffortRFC5425(t, "tcp", address, false, nil) +} + +func TestStrict_tcp_tls(t *testing.T) { + testStrictRFC5425(t, "tcp", address, true, nil) +} + +func TestBestEffort_tcp_tls(t *testing.T) { + testBestEffortRFC5425(t, "tcp", address, true, nil) +} + +func TestStrictWithKeepAlive_tcp_tls(t *testing.T) { + testStrictRFC5425(t, "tcp", address, true, &internal.Duration{Duration: time.Minute}) +} + +func TestStrictWithZeroKeepAlive_tcp_tls(t *testing.T) { + testStrictRFC5425(t, "tcp", address, true, &internal.Duration{Duration: 0}) +} + +func TestStrict_unix(t *testing.T) { + testStrictRFC5425(t, "unix", "/tmp/telegraf_test.sock", false, nil) +} + +func TestBestEffort_unix(t *testing.T) { + testBestEffortRFC5425(t, "unix", "/tmp/telegraf_test.sock", false, nil) +} + +func TestStrict_unix_tls(t *testing.T) { + testStrictRFC5425(t, "unix", "/tmp/telegraf_test.sock", true, nil) +} + +func TestBestEffort_unix_tls(t *testing.T) { + testBestEffortRFC5425(t, "unix", "/tmp/telegraf_test.sock", true, nil) +} diff --git a/plugins/inputs/syslog/rfc5426_test.go b/plugins/inputs/syslog/rfc5426_test.go new file mode 100644 index 
// testCase5426 describes one datagram (RFC5426) scenario run by testRFC5426.
type testCase5426 struct {
	// name is used as the sub-test name.
	name string
	// data is the payload sent to the receiver with a single Write.
	data []byte
	// wantBestEffort is the metric expected when the receiver runs in best
	// effort mode; nil means no metric is expected.
	wantBestEffort *testutil.Metric
	// wantStrict is the metric expected when the receiver runs in strict
	// mode; nil means no metric is expected.
	wantStrict *testutil.Metric
	// werr tells the test to wait for one accumulator error. It is honoured
	// in best effort mode, and in strict mode only when wantStrict is nil.
	werr bool
}
2016-02-21T04:32:57+00:00 web1 someservice 2341 2 [origin][meta sequence="14125553" service="someservice"] "GET /v1/ok HTTP/1.1" 200 145 "-" "hacheck 0.9.0" 24306 127.0.0.1:40124 575`), + wantBestEffort: &testutil.Metric{ + Measurement: "syslog", + Fields: map[string]interface{}{ + "version": uint16(1), + "timestamp": time.Unix(1456029177, 0).UnixNano(), + "procid": "2341", + "msgid": "2", + "message": `"GET /v1/ok HTTP/1.1" 200 145 "-" "hacheck 0.9.0" 24306 127.0.0.1:40124 575`, + "origin": true, + "meta_sequence": "14125553", + "meta_service": "someservice", + "severity_code": 5, + "facility_code": 3, + }, + Tags: map[string]string{ + "severity": "notice", + "facility": "daemon", + "hostname": "web1", + "appname": "someservice", + }, + Time: defaultTime, + }, + wantStrict: &testutil.Metric{ + Measurement: "syslog", + Fields: map[string]interface{}{ + "version": uint16(1), + "timestamp": time.Unix(1456029177, 0).UnixNano(), + "procid": "2341", + "msgid": "2", + "message": `"GET /v1/ok HTTP/1.1" 200 145 "-" "hacheck 0.9.0" 24306 127.0.0.1:40124 575`, + "origin": true, + "meta_sequence": "14125553", + "meta_service": "someservice", + "severity_code": 5, + "facility_code": 3, + }, + Tags: map[string]string{ + "severity": "notice", + "facility": "daemon", + "hostname": "web1", + "appname": "someservice", + }, + Time: defaultTime, + }, + }, + { + name: "max", + data: []byte(fmt.Sprintf("<%d>%d %s %s %s %s %s - %s", maxP, maxV, maxTS, maxH, maxA, maxPID, maxMID, message7681)), + wantBestEffort: &testutil.Metric{ + Measurement: "syslog", + Fields: map[string]interface{}{ + "version": maxV, + "timestamp": time.Unix(1514764799, 999999000).UnixNano(), + "message": message7681, + "procid": maxPID, + "msgid": maxMID, + "severity_code": 7, + "facility_code": 23, + }, + Tags: map[string]string{ + "severity": "debug", + "facility": "local7", + "hostname": maxH, + "appname": maxA, + }, + Time: defaultTime, + }, + wantStrict: &testutil.Metric{ + Measurement: "syslog", + Fields: 
map[string]interface{}{ + "version": maxV, + "timestamp": time.Unix(1514764799, 999999000).UnixNano(), + "message": message7681, + "procid": maxPID, + "msgid": maxMID, + "severity_code": 7, + "facility_code": 23, + }, + Tags: map[string]string{ + "severity": "debug", + "facility": "local7", + "hostname": maxH, + "appname": maxA, + }, + Time: defaultTime, + }, + }, + { + name: "minimal/incomplete", + data: []byte("<1>2"), + wantBestEffort: &testutil.Metric{ + Measurement: "syslog", + Fields: map[string]interface{}{ + "version": uint16(2), + "facility_code": 0, + "severity_code": 1, + }, + Tags: map[string]string{ + "severity": "alert", + "facility": "kern", + }, + Time: defaultTime, + }, + werr: true, + }, + } + + return testCases +} + +func newUDPSyslogReceiver(address string, bestEffort bool) *Syslog { + return &Syslog{ + Address: address, + now: func() time.Time { + return defaultTime + }, + BestEffort: bestEffort, + Separator: "_", + } +} + +func testRFC5426(t *testing.T, protocol string, address string, bestEffort bool) { + for _, tc := range getTestCasesForRFC5426() { + t.Run(tc.name, func(t *testing.T) { + // Create receiver + receiver := newUDPSyslogReceiver(protocol+"://"+address, bestEffort) + acc := &testutil.Accumulator{} + require.NoError(t, receiver.Start(acc)) + defer receiver.Stop() + + // Clear + acc.ClearMetrics() + acc.Errors = make([]error, 0) + + // Connect + conn, err := net.Dial(protocol, address) + require.NotNil(t, conn) + defer conn.Close() + require.Nil(t, err) + + // Write + _, e := conn.Write(tc.data) + require.Nil(t, e) + + // Waiting ... 
+ if tc.wantStrict == nil && tc.werr || bestEffort && tc.werr { + acc.WaitError(1) + } + if tc.wantBestEffort != nil && bestEffort || tc.wantStrict != nil && !bestEffort { + acc.Wait(1) // RFC5426 mandates a syslog message per UDP packet + } + + // Compare + var got *testutil.Metric + var want *testutil.Metric + if len(acc.Metrics) > 0 { + got = acc.Metrics[0] + } + if bestEffort { + want = tc.wantBestEffort + } else { + want = tc.wantStrict + } + if !cmp.Equal(want, got) { + t.Fatalf("Got (+) / Want (-)\n %s", cmp.Diff(want, got)) + } + }) + } +} + +func TestBestEffort_udp(t *testing.T) { + testRFC5426(t, "udp", address, true) +} + +func TestStrict_udp(t *testing.T) { + testRFC5426(t, "udp", address, false) +} + +func TestBestEffort_unixgram(t *testing.T) { + sockname := "/tmp/telegraf_test.sock" + os.Create(sockname) + testRFC5426(t, "unixgram", sockname, true) +} + +func TestStrict_unixgram(t *testing.T) { + sockname := "/tmp/telegraf_test.sock" + os.Create(sockname) + testRFC5426(t, "unixgram", sockname, false) +} + +func TestTimeIncrement_udp(t *testing.T) { + var i int64 + atomic.StoreInt64(&i, 0) + getNow := func() time.Time { + if atomic.LoadInt64(&i)%2 == 0 { + return time.Unix(1, 0) + } + return time.Unix(1, 1) + } + + // Create receiver + receiver := &Syslog{ + Address: "udp://" + address, + now: getNow, + BestEffort: false, + Separator: "_", + } + acc := &testutil.Accumulator{} + require.NoError(t, receiver.Start(acc)) + defer receiver.Stop() + + // Connect + conn, err := net.Dial("udp", address) + require.NotNil(t, conn) + defer conn.Close() + require.Nil(t, err) + + // Write + _, e := conn.Write([]byte("<1>1 - - - - - -")) + require.Nil(t, e) + + // Wait + acc.Wait(1) + + want := &testutil.Metric{ + Measurement: "syslog", + Fields: map[string]interface{}{ + "version": uint16(1), + "facility_code": 0, + "severity_code": 1, + }, + Tags: map[string]string{ + "severity": "alert", + "facility": "kern", + }, + Time: getNow(), + } + + if !cmp.Equal(want, 
acc.Metrics[0]) { + t.Fatalf("Got (+) / Want (-)\n %s", cmp.Diff(want, acc.Metrics[0])) + } + + // New one with different time + atomic.StoreInt64(&i, atomic.LoadInt64(&i)+1) + + // Clear + acc.ClearMetrics() + + // Write + _, e = conn.Write([]byte("<1>1 - - - - - -")) + require.Nil(t, e) + + // Wait + acc.Wait(1) + + want = &testutil.Metric{ + Measurement: "syslog", + Fields: map[string]interface{}{ + "version": uint16(1), + "facility_code": 0, + "severity_code": 1, + }, + Tags: map[string]string{ + "severity": "alert", + "facility": "kern", + }, + Time: getNow(), + } + + if !cmp.Equal(want, acc.Metrics[0]) { + t.Fatalf("Got (+) / Want (-)\n %s", cmp.Diff(want, acc.Metrics[0])) + } + + // New one with same time as previous one + + // Clear + acc.ClearMetrics() + + // Write + _, e = conn.Write([]byte("<1>1 - - - - - -")) + require.Nil(t, e) + + // Wait + acc.Wait(1) + + want = &testutil.Metric{ + Measurement: "syslog", + Fields: map[string]interface{}{ + "version": uint16(1), + "facility_code": 0, + "severity_code": 1, + }, + Tags: map[string]string{ + "severity": "alert", + "facility": "kern", + }, + Time: getNow().Add(time.Nanosecond), + } + + if !cmp.Equal(want, acc.Metrics[0]) { + t.Fatalf("Got (+) / Want (-)\n %s", cmp.Diff(want, acc.Metrics[0])) + } +} diff --git a/plugins/inputs/syslog/syslog.go b/plugins/inputs/syslog/syslog.go new file mode 100644 index 0000000000000..21f6a770fa0b1 --- /dev/null +++ b/plugins/inputs/syslog/syslog.go @@ -0,0 +1,419 @@ +package syslog + +import ( + "crypto/tls" + "fmt" + "io" + "net" + "net/url" + "os" + "strings" + "sync" + "time" + + "github.com/influxdata/go-syslog/rfc5424" + "github.com/influxdata/go-syslog/rfc5425" + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/internal" + tlsConfig "github.com/influxdata/telegraf/internal/tls" + "github.com/influxdata/telegraf/plugins/inputs" +) + +const defaultReadTimeout = time.Millisecond * 500 +const ipMaxPacketSize = 64 * 1024 + +// Syslog is a syslog plugin 
// Syslog holds the plugin configuration (exported fields) together with the
// runtime state of the listener created by Start.
type Syslog struct {
	// ServerConfig supplies TLSConfig(), which Start calls to build the TLS
	// configuration used for stream listeners.
	tlsConfig.ServerConfig
	// Address is configured as "scheme://host[:port]" (or a socket path for
	// unix schemes). Start parses it and overwrites it with the host part.
	Address string `toml:"server"`
	// KeepAlivePeriod is the period between keep alive probes on stream
	// sockets, per the sample configuration.
	KeepAlivePeriod *internal.Duration
	// ReadTimeout, when set and positive, is used by the datagram listener to
	// set a read deadline after each received packet.
	ReadTimeout *internal.Duration
	// MaxConnections caps concurrently tracked stream connections; values
	// <= 0 disable the limit.
	MaxConnections int
	// BestEffort is passed to the RFC5424 parser to enable best effort mode.
	BestEffort bool
	// Separator joins a structured data identifier and a parameter name into
	// a field name, per the sample configuration.
	Separator string `toml:"sdparam_separator"`

	// now is the clock; tests replace it with a deterministic function.
	now func() time.Time
	// NOTE(review): presumably the last timestamp handed out, used to keep
	// metric times unique — confirm against the time() helper.
	lastTime time.Time

	// mu is held by Start and Stop.
	mu sync.Mutex
	// wg tracks the listener goroutine; Stop waits on it.
	wg sync.WaitGroup
	// Closer is set by Start to the active listener (wrapped in a unixCloser
	// for unix socket schemes) and closed by Stop.
	io.Closer

	// isStream is set by Start: true for tcp*/unix/unixpacket schemes, false
	// for udp*/ip*/unixgram.
	isStream bool
	// tcpListener is the stream listener served by listenStream.
	tcpListener net.Listener
	// tlsConfig, when non-nil, makes listenStream wrap accepted connections
	// with tls.Server.
	tlsConfig *tls.Config
	// connections holds the accepted stream connections keyed by remote
	// address; they are all closed when the accept loop exits.
	connections map[string]net.Conn
	// connectionsMu guards connections.
	connectionsMu sync.Mutex

	// udpListener is the datagram listener served by listenPacket.
	udpListener net.PacketConn
}
// getAddressParts splits an address of the form "scheme://rest" into the
// scheme and the host part to listen on.
//
// For the unix, unixpacket and unixgram schemes the part after "://" is
// returned untouched (it is a socket path). For every other scheme the result
// is "host:port", where a missing port defaults to 6514 (RFC5425 section 4.1)
// and an IPv6 literal keeps its brackets so it can be passed to net.Listen.
//
// An error is returned when the address has no "scheme://" prefix or when it
// cannot be parsed as a URL (for instance a non numeric port).
func getAddressParts(a string) (string, string, error) {
	parts := strings.SplitN(a, "://", 2)
	if len(parts) != 2 {
		return "", "", fmt.Errorf("missing protocol within address '%s'", a)
	}

	// url.Parse returns a nil *url.URL on failure, so the error has to be
	// checked before u is used.
	u, err := url.Parse(a)
	if err != nil {
		return "", "", fmt.Errorf("invalid address '%s': %s", a, err)
	}

	switch u.Scheme {
	case "unix", "unixpacket", "unixgram":
		return parts[0], parts[1], nil
	}

	port := u.Port()
	if port == "" {
		port = "6514"
	}

	// Hostname() strips the brackets of IPv6 literals; JoinHostPort puts them
	// back when needed and yields ":port" for an empty host.
	return u.Scheme, net.JoinHostPort(u.Hostname(), port), nil
}
(s *Syslog) removeConnection(c net.Conn) { + s.connectionsMu.Lock() + delete(s.connections, c.RemoteAddr().String()) + s.connectionsMu.Unlock() +} + +func (s *Syslog) handle(conn net.Conn, acc telegraf.Accumulator) { + defer func() { + s.removeConnection(conn) + conn.Close() + }() + + if s.ReadTimeout != nil && s.ReadTimeout.Duration > 0 { + conn.SetReadDeadline(time.Now().Add(s.ReadTimeout.Duration)) + } + + var p *rfc5425.Parser + if s.BestEffort { + p = rfc5425.NewParser(conn, rfc5425.WithBestEffort()) + } else { + p = rfc5425.NewParser(conn) + } + + p.ParseExecuting(func(r *rfc5425.Result) { + s.store(*r, acc) + }) +} + +func (s *Syslog) setKeepAlive(c *net.TCPConn) error { + if s.KeepAlivePeriod == nil { + return nil + } + + if s.KeepAlivePeriod.Duration == 0 { + return c.SetKeepAlive(false) + } + if err := c.SetKeepAlive(true); err != nil { + return err + } + return c.SetKeepAlivePeriod(s.KeepAlivePeriod.Duration) +} + +func (s *Syslog) store(res rfc5425.Result, acc telegraf.Accumulator) { + if res.Error != nil { + acc.AddError(res.Error) + } + if res.MessageError != nil { + acc.AddError(res.MessageError) + } + if res.Message != nil { + msg := *res.Message + acc.AddFields("syslog", fields(msg, s), tags(msg), s.time()) + } +} + +func tags(msg rfc5424.SyslogMessage) map[string]string { + ts := map[string]string{} + + // Not checking assuming a minimally valid message + ts["severity"] = *msg.SeverityShortLevel() + ts["facility"] = *msg.FacilityLevel() + + if msg.Hostname() != nil { + ts["hostname"] = *msg.Hostname() + } + + if msg.Appname() != nil { + ts["appname"] = *msg.Appname() + } + + return ts +} + +func fields(msg rfc5424.SyslogMessage, s *Syslog) map[string]interface{} { + // Not checking assuming a minimally valid message + flds := map[string]interface{}{ + "version": msg.Version(), + } + flds["severity_code"] = int(*msg.Severity()) + flds["facility_code"] = int(*msg.Facility()) + + if msg.Timestamp() != nil { + flds["timestamp"] = 
(*msg.Timestamp()).UnixNano() + } + + if msg.ProcID() != nil { + flds["procid"] = *msg.ProcID() + } + + if msg.MsgID() != nil { + flds["msgid"] = *msg.MsgID() + } + + if msg.Message() != nil { + flds["message"] = *msg.Message() + } + + if msg.StructuredData() != nil { + for sdid, sdparams := range *msg.StructuredData() { + if len(sdparams) == 0 { + // When SD-ID does not have params we indicate its presence with a bool + flds[sdid] = true + continue + } + for name, value := range sdparams { + // Using whitespace as separator since it is not allowed by the grammar within SDID + flds[sdid+s.Separator+name] = value + } + } + } + + return flds +} + +type unixCloser struct { + path string + closer io.Closer +} + +func (uc unixCloser) Close() error { + err := uc.closer.Close() + os.Remove(uc.path) // ignore error + return err +} + +func (s *Syslog) time() time.Time { + t := s.now() + if t == s.lastTime { + t = t.Add(time.Nanosecond) + } + s.lastTime = t + return t +} + +func getNanoNow() time.Time { + return time.Unix(0, time.Now().UnixNano()) +} + +func init() { + receiver := &Syslog{ + Address: ":6514", + now: getNanoNow, + ReadTimeout: &internal.Duration{ + Duration: defaultReadTimeout, + }, + Separator: "_", + } + + inputs.Add("syslog", func() telegraf.Input { return receiver }) +} diff --git a/plugins/inputs/syslog/syslog_test.go b/plugins/inputs/syslog/syslog_test.go new file mode 100644 index 0000000000000..8fa0d9c943dcf --- /dev/null +++ b/plugins/inputs/syslog/syslog_test.go @@ -0,0 +1,60 @@ +package syslog + +import ( + "strings" + "testing" + "time" + + "github.com/influxdata/telegraf/testutil" + "github.com/stretchr/testify/require" +) + +const ( + address = ":6514" +) + +var defaultTime = time.Unix(0, 0) +var maxP = uint8(191) +var maxV = uint16(999) +var maxTS = "2017-12-31T23:59:59.999999+00:00" +var maxH = 
"abcdefghilmnopqrstuvzabcdefghilmnopqrstuvzabcdefghilmnopqrstuvzabcdefghilmnopqrstuvzabcdefghilmnopqrstuvzabcdefghilmnopqrstuvzabcdefghilmnopqrstuvzabcdefghilmnopqrstuvzabcdefghilmnopqrstuvzabcdefghilmnopqrstuvzabcdefghilmnopqrstuvzabcdefghilmnopqrstuvzabc" +var maxA = "abcdefghilmnopqrstuvzabcdefghilmnopqrstuvzabcdef" +var maxPID = "abcdefghilmnopqrstuvzabcdefghilmnopqrstuvzabcdefghilmnopqrstuvzabcdefghilmnopqrstuvzabcdefghilmnopqrstuvzabcdefghilmnopqrstuvzab" +var maxMID = "abcdefghilmnopqrstuvzabcdefghilm" +var message7681 = strings.Repeat("l", 7681) + +func TestAddress(t *testing.T) { + var err error + var rec *Syslog + + rec = &Syslog{ + Address: "localhost:6514", + } + err = rec.Start(&testutil.Accumulator{}) + require.EqualError(t, err, "missing protocol within address 'localhost:6514'") + require.Error(t, err) + + rec = &Syslog{ + Address: "unsupported://example.com:6514", + } + err = rec.Start(&testutil.Accumulator{}) + require.EqualError(t, err, "unknown protocol 'unsupported' in 'example.com:6514'") + require.Error(t, err) + + rec = &Syslog{ + Address: "unixgram:///tmp/telegraf.sock", + } + err = rec.Start(&testutil.Accumulator{}) + require.NoError(t, err) + require.Equal(t, "/tmp/telegraf.sock", rec.Address) + rec.Stop() + + // Default port is 6514 + rec = &Syslog{ + Address: "tcp://localhost", + } + err = rec.Start(&testutil.Accumulator{}) + require.NoError(t, err) + require.Equal(t, "localhost:6514", rec.Address) + rec.Stop() +}