Skip to content

Commit

Permalink
feat(inputs.socket_listener): async parse
Browse files Browse the repository at this point in the history
  • Loading branch information
LarsStegman committed Sep 20, 2024
1 parent 640eda0 commit 538a885
Show file tree
Hide file tree
Showing 6 changed files with 121 additions and 59 deletions.
42 changes: 30 additions & 12 deletions plugins/common/socket/datagram.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package socket
import (
"errors"
"fmt"
"github.com/alitto/pond"
"github.com/influxdata/telegraf/config"
"io"
"net"
"net/url"
Expand All @@ -12,6 +14,7 @@ import (
"strconv"
"strings"
"sync"
"time"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
Expand All @@ -24,10 +27,19 @@ type packetListener struct {
ReadBufferSize int
Log telegraf.Logger

conn net.PacketConn
decoder internal.ContentDecoder
path string
wg sync.WaitGroup
conn net.PacketConn
decoder internal.ContentDecoder
path string
wg sync.WaitGroup
parsePool *pond.WorkerPool
}

func newPacketListener(encoding string, maxDecompressionSize config.Size, maxWorkers int) *packetListener {
return &packetListener{
Encoding: encoding,
MaxDecompressionSize: int64(maxDecompressionSize),
parsePool: pond.New(maxWorkers, 0, pond.MinWorkers(maxWorkers/2+1)),
}
}

func (l *packetListener) listenData(onData CallbackData, onError CallbackError) {
Expand All @@ -39,6 +51,7 @@ func (l *packetListener) listenData(onData CallbackData, onError CallbackError)
buf := make([]byte, 64*1024) // 64kb - maximum size of IP packet
for {
n, src, err := l.conn.ReadFrom(buf)
receiveTime := time.Now()
if err != nil {
if !strings.HasSuffix(err.Error(), ": use of closed network connection") {
if onError != nil {
Expand All @@ -48,15 +61,20 @@ func (l *packetListener) listenData(onData CallbackData, onError CallbackError)
break
}

body, err := l.decoder.Decode(buf[:n])
if err != nil && onError != nil {
onError(fmt.Errorf("unable to decode incoming packet: %w", err))
}
d := make([]byte, n)
copy(d, buf[:n])
l.parsePool.Submit(func() {
body, err := l.decoder.Decode(d)
if err != nil && onError != nil {
onError(fmt.Errorf("unable to decode incoming packet: %w", err))
}

if l.path != "" {
src = &net.UnixAddr{Name: l.path, Net: "unixgram"}
}
onData(src, body)
if l.path != "" {
src = &net.UnixAddr{Name: l.path, Net: "unixgram"}
}

onData(src, body, receiveTime)
})
}
}()
}
Expand Down
86 changes: 46 additions & 40 deletions plugins/common/socket/socket.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,20 @@ import (
"bufio"
"crypto/tls"
"fmt"
"github.com/shirou/gopsutil/v3/cpu"
"io"
"net"
"net/url"
"regexp"
"strings"
"time"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config"
tlsint "github.com/influxdata/telegraf/plugins/common/tls"
)

type CallbackData func(net.Addr, []byte)
type CallbackData func(net.Addr, []byte, time.Time)
type CallbackConnection func(net.Addr, io.ReadCloser)
type CallbackError func(error)

Expand All @@ -34,6 +36,7 @@ type Config struct {
SocketMode string `toml:"socket_mode"`
ContentEncoding string `toml:"content_encoding"`
MaxDecompressionSize config.Size `toml:"max_decompression_size"`
MaxParallelParsers int `toml:"max_parallel_parsers"`
tlsint.ServerConfig
}

Expand Down Expand Up @@ -96,74 +99,77 @@ func (cfg *Config) NewSocket(address string, splitcfg *SplitConfig, logger teleg
}

func (s *Socket) Setup() error {
if s.MaxParallelParsers == 0 {
cpus, err := cpu.Counts(true)
if err != nil {
return err
}

s.MaxParallelParsers = max(cpus/2, 1)
}

switch s.url.Scheme {
case "tcp", "tcp4", "tcp6":
l := &streamListener{
ReadBufferSize: int(s.ReadBufferSize),
ReadTimeout: s.ReadTimeout,
KeepAlivePeriod: s.KeepAlivePeriod,
MaxConnections: s.MaxConnections,
Encoding: s.ContentEncoding,
Splitter: s.splitter,
Log: s.log,
}
l := newStreamListener(
int(s.ReadBufferSize),
s.ReadTimeout,
s.KeepAlivePeriod,
s.MaxConnections,
s.ContentEncoding,
s.splitter,
s.MaxParallelParsers,
s.log,
)

if err := l.setupTCP(s.url, s.tlsCfg); err != nil {
return err
}
s.listener = l
case "unix", "unixpacket":
l := &streamListener{
ReadBufferSize: int(s.ReadBufferSize),
ReadTimeout: s.ReadTimeout,
KeepAlivePeriod: s.KeepAlivePeriod,
MaxConnections: s.MaxConnections,
Encoding: s.ContentEncoding,
Splitter: s.splitter,
Log: s.log,
}
l := newStreamListener(
int(s.ReadBufferSize),
s.ReadTimeout,
s.KeepAlivePeriod,
s.MaxConnections,
s.ContentEncoding,
s.splitter,
s.MaxParallelParsers,
s.log,
)

if err := l.setupUnix(s.url, s.tlsCfg, s.SocketMode); err != nil {
return err
}
s.listener = l
case "udp", "udp4", "udp6":
l := &packetListener{
Encoding: s.ContentEncoding,
MaxDecompressionSize: int64(s.MaxDecompressionSize),
}
l := newPacketListener(s.ContentEncoding, s.MaxDecompressionSize, s.MaxParallelParsers)
if err := l.setupUDP(s.url, s.interfaceName, int(s.ReadBufferSize)); err != nil {
return err
}
s.listener = l
case "ip", "ip4", "ip6":
l := &packetListener{
Encoding: s.ContentEncoding,
MaxDecompressionSize: int64(s.MaxDecompressionSize),
}
l := newPacketListener(s.ContentEncoding, s.MaxDecompressionSize, s.MaxParallelParsers)
if err := l.setupIP(s.url); err != nil {
return err
}
s.listener = l
case "unixgram":
l := &packetListener{
Encoding: s.ContentEncoding,
MaxDecompressionSize: int64(s.MaxDecompressionSize),
}
l := newPacketListener(s.ContentEncoding, s.MaxDecompressionSize, s.MaxParallelParsers)
if err := l.setupUnixgram(s.url, s.SocketMode); err != nil {
return err
}
s.listener = l
case "vsock":
l := &streamListener{
ReadBufferSize: int(s.ReadBufferSize),
ReadTimeout: s.ReadTimeout,
KeepAlivePeriod: s.KeepAlivePeriod,
MaxConnections: s.MaxConnections,
Encoding: s.ContentEncoding,
Splitter: s.splitter,
Log: s.log,
}
l := newStreamListener(
int(s.ReadBufferSize),
s.ReadTimeout,
s.KeepAlivePeriod,
s.MaxConnections,
s.ContentEncoding,
s.splitter,
s.MaxParallelParsers,
s.log,
)

if err := l.setupVsock(s.url); err != nil {
return err
Expand Down
6 changes: 3 additions & 3 deletions plugins/common/socket/socket_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ func TestListenData(t *testing.T) {
require.NoError(t, parser.Init())

var acc testutil.Accumulator
onData := func(remote net.Addr, data []byte) {
onData := func(remote net.Addr, data []byte, timestamp time.Time) {
m, err := parser.Parse(data)
require.NoError(t, err)
addr, _, err := net.SplitHostPort(remote.String())
Expand Down Expand Up @@ -450,7 +450,7 @@ func TestClosingConnections(t *testing.T) {
require.NoError(t, parser.Init())

var acc testutil.Accumulator
onData := func(_ net.Addr, data []byte) {
onData := func(_ net.Addr, data []byte, timestamp time.Time) {
m, err := parser.Parse(data)
require.NoError(t, err)
acc.AddMetrics(m)
Expand Down Expand Up @@ -518,7 +518,7 @@ func TestMaxConnections(t *testing.T) {
// Create callback
var errs []error
var mu sync.Mutex
onData := func(_ net.Addr, _ []byte) {}
onData := func(_ net.Addr, _ []byte, timestamp time.Time) {}
onError := func(err error) {
mu.Lock()
errs = append(errs, err)
Expand Down
30 changes: 28 additions & 2 deletions plugins/common/socket/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"crypto/tls"
"errors"
"fmt"
"github.com/alitto/pond"
"io"
"math"
"net"
Expand Down Expand Up @@ -43,11 +44,26 @@ type streamListener struct {
connections uint64
path string
cancel context.CancelFunc
parsePool *pond.WorkerPool

wg sync.WaitGroup
sync.Mutex
}

func newStreamListener(readBufferSize int, readTimeout config.Duration, keepAlivePeriod *config.Duration, maxConnections uint64, encoding string, splitter bufio.SplitFunc, maxWorkers int, log telegraf.Logger) *streamListener {
return &streamListener{
ReadBufferSize: readBufferSize,
ReadTimeout: readTimeout,
KeepAlivePeriod: keepAlivePeriod,
MaxConnections: maxConnections,
Encoding: encoding,
Splitter: splitter,
Log: log,

parsePool: pond.New(maxWorkers, 0, pond.MinWorkers(maxWorkers/2+1)),
}
}

func (l *streamListener) setupTCP(u *url.URL, tlsCfg *tls.Config) error {
var err error
if tlsCfg == nil {
Expand Down Expand Up @@ -330,12 +346,18 @@ func (l *streamListener) read(conn net.Conn, onData CallbackData) error {
break
}

receiveTime := time.Now()
src := conn.RemoteAddr()
if l.path != "" {
src = &net.UnixAddr{Name: l.path, Net: "unix"}
}

data := scanner.Bytes()
onData(src, data)
d := make([]byte, len(data))
copy(d, data)
l.parsePool.Submit(func() {
onData(src, d, receiveTime)
})
}

if err := scanner.Err(); err != nil {
Expand Down Expand Up @@ -379,7 +401,11 @@ func (l *streamListener) readAll(conn net.Conn, onData CallbackData) error {
if err != nil {
return fmt.Errorf("read on %s failed: %w", src, err)
}
onData(src, buf)

receiveTime := time.Now()
l.parsePool.Submit(func() {
onData(src, buf, receiveTime)
})

return nil
}
Expand Down
13 changes: 12 additions & 1 deletion plugins/inputs/socket_listener/socket_listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
_ "embed"
"net"
"sync"
"time"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
Expand All @@ -20,6 +21,7 @@ var once sync.Once

type SocketListener struct {
ServiceAddress string `toml:"service_address"`
TimeSource string `toml:"time_source"`
Log telegraf.Logger `toml:"-"`
socket.Config
socket.SplitConfig
Expand Down Expand Up @@ -52,18 +54,27 @@ func (sl *SocketListener) SetParser(parser telegraf.Parser) {

func (sl *SocketListener) Start(acc telegraf.Accumulator) error {
// Create the callbacks for parsing the data and recording issues
onData := func(_ net.Addr, data []byte) {
onData := func(_ net.Addr, data []byte, receiveTime time.Time) {
metrics, err := sl.parser.Parse(data)

if err != nil {
acc.AddError(err)
return
}

if len(metrics) == 0 {
once.Do(func() {
sl.Log.Debug(internal.NoMetricsCreatedMsg)
})
}

for _, m := range metrics {
switch sl.TimeSource {
case "", "metric":
case "receive_time":
m.SetTime(receiveTime)
}

acc.AddMetric(m)
}
}
Expand Down
3 changes: 2 additions & 1 deletion plugins/inputs/syslog/syslog.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"net/url"
"strings"
"sync"
"time"
"unicode"

"github.com/leodido/go-syslog/v4"
Expand Down Expand Up @@ -214,7 +215,7 @@ func (s *Syslog) createDatagramDataHandler(acc telegraf.Accumulator) socket.Call
}

// Return the OnData function
return func(src net.Addr, data []byte) {
return func(src net.Addr, data []byte, _ time.Time) {
message, err := parser.Parse(data)
if err != nil {
acc.AddError(err)
Expand Down

0 comments on commit 538a885

Please sign in to comment.