Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Resolve Traps OIDs to names #10934

Merged
merged 9 commits into from
Mar 15, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion pkg/logs/internal/launchers/traps/launcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/DataDog/datadog-agent/pkg/logs/config"
tailer "github.com/DataDog/datadog-agent/pkg/logs/internal/tailers/traps"
"github.com/DataDog/datadog-agent/pkg/logs/pipeline"
"github.com/DataDog/datadog-agent/pkg/security/log"
"github.com/DataDog/datadog-agent/pkg/snmp/traps"
)

Expand Down Expand Up @@ -36,7 +37,16 @@ func (l *Launcher) Start() {

func (l *Launcher) startNewTailer(source *config.LogSource, inputChan chan *traps.SnmpPacket) {
outputChan := l.pipelineProvider.NextPipelineChan()
l.tailer = tailer.NewTailer(source, inputChan, outputChan)
oidResolver, err := traps.NewMultiFilesOIDResolver()
if err != nil {
log.Errorf("unable to load traps database: %w. Will not listen for SNMP traps", err)
return
}
l.tailer, err = tailer.NewTailer(oidResolver, source, inputChan, outputChan)
if err != nil {
log.Errorf("unable to load traps database: %w. Will not listen for SNMP traps", err)
return
}
l.tailer.Start()
}

Expand Down
22 changes: 11 additions & 11 deletions pkg/logs/internal/tailers/traps/tailer.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
package traps

import (
"encoding/json"
"time"

"github.com/DataDog/datadog-agent/pkg/logs/config"
Expand All @@ -18,19 +17,25 @@ import (
// Tailer consumes and processes a stream of trap packets, and sends them to a stream of log messages.
type Tailer struct {
source *config.LogSource
formatter traps.Formatter
inputChan traps.PacketsChannel
outputChan chan *message.Message
done chan interface{}
}

// NewTailer returns a new Tailer
func NewTailer(source *config.LogSource, inputChan traps.PacketsChannel, outputChan chan *message.Message) *Tailer {
func NewTailer(oidResolver traps.OIDResolver, source *config.LogSource, inputChan traps.PacketsChannel, outputChan chan *message.Message) (*Tailer, error) {
formatter, err := traps.NewJSONFormatter(oidResolver)
if err != nil {
return nil, err
}
return &Tailer{
source: source,
inputChan: inputChan,
outputChan: outputChan,
formatter: formatter,
done: make(chan interface{}, 1),
}
}, nil
}

// Start starts the tailer.
Expand All @@ -50,19 +55,14 @@ func (t *Tailer) run() {

// Loop terminates when the channel is closed.
for packet := range t.inputChan {
data, err := traps.FormatPacketToJSON(packet)
data, err := t.formatter.FormatPacket(packet)
if err != nil {
log.Errorf("failed to format packet: %s", err)
continue
}
t.source.BytesRead.Add(int64(len(data)))
content, err := json.Marshal(data)
if err != nil {
log.Errorf("failed to serialize packet data to JSON: %s", err)
continue
}
origin := message.NewOrigin(t.source)
origin.SetTags(traps.GetTags(packet))
t.outputChan <- message.NewMessage(content, origin, message.StatusInfo, time.Now().UnixNano())
origin.SetTags(t.formatter.GetTags(packet))
t.outputChan <- message.NewMessage(data, origin, message.StatusInfo, time.Now().UnixNano())
}
}
36 changes: 28 additions & 8 deletions pkg/logs/internal/tailers/traps/tailer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,24 +6,39 @@
package traps

import (
"encoding/json"
"fmt"
"net"
"testing"
"time"

"github.com/gosnmp/gosnmp"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/DataDog/datadog-agent/pkg/logs/config"
"github.com/DataDog/datadog-agent/pkg/logs/message"
"github.com/DataDog/datadog-agent/pkg/snmp/traps"
)

// NoOpOIDResolver is a dummy OIDResolver implementation that is unable to get any Trap or Variable metadata.
type NoOpOIDResolver struct{}

// GetTrapMetadata always return an error in this OIDResolver implementation
func (or NoOpOIDResolver) GetTrapMetadata(trapOID string) (traps.TrapMetadata, error) {
return traps.TrapMetadata{}, fmt.Errorf("trap OID %s is not defined", trapOID)
}

// GetVariableMetadata always return an error in this OIDResolver implementation
func (or NoOpOIDResolver) GetVariableMetadata(trapOID string, varOID string) (traps.VariableMetadata, error) {
return traps.VariableMetadata{}, fmt.Errorf("trap OID %s is not defined", trapOID)
}

func TestTrapsShouldReceiveMessages(t *testing.T) {
inputChan := make(traps.PacketsChannel, 1)
outputChan := make(chan *message.Message)
tailer := NewTailer(config.NewLogSource("test", &config.LogsConfig{}), inputChan, outputChan)
tailer, err := NewTailer(&NoOpOIDResolver{}, config.NewLogSource("test", &config.LogsConfig{}), inputChan, outputChan)
require.NoError(t, err)
tailer.Start()

p := &traps.SnmpPacket{
Expand All @@ -46,18 +61,23 @@ func TestTrapsShouldReceiveMessages(t *testing.T) {
return
}

formattedPacket := format(t, p)
assert.Equal(t, message.StatusInfo, msg.GetStatus())
assert.Equal(t, format(t, p), msg.Content)
assert.Equal(t, traps.GetTags(p), msg.Origin.Tags())
assert.Equal(t, formattedPacket, msg.Content)
assert.Equal(t, []string{
"snmp_version:2",
"device_namespace:default",
"snmp_device:127.0.0.1",
}, msg.Origin.Tags())

close(inputChan)
tailer.WaitFlush()
}

func format(t *testing.T, p *traps.SnmpPacket) []byte {
data, err := traps.FormatPacketToJSON(p)
assert.NoError(t, err)
content, err := json.Marshal(data)
formatter, err := traps.NewJSONFormatter(NoOpOIDResolver{})
require.NoError(t, err)
formattedPacket, err := formatter.FormatPacket(p)
assert.NoError(t, err)
return content
return formattedPacket
}
4 changes: 2 additions & 2 deletions pkg/logs/schedulers/traps/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ func (s *Scheduler) Start(sourceMgr schedulers.SourceManager) {
// source to forward SNMP traps as logs.
source := logsConfig.NewLogSource(logsConfig.SnmpTraps, &logsConfig.LogsConfig{
Type: logsConfig.SnmpTrapsType,
Service: "snmp",
Source: "snmp",
Service: "snmp-traps",
Source: "snmp-traps",
})
log.Debug("Adding SNMPTraps source to the Logs Agent")
sourceMgr.AddSource(source)
Expand Down
182 changes: 0 additions & 182 deletions pkg/snmp/traps/format.go

This file was deleted.

Loading