Skip to content

Commit

Permalink
feat(inputs): fix race condition
Browse files Browse the repository at this point in the history
  • Loading branch information
LarsStegman committed Sep 20, 2024
1 parent 9011767 commit c41db77
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 47 deletions.
96 changes: 49 additions & 47 deletions plugins/common/socket/datagram.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ type packetListener struct {
Log telegraf.Logger

conn net.PacketConn
decoder internal.ContentDecoder
decoders []internal.ContentDecoder
path string
wg sync.WaitGroup
parsePool *pond.WorkerPool
Expand Down Expand Up @@ -63,8 +63,10 @@ func (l *packetListener) listenData(onData CallbackData, onError CallbackError)

d := make([]byte, n)
copy(d, buf[:n])
decoderIdx := int(l.parsePool.SubmittedTasks()) % len(l.decoders)
decoder := l.decoders[decoderIdx]
l.parsePool.Submit(func() {
body, err := l.decoder.Decode(d)
body, err := decoder.Decode(d)
if err != nil && onError != nil {
onError(fmt.Errorf("unable to decode incoming packet: %w", err))
}
Expand Down Expand Up @@ -98,26 +100,32 @@ func (l *packetListener) listenConnection(onConnection CallbackConnection, onErr
break
}

// Decode the contents depending on the given encoding
body, err := l.decoder.Decode(buf[:n])
if err != nil && onError != nil {
onError(fmt.Errorf("unable to decode incoming packet: %w", err))
}
d := make([]byte, n)
copy(d, buf[:n])
decoderIdx := int(l.parsePool.SubmittedTasks()) % len(l.decoders)
decoder := l.decoders[decoderIdx]
l.parsePool.Submit(func() {
// Decode the contents depending on the given encoding
body, err := decoder.Decode(d[:n])
if err != nil && onError != nil {
onError(fmt.Errorf("unable to decode incoming packet: %w", err))
}

// Workaround to provide remote endpoints for Unix-type sockets
if l.path != "" {
src = &net.UnixAddr{Name: l.path, Net: "unixgram"}
}
// Workaround to provide remote endpoints for Unix-type sockets
if l.path != "" {
src = &net.UnixAddr{Name: l.path, Net: "unixgram"}
}

// Create a pipe and notify the caller via Callback that new data is
// available. Afterwards write the data. Please note: Write() will
// blocks until all data is consumed!
reader, writer := io.Pipe()
go onConnection(src, reader)
if _, err := writer.Write(body); err != nil && onError != nil {
onError(err)
}
writer.Close()
// Create a pipe and notify the caller via Callback that new data is
// available. Afterwards write the data. Please note: Write() will
// blocks until all data is consumed!
reader, writer := io.Pipe()
go onConnection(src, reader)
if _, err := writer.Write(body); err != nil && onError != nil {
onError(err)
}
writer.Close()
})
}
}()
}
Expand Down Expand Up @@ -151,18 +159,9 @@ func (l *packetListener) setupUnixgram(u *url.URL, socketMode string) error {
}
}

// Create a decoder for the given encoding
var options []internal.DecodingOption
if l.MaxDecompressionSize > 0 {
options = append(options, internal.WithMaxDecompressionSize(l.MaxDecompressionSize))
}
decoder, err := internal.NewContentDecoder(l.Encoding, options...)
if err != nil {
return fmt.Errorf("creating decoder failed: %w", err)
}
l.decoder = decoder
err = l.setupDecoder()

return nil
return err
}

func (l *packetListener) setupUDP(u *url.URL, ifname string, bufferSize int) error {
Expand Down Expand Up @@ -197,20 +196,11 @@ func (l *packetListener) setupUDP(u *url.URL, ifname string, bufferSize int) err
l.Log.Warnf("Setting read buffer on %s socket failed: %v", u.Scheme, err)
}
}
l.conn = conn

// Create a decoder for the given encoding
var options []internal.DecodingOption
if l.MaxDecompressionSize > 0 {
options = append(options, internal.WithMaxDecompressionSize(l.MaxDecompressionSize))
}
decoder, err := internal.NewContentDecoder(l.Encoding, options...)
if err != nil {
return fmt.Errorf("creating decoder failed: %w", err)
}
l.decoder = decoder
l.conn = conn
err = l.setupDecoder()

return nil
return err
}

func (l *packetListener) setupIP(u *url.URL) error {
Expand All @@ -219,17 +209,27 @@ func (l *packetListener) setupIP(u *url.URL) error {
return fmt.Errorf("listening (ip) failed: %w", err)
}
l.conn = conn
err = l.setupDecoder()

return err
}

func (l *packetListener) setupDecoder() error {
// Create a decoder for the given encoding
var options []internal.DecodingOption
if l.MaxDecompressionSize > 0 {
options = append(options, internal.WithMaxDecompressionSize(l.MaxDecompressionSize))
}
decoder, err := internal.NewContentDecoder(l.Encoding, options...)
if err != nil {
return fmt.Errorf("creating decoder failed: %w", err)

l.decoders = make([]internal.ContentDecoder, 0, l.parsePool.MaxWorkers())
for range l.parsePool.MaxWorkers() {
decoder, err := internal.NewContentDecoder(l.Encoding, options...)
if err != nil {
return fmt.Errorf("creating decoder failed: %w", err)
}

l.decoders = append(l.decoders, decoder)
}
l.decoder = decoder

return nil
}
Expand All @@ -255,5 +255,7 @@ func (l *packetListener) close() error {
}
}

l.parsePool.StopAndWait()

return nil
}
3 changes: 3 additions & 0 deletions plugins/common/socket/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,9 @@ func (l *streamListener) close() error {
return err
}
}

l.parsePool.StopAndWait()

return nil
}

Expand Down

0 comments on commit c41db77

Please sign in to comment.