diff --git a/plugins/common/socket/datagram.go b/plugins/common/socket/datagram.go index 3bfae6cd0fe62..b7d55e38a4a99 100644 --- a/plugins/common/socket/datagram.go +++ b/plugins/common/socket/datagram.go @@ -3,8 +3,6 @@ package socket import ( "errors" "fmt" - "github.com/alitto/pond" - "github.com/influxdata/telegraf/config" "io" "net" "net/url" @@ -14,9 +12,11 @@ import ( "strconv" "strings" "sync" - "time" + + "github.com/alitto/pond" "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/config" "github.com/influxdata/telegraf/internal" ) @@ -51,7 +51,6 @@ func (l *packetListener) listenData(onData CallbackData, onError CallbackError) buf := make([]byte, 64*1024) // 64kb - maximum size of IP packet for { n, src, err := l.conn.ReadFrom(buf) - receiveTime := time.Now() if err != nil { if !strings.HasSuffix(err.Error(), ": use of closed network connection") { if onError != nil { @@ -75,7 +74,7 @@ func (l *packetListener) listenData(onData CallbackData, onError CallbackError) src = &net.UnixAddr{Name: l.path, Net: "unixgram"} } - onData(src, body, receiveTime) + onData(src, body) }) } }() diff --git a/plugins/common/socket/socket.go b/plugins/common/socket/socket.go index ed5bc499cd15e..0a138415e5e06 100644 --- a/plugins/common/socket/socket.go +++ b/plugins/common/socket/socket.go @@ -9,14 +9,13 @@ import ( "net/url" "regexp" "strings" - "time" "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/config" common_tls "github.com/influxdata/telegraf/plugins/common/tls" ) -type CallbackData func(net.Addr, []byte, time.Time) +type CallbackData func(net.Addr, []byte) type CallbackConnection func(net.Addr, io.ReadCloser) type CallbackError func(error) diff --git a/plugins/common/socket/socket_test.go b/plugins/common/socket/socket_test.go index e979f8a3654cd..5a2a6d7d2d8da 100644 --- a/plugins/common/socket/socket_test.go +++ b/plugins/common/socket/socket_test.go @@ -153,7 +153,7 @@ func TestListenData(t *testing.T) { require.NoError(t, parser.Init()) var acc testutil.Accumulator - onData := func(remote net.Addr, data []byte, _ time.Time) { + onData := func(remote net.Addr, data []byte) { m, err := parser.Parse(data) require.NoError(t, err) addr, _, err := net.SplitHostPort(remote.String()) @@ -450,7 +450,7 @@ func TestClosingConnections(t *testing.T) { require.NoError(t, parser.Init()) var acc testutil.Accumulator - onData := func(_ net.Addr, data []byte, _ time.Time) { + onData := func(_ net.Addr, data []byte) { m, err := parser.Parse(data) require.NoError(t, err) acc.AddMetrics(m) @@ -518,7 +518,7 @@ func TestMaxConnections(t *testing.T) { // Create callback var errs []error var mu sync.Mutex - onData := func(_ net.Addr, _ []byte, _ time.Time) {} + onData := func(_ net.Addr, _ []byte) {} onError := func(err error) { mu.Lock() errs = append(errs, err) diff --git a/plugins/common/socket/stream.go b/plugins/common/socket/stream.go index c1be72258aa4c..8b497e6bfd08f 100644 --- a/plugins/common/socket/stream.go +++ b/plugins/common/socket/stream.go @@ -6,7 +6,6 @@ import ( "crypto/tls" "errors" "fmt" - "github.com/alitto/pond" "io" "math" "net" @@ -20,6 +19,7 @@ import ( "syscall" "time" + "github.com/alitto/pond" "github.com/mdlayher/vsock" "github.com/influxdata/telegraf" @@ -352,7 +352,6 @@ func (l *streamListener) read(conn net.Conn, onData CallbackData) error { break } - receiveTime := time.Now() src := conn.RemoteAddr() if l.path != "" { src = &net.UnixAddr{Name: l.path, Net: "unix"} @@ -362,7 +361,7 @@ func (l *streamListener) read(conn net.Conn, onData CallbackData) error { d := make([]byte, len(data)) copy(d, data) l.parsePool.Submit(func() { - onData(src, d, receiveTime) + onData(src, d) }) } @@ -408,9 +407,8 @@ func (l *streamListener) readAll(conn net.Conn, onData CallbackData) error { return fmt.Errorf("read on %s failed: %w", src, err) } - receiveTime := time.Now() l.parsePool.Submit(func() { - onData(src, buf, receiveTime) + onData(src, buf) }) return nil diff --git a/plugins/inputs/socket_listener/socket_listener.go b/plugins/inputs/socket_listener/socket_listener.go index 5f32af8062627..e097b2512fd7c 100644 --- a/plugins/inputs/socket_listener/socket_listener.go +++ b/plugins/inputs/socket_listener/socket_listener.go @@ -6,7 +6,6 @@ import ( _ "embed" "net" "sync" - "time" "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/internal" @@ -21,7 +20,6 @@ var once sync.Once type SocketListener struct { ServiceAddress string `toml:"service_address"` - TimeSource string `toml:"time_source"` Log telegraf.Logger `toml:"-"` socket.Config socket.SplitConfig @@ -54,7 +52,7 @@ func (sl *SocketListener) SetParser(parser telegraf.Parser) { func (sl *SocketListener) Start(acc telegraf.Accumulator) error { // Create the callbacks for parsing the data and recording issues - onData := func(_ net.Addr, data []byte, receiveTime time.Time) { + onData := func(_ net.Addr, data []byte) { metrics, err := sl.parser.Parse(data) if err != nil { @@ -69,12 +67,6 @@ func (sl *SocketListener) Start(acc telegraf.Accumulator) error { } for _, m := range metrics { - switch sl.TimeSource { - case "", "metric": - case "receive_time": - m.SetTime(receiveTime) - } - acc.AddMetric(m) } } diff --git a/plugins/inputs/syslog/syslog.go b/plugins/inputs/syslog/syslog.go index 4fe9f4e8ef3d9..6f5517abc8fe0 100644 --- a/plugins/inputs/syslog/syslog.go +++ b/plugins/inputs/syslog/syslog.go @@ -10,7 +10,6 @@ import ( "net/url" "strings" "sync" - "time" "unicode" "github.com/leodido/go-syslog/v4" @@ -215,7 +214,7 @@ func (s *Syslog) createDatagramDataHandler(acc telegraf.Accumulator) socket.Call } // Return the OnData function - return func(src net.Addr, data []byte, _ time.Time) { + return func(src net.Addr, data []byte) { message, err := parser.Parse(data) if err != nil { acc.AddError(err)