From 0a23b6d18c2e98b71f78051f9d65ac0e650c681b Mon Sep 17 00:00:00 2001 From: Joshua Pare <65102852+joshuapare@users.noreply.github.com> Date: Thu, 2 Nov 2023 10:50:02 -0500 Subject: [PATCH] ensure syslog parser gets an EOF-terminated reader on udp receive (#5297) Co-authored-by: Robert Fratto --- CHANGELOG.md | 4 ++- .../syslog/internal/syslogtarget/transport.go | 33 ++++++++++++++----- 2 files changed, 28 insertions(+), 9 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index f1bab405b455..6406022292bb 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -96,6 +96,8 @@ Main (unreleased) - Mark `password` argument of `loki.source.kafka` as a `secret` rather than a `string`. (@harsiddhdave44) +- Fixed a bug where UDP syslog messages were never processed (@joshuapare) + ### Enhancements - The `loki.write` WAL now has snappy compression enabled by default. (@thepalbi) @@ -122,7 +124,7 @@ Main (unreleased) - Support clustering in `loki.source.podlogs` (@rfratto). -- Adds new metrics (`mssql_server_total_memory_bytes`, `mssql_server_target_memory_bytes`, +- Adds new metrics (`mssql_server_total_memory_bytes`, `mssql_server_target_memory_bytes`, and `mssql_available_commit_memory_bytes`) for `mssql` integration. v0.37.3 (2023-10-26) diff --git a/component/loki/source/syslog/internal/syslogtarget/transport.go b/component/loki/source/syslog/internal/syslogtarget/transport.go index cf12747a6b73..3348f27b15d3 100644 --- a/component/loki/source/syslog/internal/syslogtarget/transport.go +++ b/component/loki/source/syslog/internal/syslogtarget/transport.go @@ -5,6 +5,7 @@ package syslogtarget // to other loki components. import ( + "bytes" "context" "crypto/tls" "crypto/x509" @@ -430,16 +431,32 @@ func (t *UDPTransport) handleRcv(c *ConnPipe) { defer t.openConnections.Done() lbs := t.connectionLabels(c.addr.String()) - err := syslogparser.ParseStream(c, func(result *syslog.Result) { - if err := result.Error; err != nil { - t.handleMessageError(err) - } else { - t.handleMessage(lbs.Copy(), result.Message) + + for { + datagram := make([]byte, t.maxMessageLength()) + n, err := c.Read(datagram) + if err != nil { + if err == io.EOF { + break + } + + level.Warn(t.logger).Log("msg", "error reading from pipe", "err", err) + continue } - }, t.maxMessageLength()) - if err != nil { - level.Warn(t.logger).Log("msg", "error parsing syslog stream", "err", err) + r := bytes.NewReader(datagram[:n]) + + err = syslogparser.ParseStream(r, func(result *syslog.Result) { + if err := result.Error; err != nil { + t.handleMessageError(err) + } else { + t.handleMessage(lbs.Copy(), result.Message) + } + }, t.maxMessageLength()) + + if err != nil { + level.Warn(t.logger).Log("msg", "error parsing syslog stream", "err", err) + } } }