Skip to content

Commit

Permalink
Add udp internal metrics for the statsd input (influxdata#6921)
Browse files Browse the repository at this point in the history
  • Loading branch information
danielnelson authored and idohalevi committed Sep 23, 2020
1 parent abcd6d3 commit 547a942
Showing 1 changed file with 20 additions and 6 deletions.
26 changes: 20 additions & 6 deletions plugins/inputs/statsd/statsd.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,8 +122,12 @@ type Statsd struct {
MaxConnections selfstat.Stat
CurrentConnections selfstat.Stat
TotalConnections selfstat.Stat
PacketsRecv selfstat.Stat
BytesRecv selfstat.Stat
TCPPacketsRecv selfstat.Stat
TCPBytesRecv selfstat.Stat
UDPPacketsRecv selfstat.Stat
UDPPacketsDrop selfstat.Stat
UDPBytesRecv selfstat.Stat
ParseTimeNS selfstat.Stat

Log telegraf.Logger

Expand Down Expand Up @@ -327,8 +331,12 @@ func (s *Statsd) Start(ac telegraf.Accumulator) error {
s.MaxConnections.Set(int64(s.MaxTCPConnections))
s.CurrentConnections = selfstat.Register("statsd", "tcp_current_connections", tags)
s.TotalConnections = selfstat.Register("statsd", "tcp_total_connections", tags)
s.PacketsRecv = selfstat.Register("statsd", "tcp_packets_received", tags)
s.BytesRecv = selfstat.Register("statsd", "tcp_bytes_received", tags)
s.TCPPacketsRecv = selfstat.Register("statsd", "tcp_packets_received", tags)
s.TCPBytesRecv = selfstat.Register("statsd", "tcp_bytes_received", tags)
s.UDPPacketsRecv = selfstat.Register("statsd", "udp_packets_received", tags)
s.UDPPacketsDrop = selfstat.Register("statsd", "udp_packets_dropped", tags)
s.UDPBytesRecv = selfstat.Register("statsd", "udp_bytes_received", tags)
s.ParseTimeNS = selfstat.Register("statsd", "parse_time_ns", tags)

s.in = make(chan input, s.AllowedPendingMessages)
s.done = make(chan struct{})
Expand Down Expand Up @@ -461,6 +469,8 @@ func (s *Statsd) udpListen(conn *net.UDPConn) error {
}
return err
}
s.UDPPacketsRecv.Incr(1)
s.UDPBytesRecv.Incr(int64(n))
b := s.bufPool.Get().(*bytes.Buffer)
b.Reset()
b.Write(buf[:n])
Expand All @@ -470,6 +480,7 @@ func (s *Statsd) udpListen(conn *net.UDPConn) error {
Time: time.Now(),
Addr: addr.IP.String()}:
default:
s.UDPPacketsDrop.Incr(1)
s.drops++
if s.drops == 1 || s.AllowedPendingMessages == 0 || s.drops%s.AllowedPendingMessages == 0 {
s.Log.Errorf("Statsd message queue full. "+
Expand All @@ -490,6 +501,7 @@ func (s *Statsd) parser() error {
case <-s.done:
return nil
case in := <-s.in:
start := time.Now()
lines := strings.Split(in.Buffer.String(), "\n")
s.bufPool.Put(in.Buffer)
for _, line := range lines {
Expand All @@ -502,6 +514,8 @@ func (s *Statsd) parser() error {
s.parseStatsdLine(line)
}
}
elapsed := time.Since(start)
s.ParseTimeNS.Set(elapsed.Nanoseconds())
}
}
}
Expand Down Expand Up @@ -834,8 +848,8 @@ func (s *Statsd) handler(conn *net.TCPConn, id string) {
if n == 0 {
continue
}
s.BytesRecv.Incr(int64(n))
s.PacketsRecv.Incr(1)
s.TCPBytesRecv.Incr(int64(n))
s.TCPPacketsRecv.Incr(1)

b := s.bufPool.Get().(*bytes.Buffer)
b.Reset()
Expand Down

0 comments on commit 547a942

Please sign in to comment.