Skip to content

Commit

Permalink
metrics: collect drops on receiver (#226)
Browse files Browse the repository at this point in the history
  • Loading branch information
lspgn authored Jan 6, 2024
1 parent 9c3b0e0 commit ff4ddca
Show file tree
Hide file tree
Showing 4 changed files with 73 additions and 4 deletions.
9 changes: 5 additions & 4 deletions cmd/goflow2/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,10 +247,11 @@ func main() {
l.Info("starting collection")

cfg := &utils.UDPReceiverConfig{
Sockets: numSockets,
Workers: numWorkers,
QueueSize: queueSize,
Blocking: isBlocking,
Sockets: numSockets,
Workers: numWorkers,
QueueSize: queueSize,
Blocking: isBlocking,
ReceiverCallback: metrics.NewReceiverMetric(),
}
recv, err := utils.NewUDPReceiver(cfg)
if err != nil {
Expand Down
19 changes: 19 additions & 0 deletions metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,22 @@ const (
)

var (
MetricReceivedDroppedPackets = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "flow_dropped_packets_total",
Help: "Packets dropped before processing.",
Namespace: NAMESPACE,
},
[]string{"remote_ip", "local_ip", "local_port"},
)
MetricReceivedDroppedBytes = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "flow_dropped_bytes_total",
Help: "Bytes dropped before processing.",
Namespace: NAMESPACE,
},
[]string{"remote_ip", "local_ip", "local_port"},
)
MetricTrafficBytes = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "flow_traffic_bytes_total",
Expand Down Expand Up @@ -114,6 +130,9 @@ var (
)

func init() {
prometheus.MustRegister(MetricReceivedDroppedPackets)
prometheus.MustRegister(MetricReceivedDroppedBytes)

prometheus.MustRegister(MetricTrafficBytes)
prometheus.MustRegister(MetricTrafficPackets)
prometheus.MustRegister(MetricPacketSizeSum)
Expand Down
32 changes: 32 additions & 0 deletions metrics/receiver.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package metrics

import (
"fmt"

"github.com/netsampler/goflow2/v2/utils"

"github.com/prometheus/client_golang/prometheus"
)

type ReceiverMetric struct {
}

func NewReceiverMetric() *ReceiverMetric {
return &ReceiverMetric{}
}

func (r *ReceiverMetric) Dropped(pkt utils.Message) {
remote := pkt.Src.Addr().Unmap().String()
localIP := pkt.Dst.Addr().Unmap().String()

port := fmt.Sprintf("%d", pkt.Dst.Port())
size := len(pkt.Payload)

labels := prometheus.Labels{
"remote_ip": remote,
"local_ip": localIP,
"local_port": port,
}
MetricReceivedDroppedPackets.With(labels).Inc()
MetricReceivedDroppedBytes.With(labels).Add(float64(size))
}
17 changes: 17 additions & 0 deletions utils/udp.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,10 @@ import (
reuseport "github.com/libp2p/go-reuseport"
)

type ReceiverCallback interface {
Dropped(msg Message)
}

// Callback used to decode a UDP message
type DecoderFunc func(msg interface{}) error

Expand Down Expand Up @@ -48,13 +52,17 @@ type UDPReceiver struct {

workers int
sockets int

cb ReceiverCallback
}

type UDPReceiverConfig struct {
Workers int
Sockets int
Blocking bool
QueueSize int

ReceiverCallback ReceiverCallback
}

func NewUDPReceiver(cfg *UDPReceiverConfig) (*UDPReceiver, error) {
Expand All @@ -80,6 +88,7 @@ func NewUDPReceiver(cfg *UDPReceiverConfig) (*UDPReceiver, error) {
r.workers = cfg.Workers
dispatchSize = cfg.QueueSize
r.blocking = cfg.Blocking
r.cb = cfg.ReceiverCallback
}

if dispatchSize == 0 {
Expand Down Expand Up @@ -171,6 +180,14 @@ func (r *UDPReceiver) receive(addr string, port int, started chan bool) error {
case <-r.q:
return nil
default:
if r.cb != nil {
r.cb.Dropped(Message{
Src: pkt.src.AddrPort(),
Dst: pkt.dst.AddrPort(),
Payload: pkt.payload[0:pkt.size],
Received: pkt.received,
})
}
packetPool.Put(pkt)
// increase counter
}
Expand Down

0 comments on commit ff4ddca

Please sign in to comment.