Skip to content

Commit

Permalink
promtail: add support for RFC3164 syslogs
Browse files Browse the repository at this point in the history
This adds support for RFC3164 syslogs which are used by many devices, in
my use case OpenWrt and Ubiquiti routers. A new option called
`is_rfc3164_message` is added to the syslog which makes the incomming
logs to be handled as RFC3164.

Co-Authored-By: Paul Spooren <[email protected]>

Signed-off-by: Kirill A. Korinsky <[email protected]>
  • Loading branch information
catap committed Apr 28, 2024
1 parent cf88796 commit daf14b5
Show file tree
Hide file tree
Showing 6 changed files with 85 additions and 14 deletions.
3 changes: 3 additions & 0 deletions clients/pkg/promtail/scrapeconfig/scrapeconfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,9 @@ type SyslogTargetConfig struct {
// message should be pushed to Loki
UseRFC5424Message bool `yaml:"use_rfc5424_message"`

// IsRFC3164Message defines wether the log is formated as RFC3164
IsRFC3164Message bool `yaml:"is_rfc3164_message"`

// MaxMessageLength sets the maximum limit to the length of syslog messages
MaxMessageLength int `yaml:"max_message_length"`

Expand Down
14 changes: 11 additions & 3 deletions clients/pkg/promtail/targets/syslog/syslogparser/syslogparser.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (
// the callback function with the parsed messages. The parser automatically
// detects octet counting.
// The function returns on EOF or unrecoverable errors.
func ParseStream(r io.Reader, callback func(res *syslog.Result), maxMessageLength int) error {
func ParseStream(isRFC3164Message bool, r io.Reader, callback func(res *syslog.Result), maxMessageLength int) error {
buf := bufio.NewReaderSize(r, 1<<10)

b, err := buf.ReadByte()
Expand All @@ -24,9 +24,17 @@ func ParseStream(r io.Reader, callback func(res *syslog.Result), maxMessageLengt
_ = buf.UnreadByte()

if b == '<' {
nontransparent.NewParser(syslog.WithListener(callback), syslog.WithMaxMessageLength(maxMessageLength), syslog.WithBestEffort()).Parse(buf)
if isRFC3164Message {
nontransparent.NewParserRFC3164(syslog.WithListener(callback), syslog.WithMaxMessageLength(maxMessageLength), syslog.WithBestEffort()).Parse(buf)
} else {
nontransparent.NewParser(syslog.WithListener(callback), syslog.WithMaxMessageLength(maxMessageLength), syslog.WithBestEffort()).Parse(buf)
}
} else if b >= '0' && b <= '9' {
octetcounting.NewParser(syslog.WithListener(callback), syslog.WithMaxMessageLength(maxMessageLength), syslog.WithBestEffort()).Parse(buf)
if isRFC3164Message {
octetcounting.NewParserRFC3164(syslog.WithListener(callback), syslog.WithMaxMessageLength(maxMessageLength), syslog.WithBestEffort()).Parse(buf)
} else {
octetcounting.NewParser(syslog.WithListener(callback), syslog.WithMaxMessageLength(maxMessageLength), syslog.WithBestEffort()).Parse(buf)
}
} else {
return fmt.Errorf("invalid or unsupported framing. first byte: '%s'", string(b))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ func TestParseStream_OctetCounting(t *testing.T) {
results = append(results, res)
}

err := syslogparser.ParseStream(r, cb, defaultMaxMessageLength)
err := syslogparser.ParseStream(false, r, cb, defaultMaxMessageLength)
require.NoError(t, err)

require.Equal(t, 2, len(results))
Expand All @@ -43,7 +43,7 @@ func TestParseStream_ValidParseError(t *testing.T) {
results = append(results, res)
}

err := syslogparser.ParseStream(r, cb, defaultMaxMessageLength)
err := syslogparser.ParseStream(false, r, cb, defaultMaxMessageLength)
require.NoError(t, err)

require.Equal(t, 1, len(results))
Expand All @@ -59,7 +59,7 @@ func TestParseStream_OctetCounting_LongMessage(t *testing.T) {
results = append(results, res)
}

err := syslogparser.ParseStream(r, cb, defaultMaxMessageLength)
err := syslogparser.ParseStream(false, r, cb, defaultMaxMessageLength)
require.NoError(t, err)

require.Equal(t, 1, len(results))
Expand All @@ -74,7 +74,7 @@ func TestParseStream_NewlineSeparated(t *testing.T) {
results = append(results, res)
}

err := syslogparser.ParseStream(r, cb, defaultMaxMessageLength)
err := syslogparser.ParseStream(false, r, cb, defaultMaxMessageLength)
require.NoError(t, err)

require.Equal(t, 2, len(results))
Expand All @@ -87,13 +87,13 @@ func TestParseStream_NewlineSeparated(t *testing.T) {
func TestParseStream_InvalidStream(t *testing.T) {
r := strings.NewReader("invalid")

err := syslogparser.ParseStream(r, func(res *syslog.Result) {}, defaultMaxMessageLength)
err := syslogparser.ParseStream(false, r, func(res *syslog.Result) {}, defaultMaxMessageLength)
require.EqualError(t, err, "invalid or unsupported framing. first byte: 'i'")
}

func TestParseStream_EmptyStream(t *testing.T) {
r := strings.NewReader("")

err := syslogparser.ParseStream(r, func(res *syslog.Result) {}, defaultMaxMessageLength)
err := syslogparser.ParseStream(false, r, func(res *syslog.Result) {}, defaultMaxMessageLength)
require.Equal(t, err, io.EOF)
}
61 changes: 60 additions & 1 deletion clients/pkg/promtail/targets/syslog/syslogtarget.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/influxdata/go-syslog/v3"
"github.com/influxdata/go-syslog/v3/rfc3164"
"github.com/influxdata/go-syslog/v3/rfc5424"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
Expand Down Expand Up @@ -106,7 +107,7 @@ func (t *SyslogTarget) handleMessageError(err error) {
t.metrics.syslogParsingErrors.Inc()
}

func (t *SyslogTarget) handleMessage(connLabels labels.Labels, msg syslog.Message) {
func (t *SyslogTarget) handleMessageRFC5424(connLabels labels.Labels, msg syslog.Message) {
rfc5424Msg := msg.(*rfc5424.SyslogMessage)

if rfc5424Msg.Message == nil {
Expand Down Expand Up @@ -173,6 +174,64 @@ func (t *SyslogTarget) handleMessage(connLabels labels.Labels, msg syslog.Messag
t.messages <- message{filtered, m, timestamp}
}

func (t *SyslogTarget) handleMessageRFC3164(connLabels labels.Labels, msg syslog.Message) {
rfc3164Msg := msg.(*rfc3164.SyslogMessage)

if rfc3164Msg.Message == nil {
t.metrics.syslogEmptyMessages.Inc()
return
}

lb := labels.NewBuilder(connLabels)
if v := rfc3164Msg.SeverityLevel(); v != nil {
lb.Set("__syslog_message_severity", *v)
}
if v := rfc3164Msg.FacilityLevel(); v != nil {
lb.Set("__syslog_message_facility", *v)
}
if v := rfc3164Msg.Hostname; v != nil {
lb.Set("__syslog_message_hostname", *v)
}
if v := rfc3164Msg.Appname; v != nil {
lb.Set("__syslog_message_app_name", *v)
}
if v := rfc3164Msg.ProcID; v != nil {
lb.Set("__syslog_message_proc_id", *v)
}
if v := rfc3164Msg.MsgID; v != nil {
lb.Set("__syslog_message_msg_id", *v)
}

processed, _ := relabel.Process(lb.Labels(), t.relabelConfig...)

filtered := make(model.LabelSet)
for _, lbl := range processed {
if strings.HasPrefix(lbl.Name, "__") {
continue
}
filtered[model.LabelName(lbl.Name)] = model.LabelValue(lbl.Value)
}

var timestamp time.Time
if t.config.UseIncomingTimestamp && rfc3164Msg.Timestamp != nil {
timestamp = *rfc3164Msg.Timestamp
} else {
timestamp = time.Now()
}

m := *rfc3164Msg.Message

t.messages <- message{filtered, m, timestamp}
}

func (t *SyslogTarget) handleMessage(connLabels labels.Labels, msg syslog.Message) {
if t.config.IsRFC3164Message {
t.handleMessageRFC3164(connLabels, msg)
} else {
t.handleMessageRFC5424(connLabels, msg)
}
}

func (t *SyslogTarget) messageSender(entries chan<- api.Entry) {
for msg := range t.messages {
entries <- api.Entry{
Expand Down
2 changes: 1 addition & 1 deletion clients/pkg/promtail/targets/syslog/syslogtarget_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -916,7 +916,7 @@ func TestParseStream_WithAsyncPipe(t *testing.T) {
results = append(results, res)
}

err := syslogparser.ParseStream(pipe, cb, defaultMaxMessageLength)
err := syslogparser.ParseStream(false, pipe, cb, defaultMaxMessageLength)
require.NoError(t, err)
require.Equal(t, 3, len(results))
}
7 changes: 4 additions & 3 deletions clients/pkg/promtail/targets/syslog/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,7 @@ func (t *TCPTransport) handleConnection(cn net.Conn) {

lbs := t.connectionLabels(ipFromConn(c).String())

err := syslogparser.ParseStream(c, func(result *syslog.Result) {
err := syslogparser.ParseStream(t.config.IsRFC3164Message, c, func(result *syslog.Result) {
if err := result.Error; err != nil {
t.handleMessageError(err)
return
Expand Down Expand Up @@ -380,7 +380,8 @@ func (t *UDPTransport) acceptPackets() {
func (t *UDPTransport) handleRcv(c *ConnPipe) {
defer t.openConnections.Done()

lbs := t.connectionLabels(c.addr.String())
udpAddr, _ := net.ResolveUDPAddr("udp", c.addr.String())
lbs := t.connectionLabels(udpAddr.IP.String())

for {
datagram := make([]byte, t.maxMessageLength())
Expand All @@ -396,7 +397,7 @@ func (t *UDPTransport) handleRcv(c *ConnPipe) {

r := bytes.NewReader(datagram[:n])

err = syslogparser.ParseStream(r, func(result *syslog.Result) {
err = syslogparser.ParseStream(t.config.IsRFC3164Message, r, func(result *syslog.Result) {
if err := result.Error; err != nil {
t.handleMessageError(err)
} else {
Expand Down

0 comments on commit daf14b5

Please sign in to comment.