Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

packetbeat/sniffer,packetbeat/protos/{tcp,udp}: prevent identical interfaces from colliding in metric namespace #36575

Merged
merged 1 commit into from
Sep 19, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]

*Packetbeat*

- Prevent panic when more than one interface is configured in fleet. {issue}36574[36574] {pull}36575[36575]

*Winlogbeat*

Expand Down
4 changes: 2 additions & 2 deletions packetbeat/protos/tcp/tcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ type TCP struct {
}

// Creates and returns a new Tcp.
func NewTCP(p protos.Protocols, id, device string) (*TCP, error) {
func NewTCP(p protos.Protocols, id, device string, idx int) (*TCP, error) {
isDebug = logp.IsDebug("tcp")

portMap, err := buildPortsMap(p.GetAllTCP())
Expand All @@ -71,7 +71,7 @@ func NewTCP(p protos.Protocols, id, device string) (*TCP, error) {
tcp := &TCP{
protocols: p,
portMap: portMap,
metrics: newInputMetrics(id, device, portMap),
metrics: newInputMetrics(fmt.Sprintf("%s_%d", id, idx), device, portMap),
}
tcp.streams = common.NewCacheWithRemovalListener(
protos.DefaultTransactionExpiration,
Expand Down
4 changes: 2 additions & 2 deletions packetbeat/protos/tcp/tcp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,7 @@ func TestTCSeqPayload(t *testing.T) {
parse: makeCollectPayload(&state, true),
},
},
}, "test", "test")
}, "test", "test", 0)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -343,7 +343,7 @@ func BenchmarkParallelProcess(b *testing.B) {
p := protocols{}
p.tcp = make(map[protos.Protocol]protos.TCPPlugin)
p.tcp[1] = &TestProtocol{Ports: []int{ServerPort}}
tcp, _ := NewTCP(p, "", "")
tcp, _ := NewTCP(p, "", "", 0)

b.ResetTimer()
b.RunParallel(func(pb *testing.PB) {
Expand Down
4 changes: 2 additions & 2 deletions packetbeat/protos/udp/udp.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ type UDP struct {
}

// NewUDP creates and returns a new UDP.
func NewUDP(p protos.Protocols, id, device string) (*UDP, error) {
func NewUDP(p protos.Protocols, id, device string, idx int) (*UDP, error) {
portMap, err := buildPortsMap(p.GetAllUDP())
if err != nil {
return nil, err
Expand All @@ -57,7 +57,7 @@ func NewUDP(p protos.Protocols, id, device string) (*UDP, error) {
udp := &UDP{
protocols: p,
portMap: portMap,
metrics: newInputMetrics(id, device, portMap),
metrics: newInputMetrics(fmt.Sprintf("%s_%d", id, idx), device, portMap),
}
logp.Debug("udp", "Port map: %v", portMap)

Expand Down
2 changes: 1 addition & 1 deletion packetbeat/protos/udp/udp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ func testSetup(t *testing.T) *TestStruct {
plugin := &TestProtocol{Ports: []int{PORT}}
protocols.udp[PROTO] = plugin

udp, err := NewUDP(protocols, "test", "test")
udp, err := NewUDP(protocols, "test", "test", 0)
if err != nil {
t.Error("Error creating UDP handler: ", err)
}
Expand Down
12 changes: 7 additions & 5 deletions packetbeat/sniffer/decoders.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,15 @@ import (

// Decoders functions return a Decoder able to process the provided network
// link type for use with a Sniffer. The cleanup closure should be called after
// the decoders are no longer needed to clean up resources.
type Decoders func(_ layers.LinkType, device string) (decoders *decoder.Decoder, cleanup func(), err error)
// the decoders are no longer needed to clean up resources. The idx parameter
// is the index into the list of devices obtained from the interfaces provided
// to New.
type Decoders func(_ layers.LinkType, device string, idx int) (decoders *decoder.Decoder, cleanup func(), err error)

// DecodersFor returns a source of Decoders using the provided configuration
// components. The id string is expected to be the ID of the beat.
func DecodersFor(id string, publisher *publish.TransactionPublisher, protocols *protos.ProtocolsStruct, watcher *procs.ProcessesWatcher, flows *flows.Flows, cfg config.Config) Decoders {
return func(dl layers.LinkType, device string) (*decoder.Decoder, func(), error) {
return func(dl layers.LinkType, device string, idx int) (*decoder.Decoder, func(), error) {
var icmp4 icmp.ICMPv4Processor
var icmp6 icmp.ICMPv6Processor
icmpCfg, err := cfg.ICMP()
Expand All @@ -61,12 +63,12 @@ func DecodersFor(id string, publisher *publish.TransactionPublisher, protocols *
icmp6 = icmp
}

tcp, err := tcp.NewTCP(protocols, id, device)
tcp, err := tcp.NewTCP(protocols, id, device, idx)
if err != nil {
return nil, nil, err
}

udp, err := udp.NewUDP(protocols, id, device)
udp, err := udp.NewUDP(protocols, id, device, idx)
if err != nil {
return nil, nil, err
}
Expand Down
12 changes: 7 additions & 5 deletions packetbeat/sniffer/sniffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,9 @@ type sniffer struct {
// filter is the bpf filter program used by the sniffer.
filter string

// id identifies the sniffer for metric collection.
id string
// id and idx identify the sniffer for metric collection.
id string
idx int

decoders Decoders

Expand Down Expand Up @@ -100,6 +101,7 @@ func New(id string, testMode bool, _ string, decoders Decoders, interfaces []con
state: atomic.MakeInt32(snifferInactive),
followDefault: iface.PollDefaultRoute > 0 && strings.HasPrefix(iface.Device, "default_route"),
id: id,
idx: i,
decoders: decoders,
log: s.log,
}
Expand Down Expand Up @@ -287,7 +289,7 @@ func (s *sniffer) sniffStatic(ctx context.Context, device string) error {
}
defer handle.Close()

dec, cleanup, err := s.decoders(handle.LinkType(), device)
dec, cleanup, err := s.decoders(handle.LinkType(), device, s.idx)
if err != nil {
return err
}
Expand Down Expand Up @@ -330,7 +332,7 @@ func (s *sniffer) sniffOneDynamic(ctx context.Context, device string, last layer
if dec == nil || linkType != last {
s.log.Infof("changing link type: %d -> %d", last, linkType)
var cleanup func()
dec, cleanup, err = s.decoders(linkType, device)
dec, cleanup, err = s.decoders(linkType, device, s.idx)
if err != nil {
return linkType, dec, err
}
Expand Down Expand Up @@ -464,7 +466,7 @@ func (s *sniffer) open(device string) (snifferHandle, error) {
case "pcap":
return openPcap(device, s.filter, &s.config)
case "af_packet":
return openAFPacket(s.id, device, s.filter, &s.config)
return openAFPacket(fmt.Sprintf("%s_%d", s.id, s.idx), device, s.filter, &s.config)
default:
return nil, fmt.Errorf("unknown sniffer type for %s: %q", device, s.config.Type)
}
Expand Down
Loading