Skip to content

Commit

Permalink
packetbeat/sniffer,packetbeat/protos/{tcp,udp}: prevent identical int…
Browse files Browse the repository at this point in the history
…erfaces from colliding in metric namespace (#36575)

Configurations from fleet may contain multiple identical interfaces, when the
sniffers for each of these are constructed, they will attempt to register under
the same ID in the metrics collection namespace. This results in a panic. So
give each interface an internally unique index to decollide them.
  • Loading branch information
efd6 authored Sep 19, 2023
1 parent 2b6ca78 commit 622de0c
Show file tree
Hide file tree
Showing 7 changed files with 22 additions and 17 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]

*Packetbeat*

- Prevent panic when more than one interface is configured in fleet. {issue}36574[36574] {pull}36575[36575]

*Winlogbeat*

Expand Down
4 changes: 2 additions & 2 deletions packetbeat/protos/tcp/tcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ type TCP struct {
}

// Creates and returns a new Tcp.
func NewTCP(p protos.Protocols, id, device string) (*TCP, error) {
func NewTCP(p protos.Protocols, id, device string, idx int) (*TCP, error) {
isDebug = logp.IsDebug("tcp")

portMap, err := buildPortsMap(p.GetAllTCP())
Expand All @@ -71,7 +71,7 @@ func NewTCP(p protos.Protocols, id, device string) (*TCP, error) {
tcp := &TCP{
protocols: p,
portMap: portMap,
metrics: newInputMetrics(id, device, portMap),
metrics: newInputMetrics(fmt.Sprintf("%s_%d", id, idx), device, portMap),
}
tcp.streams = common.NewCacheWithRemovalListener(
protos.DefaultTransactionExpiration,
Expand Down
4 changes: 2 additions & 2 deletions packetbeat/protos/tcp/tcp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,7 @@ func TestTCSeqPayload(t *testing.T) {
parse: makeCollectPayload(&state, true),
},
},
}, "test", "test")
}, "test", "test", 0)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -343,7 +343,7 @@ func BenchmarkParallelProcess(b *testing.B) {
p := protocols{}
p.tcp = make(map[protos.Protocol]protos.TCPPlugin)
p.tcp[1] = &TestProtocol{Ports: []int{ServerPort}}
tcp, _ := NewTCP(p, "", "")
tcp, _ := NewTCP(p, "", "", 0)

b.ResetTimer()
b.RunParallel(func(pb *testing.PB) {
Expand Down
4 changes: 2 additions & 2 deletions packetbeat/protos/udp/udp.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ type UDP struct {
}

// NewUDP creates and returns a new UDP.
func NewUDP(p protos.Protocols, id, device string) (*UDP, error) {
func NewUDP(p protos.Protocols, id, device string, idx int) (*UDP, error) {
portMap, err := buildPortsMap(p.GetAllUDP())
if err != nil {
return nil, err
Expand All @@ -57,7 +57,7 @@ func NewUDP(p protos.Protocols, id, device string) (*UDP, error) {
udp := &UDP{
protocols: p,
portMap: portMap,
metrics: newInputMetrics(id, device, portMap),
metrics: newInputMetrics(fmt.Sprintf("%s_%d", id, idx), device, portMap),
}
logp.Debug("udp", "Port map: %v", portMap)

Expand Down
2 changes: 1 addition & 1 deletion packetbeat/protos/udp/udp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ func testSetup(t *testing.T) *TestStruct {
plugin := &TestProtocol{Ports: []int{PORT}}
protocols.udp[PROTO] = plugin

udp, err := NewUDP(protocols, "test", "test")
udp, err := NewUDP(protocols, "test", "test", 0)
if err != nil {
t.Error("Error creating UDP handler: ", err)
}
Expand Down
12 changes: 7 additions & 5 deletions packetbeat/sniffer/decoders.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,15 @@ import (

// Decoders functions return a Decoder able to process the provided network
// link type for use with a Sniffer. The cleanup closure should be called after
// the decoders are no longer needed to clean up resources.
type Decoders func(_ layers.LinkType, device string) (decoders *decoder.Decoder, cleanup func(), err error)
// the decoders are no longer needed to clean up resources. The idx parameter
// is the index into the list of devices obtained from the interfaces provided
// to New.
type Decoders func(_ layers.LinkType, device string, idx int) (decoders *decoder.Decoder, cleanup func(), err error)

// DecodersFor returns a source of Decoders using the provided configuration
// components. The id string is expected to be the ID of the beat.
func DecodersFor(id string, publisher *publish.TransactionPublisher, protocols *protos.ProtocolsStruct, watcher *procs.ProcessesWatcher, flows *flows.Flows, cfg config.Config) Decoders {
return func(dl layers.LinkType, device string) (*decoder.Decoder, func(), error) {
return func(dl layers.LinkType, device string, idx int) (*decoder.Decoder, func(), error) {
var icmp4 icmp.ICMPv4Processor
var icmp6 icmp.ICMPv6Processor
icmpCfg, err := cfg.ICMP()
Expand All @@ -61,12 +63,12 @@ func DecodersFor(id string, publisher *publish.TransactionPublisher, protocols *
icmp6 = icmp
}

tcp, err := tcp.NewTCP(protocols, id, device)
tcp, err := tcp.NewTCP(protocols, id, device, idx)
if err != nil {
return nil, nil, err
}

udp, err := udp.NewUDP(protocols, id, device)
udp, err := udp.NewUDP(protocols, id, device, idx)
if err != nil {
return nil, nil, err
}
Expand Down
12 changes: 7 additions & 5 deletions packetbeat/sniffer/sniffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,9 @@ type sniffer struct {
// filter is the bpf filter program used by the sniffer.
filter string

// id identifies the sniffer for metric collection.
id string
// id and idx identify the sniffer for metric collection.
id string
idx int

decoders Decoders

Expand Down Expand Up @@ -100,6 +101,7 @@ func New(id string, testMode bool, _ string, decoders Decoders, interfaces []con
state: atomic.MakeInt32(snifferInactive),
followDefault: iface.PollDefaultRoute > 0 && strings.HasPrefix(iface.Device, "default_route"),
id: id,
idx: i,
decoders: decoders,
log: s.log,
}
Expand Down Expand Up @@ -287,7 +289,7 @@ func (s *sniffer) sniffStatic(ctx context.Context, device string) error {
}
defer handle.Close()

dec, cleanup, err := s.decoders(handle.LinkType(), device)
dec, cleanup, err := s.decoders(handle.LinkType(), device, s.idx)
if err != nil {
return err
}
Expand Down Expand Up @@ -330,7 +332,7 @@ func (s *sniffer) sniffOneDynamic(ctx context.Context, device string, last layer
if dec == nil || linkType != last {
s.log.Infof("changing link type: %d -> %d", last, linkType)
var cleanup func()
dec, cleanup, err = s.decoders(linkType, device)
dec, cleanup, err = s.decoders(linkType, device, s.idx)
if err != nil {
return linkType, dec, err
}
Expand Down Expand Up @@ -464,7 +466,7 @@ func (s *sniffer) open(device string) (snifferHandle, error) {
case "pcap":
return openPcap(device, s.filter, &s.config)
case "af_packet":
return openAFPacket(s.id, device, s.filter, &s.config)
return openAFPacket(fmt.Sprintf("%s_%d", s.id, s.idx), device, s.filter, &s.config)
default:
return nil, fmt.Errorf("unknown sniffer type for %s: %q", device, s.config.Type)
}
Expand Down

0 comments on commit 622de0c

Please sign in to comment.