From 063d5c4ab01c2797654f604697917dd2f2b1485d Mon Sep 17 00:00:00 2001 From: Nicolai Ommer Date: Mon, 22 Feb 2021 17:27:04 +0100 Subject: [PATCH] Use Multicast Server from game-controller with multiple nif support --- cmd/ssl-quality-inspector/main.go | 14 ++- pkg/sslnet/multicast_server.go | 138 ++++++++++++++++++++++++++++++ pkg/vision/udpWatcher.go | 67 --------------- 3 files changed, 150 insertions(+), 69 deletions(-) create mode 100644 pkg/sslnet/multicast_server.go delete mode 100644 pkg/vision/udpWatcher.go diff --git a/cmd/ssl-quality-inspector/main.go b/cmd/ssl-quality-inspector/main.go index 1fc4e22..e970045 100644 --- a/cmd/ssl-quality-inspector/main.go +++ b/cmd/ssl-quality-inspector/main.go @@ -5,7 +5,10 @@ import ( "fmt" "github.com/RoboCup-SSL/ssl-quality-inspector/pkg/clock" "github.com/RoboCup-SSL/ssl-quality-inspector/pkg/network" + "github.com/RoboCup-SSL/ssl-quality-inspector/pkg/sslnet" "github.com/RoboCup-SSL/ssl-quality-inspector/pkg/vision" + "github.com/golang/protobuf/proto" + "log" "sort" "strings" "time" @@ -32,8 +35,15 @@ func main() { statsConfig.TimeWindowQualityBall = *timeWindowQualityBall statsConfig.TimeWindowQualityRobot = *timeWindowQualityRobot stats := vision.NewStats(statsConfig) - udpWatcher := vision.NewUdpWatcher(stats.Process) - go udpWatcher.Watch(*visionAddress) + mcServer := sslnet.NewMulticastServer(func(bytes []byte) { + wrapper := new(vision.SSL_WrapperPacket) + if err := proto.Unmarshal(bytes, wrapper); err != nil { + log.Println("Could not unmarshal message") + } else { + stats.Process(wrapper) + } + }) + mcServer.Start(*visionAddress) clockWatchers := map[string]*clock.Watcher{} activeSources := map[string]bool{} diff --git a/pkg/sslnet/multicast_server.go b/pkg/sslnet/multicast_server.go new file mode 100644 index 0000000..832524f --- /dev/null +++ b/pkg/sslnet/multicast_server.go @@ -0,0 +1,138 @@ +package sslnet + +import ( + "log" + "net" + "sync" + "time" +) + +const maxDatagramSize = 8192 + +type MulticastServer struct { + connection *net.UDPConn + running bool + consumer func([]byte) + mutex sync.Mutex + SkipInterfaces []string + Verbose bool +} + +func NewMulticastServer(consumer func([]byte)) (r *MulticastServer) { + r = new(MulticastServer) + r.consumer = consumer + return +} + +func (r *MulticastServer) Start(multicastAddress string) { + r.running = true + go r.receive(multicastAddress) +} + +func (r *MulticastServer) Stop() { + r.mutex.Lock() + defer r.mutex.Unlock() + r.running = false + if err := r.connection.Close(); err != nil { + log.Println("Could not close connection: ", err) + } +} + +func (r *MulticastServer) receive(multicastAddress string) { + var currentIfiIdx = 0 + for r.isRunning() { + ifis := r.interfaces() + currentIfiIdx = currentIfiIdx % len(ifis) + ifi := ifis[currentIfiIdx] + r.receiveOnInterface(multicastAddress, ifi) + currentIfiIdx++ + if currentIfiIdx >= len(ifis) { + // cycled though all interfaces once, make a short break to avoid producing endless log messages + time.Sleep(300 * time.Millisecond) + } + } +} + +func (r *MulticastServer) isRunning() bool { + r.mutex.Lock() + defer r.mutex.Unlock() + return r.running +} + +func (r *MulticastServer) interfaces() (interfaces []net.Interface) { + interfaces = []net.Interface{} + ifis, err := net.Interfaces() + if err != nil { + log.Println("Could not get available interfaces: ", err) + } + for _, ifi := range ifis { + if ifi.Flags&net.FlagMulticast == 0 || // No multicast support + r.skipInterface(ifi.Name) { + continue + } + interfaces = append(interfaces, ifi) + } + return +} + +func (r *MulticastServer) skipInterface(ifiName string) bool { + for _, skipIfi := range r.SkipInterfaces { + if skipIfi == ifiName { + return true + } + } + return false +} + +func (r *MulticastServer) receiveOnInterface(multicastAddress string, ifi net.Interface) { + addr, err := net.ResolveUDPAddr("udp", multicastAddress) + if err != nil { + log.Printf("Could resolve multicast address %v: %v", multicastAddress, err) + return + } + + r.connection, err = net.ListenMulticastUDP("udp", &ifi, addr) + if err != nil { + log.Printf("Could not listen at %v: %v", multicastAddress, err) + return + } + + if err := r.connection.SetReadBuffer(maxDatagramSize); err != nil { + log.Println("Could not set read buffer: ", err) + } + + if r.Verbose { + log.Printf("Listening on %s (%s)", multicastAddress, ifi.Name) + } + + first := true + data := make([]byte, maxDatagramSize) + for { + if err := r.connection.SetDeadline(time.Now().Add(300 * time.Millisecond)); err != nil { + log.Println("Could not set deadline on connection: ", err) + } + n, _, err := r.connection.ReadFromUDP(data) + if err != nil { + if r.Verbose { + log.Println("ReadFromUDP failed:", err) + } + break + } + + if first && r.Verbose { + log.Printf("Got first data packets from %s (%s)", multicastAddress, ifi.Name) + first = false + } + + r.consumer(data[:n]) + } + + if r.Verbose { + log.Printf("Stop listening on %s (%s)", multicastAddress, ifi.Name) + } + + if err := r.connection.Close(); err != nil { + log.Println("Could not close listener: ", err) + } + return +} diff --git a/pkg/vision/udpWatcher.go b/pkg/vision/udpWatcher.go deleted file mode 100644 index c824f59..0000000 --- a/pkg/vision/udpWatcher.go +++ /dev/null @@ -1,67 +0,0 @@ -package vision - -import ( - "github.com/golang/protobuf/proto" - "log" - "net" - "time" -) - -const maxDatagramSize = 8192 - -type UdpWatcher struct { - Callback func(*SSL_WrapperPacket) -} - -func NewUdpWatcher(callback func(*SSL_WrapperPacket)) (w UdpWatcher) { - w.Callback = callback - return w -} - -func (w *UdpWatcher) Watch(address string) { - addr, err := net.ResolveUDPAddr("udp", address) - if err != nil { - log.Print(err) - return - } - conn, err := net.ListenMulticastUDP("udp", nil, addr) - if err != nil { - log.Print(err) - return - } - - if err := conn.SetReadBuffer(maxDatagramSize); err != nil { - log.Printf("Could not set read buffer to %v.", maxDatagramSize) - } - - c1 := make(chan *SSL_WrapperPacket, 10) - go w.receive(conn, c1) - for { - select { - case wrapper := <-c1: - w.Callback(wrapper) - case <-time.After(1 * time.Second): - w.Callback(nil) - } - } -} - -func (w *UdpWatcher) receive(conn *net.UDPConn, c1 chan *SSL_WrapperPacket) { - b := make([]byte, maxDatagramSize) - for { - n, err := conn.Read(b) - if err != nil { - log.Print("Could not read", err) - continue - } else if n >= maxDatagramSize { - log.Print("Buffer size too small") - } - - wrapper := new(SSL_WrapperPacket) - if err := proto.Unmarshal(b[0:n], wrapper); err != nil { - log.Println("Could not unmarshal message") - continue - } - c1 <- wrapper - } -}