Skip to content

Commit

Permalink
ensure syslog parser gets an EOF-terminated reader on udp receive (#5297
Browse files Browse the repository at this point in the history
)

Co-authored-by: Robert Fratto <[email protected]>
  • Loading branch information
joshuapare and rfratto authored Nov 2, 2023
1 parent 3683a95 commit 0a23b6d
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 9 deletions.
4 changes: 3 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,8 @@ Main (unreleased)

- Mark `password` argument of `loki.source.kafka` as a `secret` rather than a `string`. (@harsiddhdave44)

- Fixed a bug where UDP syslog messages were never processed (@joshuapare)

### Enhancements

- The `loki.write` WAL now has snappy compression enabled by default. (@thepalbi)
Expand All @@ -122,7 +124,7 @@ Main (unreleased)

- Support clustering in `loki.source.podlogs` (@rfratto).

- Adds new metrics (`mssql_server_total_memory_bytes`, `mssql_server_target_memory_bytes`,
- Adds new metrics (`mssql_server_total_memory_bytes`, `mssql_server_target_memory_bytes`,
and `mssql_available_commit_memory_bytes`) for `mssql` integration.

v0.37.3 (2023-10-26)
Expand Down
33 changes: 25 additions & 8 deletions component/loki/source/syslog/internal/syslogtarget/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package syslogtarget
// to other loki components.

import (
"bytes"
"context"
"crypto/tls"
"crypto/x509"
Expand Down Expand Up @@ -430,16 +431,32 @@ func (t *UDPTransport) handleRcv(c *ConnPipe) {
defer t.openConnections.Done()

lbs := t.connectionLabels(c.addr.String())
err := syslogparser.ParseStream(c, func(result *syslog.Result) {
if err := result.Error; err != nil {
t.handleMessageError(err)
} else {
t.handleMessage(lbs.Copy(), result.Message)

for {
datagram := make([]byte, t.maxMessageLength())
n, err := c.Read(datagram)
if err != nil {
if err == io.EOF {
break
}

level.Warn(t.logger).Log("msg", "error reading from pipe", "err", err)
continue
}
}, t.maxMessageLength())

if err != nil {
level.Warn(t.logger).Log("msg", "error parsing syslog stream", "err", err)
r := bytes.NewReader(datagram[:n])

err = syslogparser.ParseStream(r, func(result *syslog.Result) {
if err := result.Error; err != nil {
t.handleMessageError(err)
} else {
t.handleMessage(lbs.Copy(), result.Message)
}
}, t.maxMessageLength())

if err != nil {
level.Warn(t.logger).Log("msg", "error parsing syslog stream", "err", err)
}
}
}

Expand Down

0 comments on commit 0a23b6d

Please sign in to comment.