From fedd678623df7b90124a291710e36b977442c67a Mon Sep 17 00:00:00 2001 From: Louis Date: Sat, 17 Aug 2024 17:28:30 -0700 Subject: [PATCH] fix: udp receiver not passing errors (#323) --- utils/udp.go | 38 ++++++++++++++++++++++++++++++++------ 1 file changed, 32 insertions(+), 6 deletions(-) diff --git a/utils/udp.go b/utils/udp.go index 8fd7af7c..7efe73d7 100644 --- a/utils/udp.go +++ b/utils/udp.go @@ -4,6 +4,7 @@ import ( "fmt" "net" "net/netip" + "strings" "sync" "time" @@ -129,11 +130,15 @@ func (r *UDPReceiver) Errors() <-chan error { } func (r *UDPReceiver) receive(addr string, port int, started chan bool) error { + if strings.IndexRune(addr, ':') >= 0 && strings.IndexRune(addr, '[') == -1 { + addr = "[" + addr + "]" + } + pconn, err := reuseport.ListenPacket("udp", fmt.Sprintf("%s:%d", addr, port)) - close(started) if err != nil { return err } + close(started) // indicates receiver is setup q := make(chan bool) // function to quit @@ -148,8 +153,13 @@ func (r *UDPReceiver) receive(addr string, port int, started chan bool) error { udpconn, ok := pconn.(*net.UDPConn) if !ok { - return err + return fmt.Errorf("not a udp connection") } + + return r.receiveRoutine(udpconn) +} + +func (r *UDPReceiver) receiveRoutine(udpconn *net.UDPConn) (err error) { localAddr, _ := udpconn.LocalAddr().(*net.UDPAddr) for { @@ -243,20 +253,34 @@ func (r *UDPReceiver) decoders(workers int, decodeFunc DecoderFunc) error { } // Starts the UDP receiving workers -func (r *UDPReceiver) receivers(sockets int, addr string, port int) error { +func (r *UDPReceiver) receivers(sockets int, addr string, port int) (rErr error) { for i := 0; i < sockets; i++ { + if rErr != nil { // do not instanciate the rest of the receivers + break + } + r.wg.Add(1) - started := make(chan bool) + started := make(chan bool) // indicates receiver setup is complete go func() { defer r.wg.Done() if err := r.receive(addr, port, started); err != nil { - r.logError(&ReceiverError{err}) + err = &ReceiverError{err} + + select { + case <-started: + default: // in case the receiver is not started yet + rErr = err + close(started) + return + } + + r.logError(err) } }() <-started } - return nil + return rErr } // Start UDP receivers and the processing routines @@ -269,9 +293,11 @@ func (r *UDPReceiver) Start(addr string, port int, decodeFunc DecoderFunc) error } if err := r.decoders(r.workers, decodeFunc); err != nil { + r.Stop() return err } if err := r.receivers(r.sockets, addr, port); err != nil { + r.Stop() return err } return nil