Skip to content

Commit

Permalink
fix: udp receiver not passing errors (#323)
Browse files Browse the repository at this point in the history
  • Loading branch information
lspgn authored Aug 18, 2024
1 parent ec96100 commit f8e113a
Showing 1 changed file with 32 additions and 6 deletions.
38 changes: 32 additions & 6 deletions utils/udp.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"fmt"
"net"
"net/netip"
"strings"
"sync"
"time"

Expand Down Expand Up @@ -129,11 +130,15 @@ func (r *UDPReceiver) Errors() <-chan error {
}

func (r *UDPReceiver) receive(addr string, port int, started chan bool) error {
if strings.IndexRune(addr, ':') >= 0 && strings.IndexRune(addr, '[') == -1 {
addr = "[" + addr + "]"
}

pconn, err := reuseport.ListenPacket("udp", fmt.Sprintf("%s:%d", addr, port))
close(started)
if err != nil {
return err
}
close(started) // indicates receiver is setup

q := make(chan bool)
// function to quit
Expand All @@ -148,8 +153,13 @@ func (r *UDPReceiver) receive(addr string, port int, started chan bool) error {

udpconn, ok := pconn.(*net.UDPConn)
if !ok {
return err
return fmt.Errorf("not a udp connection")
}

return r.receiveRoutine(udpconn)
}

func (r *UDPReceiver) receiveRoutine(udpconn *net.UDPConn) (err error) {
localAddr, _ := udpconn.LocalAddr().(*net.UDPAddr)

for {
Expand Down Expand Up @@ -243,20 +253,34 @@ func (r *UDPReceiver) decoders(workers int, decodeFunc DecoderFunc) error {
}

// Starts the UDP receiving workers
func (r *UDPReceiver) receivers(sockets int, addr string, port int) error {
func (r *UDPReceiver) receivers(sockets int, addr string, port int) (rErr error) {
for i := 0; i < sockets; i++ {
if rErr != nil { // do not instanciate the rest of the receivers
break
}

r.wg.Add(1)
started := make(chan bool)
started := make(chan bool) // indicates receiver setup is complete
go func() {
defer r.wg.Done()
if err := r.receive(addr, port, started); err != nil {
r.logError(&ReceiverError{err})
err = &ReceiverError{err}

select {
case <-started:
default: // in case the receiver is not started yet
rErr = err
close(started)
return
}

r.logError(err)
}
}()
<-started
}

return nil
return rErr
}

// Start UDP receivers and the processing routines
Expand All @@ -269,9 +293,11 @@ func (r *UDPReceiver) Start(addr string, port int, decodeFunc DecoderFunc) error
}

if err := r.decoders(r.workers, decodeFunc); err != nil {
r.Stop()
return err
}
if err := r.receivers(r.sockets, addr, port); err != nil {
r.Stop()
return err
}
return nil
Expand Down

0 comments on commit f8e113a

Please sign in to comment.