Skip to content

Commit

Permalink
feat(common.socket): revert timestamping
Browse files Browse the repository at this point in the history
  • Loading branch information
LarsStegman committed Oct 3, 2024
1 parent ba7464f commit a22481d
Show file tree
Hide file tree
Showing 6 changed files with 13 additions and 26 deletions.
9 changes: 4 additions & 5 deletions plugins/common/socket/datagram.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@ package socket
import (
"errors"
"fmt"
"github.com/alitto/pond"
"github.com/influxdata/telegraf/config"
"io"
"net"
"net/url"
Expand All @@ -14,9 +12,11 @@ import (
"strconv"
"strings"
"sync"
"time"

"github.com/alitto/pond"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/internal"
)

Expand Down Expand Up @@ -51,7 +51,6 @@ func (l *packetListener) listenData(onData CallbackData, onError CallbackError)
buf := make([]byte, 64*1024) // 64kb - maximum size of IP packet
for {
n, src, err := l.conn.ReadFrom(buf)
receiveTime := time.Now()
if err != nil {
if !strings.HasSuffix(err.Error(), ": use of closed network connection") {
if onError != nil {
Expand All @@ -75,7 +74,7 @@ func (l *packetListener) listenData(onData CallbackData, onError CallbackError)
src = &net.UnixAddr{Name: l.path, Net: "unixgram"}
}

onData(src, body, receiveTime)
onData(src, body)
})
}
}()
Expand Down
3 changes: 1 addition & 2 deletions plugins/common/socket/socket.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,13 @@ import (
"net/url"
"regexp"
"strings"
"time"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config"
common_tls "github.com/influxdata/telegraf/plugins/common/tls"
)

type CallbackData func(net.Addr, []byte, time.Time)
type CallbackData func(net.Addr, []byte)
type CallbackConnection func(net.Addr, io.ReadCloser)
type CallbackError func(error)

Expand Down
6 changes: 3 additions & 3 deletions plugins/common/socket/socket_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ func TestListenData(t *testing.T) {
require.NoError(t, parser.Init())

var acc testutil.Accumulator
onData := func(remote net.Addr, data []byte, _ time.Time) {
onData := func(remote net.Addr, data []byte) {
m, err := parser.Parse(data)
require.NoError(t, err)
addr, _, err := net.SplitHostPort(remote.String())
Expand Down Expand Up @@ -450,7 +450,7 @@ func TestClosingConnections(t *testing.T) {
require.NoError(t, parser.Init())

var acc testutil.Accumulator
onData := func(_ net.Addr, data []byte, _ time.Time) {
onData := func(_ net.Addr, data []byte) {
m, err := parser.Parse(data)
require.NoError(t, err)
acc.AddMetrics(m)
Expand Down Expand Up @@ -518,7 +518,7 @@ func TestMaxConnections(t *testing.T) {
// Create callback
var errs []error
var mu sync.Mutex
onData := func(_ net.Addr, _ []byte, _ time.Time) {}
onData := func(_ net.Addr, _ []byte) {}
onError := func(err error) {
mu.Lock()
errs = append(errs, err)
Expand Down
8 changes: 3 additions & 5 deletions plugins/common/socket/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"crypto/tls"
"errors"
"fmt"
"github.com/alitto/pond"
"io"
"math"
"net"
Expand All @@ -20,6 +19,7 @@ import (
"syscall"
"time"

"github.com/alitto/pond"
"github.com/mdlayher/vsock"

"github.com/influxdata/telegraf"
Expand Down Expand Up @@ -352,7 +352,6 @@ func (l *streamListener) read(conn net.Conn, onData CallbackData) error {
break
}

receiveTime := time.Now()
src := conn.RemoteAddr()
if l.path != "" {
src = &net.UnixAddr{Name: l.path, Net: "unix"}
Expand All @@ -362,7 +361,7 @@ func (l *streamListener) read(conn net.Conn, onData CallbackData) error {
d := make([]byte, len(data))
copy(d, data)
l.parsePool.Submit(func() {
onData(src, d, receiveTime)
onData(src, d)
})
}

Expand Down Expand Up @@ -408,9 +407,8 @@ func (l *streamListener) readAll(conn net.Conn, onData CallbackData) error {
return fmt.Errorf("read on %s failed: %w", src, err)
}

receiveTime := time.Now()
l.parsePool.Submit(func() {
onData(src, buf, receiveTime)
onData(src, buf)
})

return nil
Expand Down
10 changes: 1 addition & 9 deletions plugins/inputs/socket_listener/socket_listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
_ "embed"
"net"
"sync"
"time"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
Expand All @@ -21,7 +20,6 @@ var once sync.Once

type SocketListener struct {
ServiceAddress string `toml:"service_address"`
TimeSource string `toml:"time_source"`
Log telegraf.Logger `toml:"-"`
socket.Config
socket.SplitConfig
Expand Down Expand Up @@ -54,7 +52,7 @@ func (sl *SocketListener) SetParser(parser telegraf.Parser) {

func (sl *SocketListener) Start(acc telegraf.Accumulator) error {
// Create the callbacks for parsing the data and recording issues
onData := func(_ net.Addr, data []byte, receiveTime time.Time) {
onData := func(_ net.Addr, data []byte) {
metrics, err := sl.parser.Parse(data)

if err != nil {
Expand All @@ -69,12 +67,6 @@ func (sl *SocketListener) Start(acc telegraf.Accumulator) error {
}

for _, m := range metrics {
switch sl.TimeSource {
case "", "metric":
case "receive_time":
m.SetTime(receiveTime)
}

acc.AddMetric(m)
}
}
Expand Down
3 changes: 1 addition & 2 deletions plugins/inputs/syslog/syslog.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
"net/url"
"strings"
"sync"
"time"
"unicode"

"github.com/leodido/go-syslog/v4"
Expand Down Expand Up @@ -215,7 +214,7 @@ func (s *Syslog) createDatagramDataHandler(acc telegraf.Accumulator) socket.Call
}

// Return the OnData function
return func(src net.Addr, data []byte, _ time.Time) {
return func(src net.Addr, data []byte) {
message, err := parser.Parse(data)
if err != nil {
acc.AddError(err)
Expand Down

0 comments on commit a22481d

Please sign in to comment.