diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index c63a2234573d..56a4ff583fc6 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -134,6 +134,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff] *Packetbeat* +- Prevent panic when more than one interface is configured in fleet. {issue}36574[36574] {pull}36575[36575] *Winlogbeat* diff --git a/packetbeat/protos/tcp/tcp.go b/packetbeat/protos/tcp/tcp.go index 874f7ddba8ae..57a3c64c4812 100644 --- a/packetbeat/protos/tcp/tcp.go +++ b/packetbeat/protos/tcp/tcp.go @@ -60,7 +60,7 @@ type TCP struct { } // Creates and returns a new Tcp. -func NewTCP(p protos.Protocols, id, device string) (*TCP, error) { +func NewTCP(p protos.Protocols, id, device string, idx int) (*TCP, error) { isDebug = logp.IsDebug("tcp") portMap, err := buildPortsMap(p.GetAllTCP()) @@ -71,7 +71,7 @@ func NewTCP(p protos.Protocols, id, device string) (*TCP, error) { tcp := &TCP{ protocols: p, portMap: portMap, - metrics: newInputMetrics(id, device, portMap), + metrics: newInputMetrics(fmt.Sprintf("%s_%d", id, idx), device, portMap), } tcp.streams = common.NewCacheWithRemovalListener( protos.DefaultTransactionExpiration, diff --git a/packetbeat/protos/tcp/tcp_test.go b/packetbeat/protos/tcp/tcp_test.go index 6cf782cb3ff4..3be35b758e5b 100644 --- a/packetbeat/protos/tcp/tcp_test.go +++ b/packetbeat/protos/tcp/tcp_test.go @@ -305,7 +305,7 @@ func TestTCSeqPayload(t *testing.T) { parse: makeCollectPayload(&state, true), }, }, - }, "test", "test") + }, "test", "test", 0) if err != nil { t.Fatal(err) } @@ -343,7 +343,7 @@ func BenchmarkParallelProcess(b *testing.B) { p := protocols{} p.tcp = make(map[protos.Protocol]protos.TCPPlugin) p.tcp[1] = &TestProtocol{Ports: []int{ServerPort}} - tcp, _ := NewTCP(p, "", "") + tcp, _ := NewTCP(p, "", "", 0) b.ResetTimer() b.RunParallel(func(pb *testing.PB) { diff --git a/packetbeat/protos/udp/udp.go b/packetbeat/protos/udp/udp.go index 86f56fbd8fb0..8ed9ffd1c35d 100644 --- a/packetbeat/protos/udp/udp.go +++ b/packetbeat/protos/udp/udp.go @@ -48,7 +48,7 @@ type UDP struct { } // NewUDP creates and returns a new UDP. -func NewUDP(p protos.Protocols, id, device string) (*UDP, error) { +func NewUDP(p protos.Protocols, id, device string, idx int) (*UDP, error) { portMap, err := buildPortsMap(p.GetAllUDP()) if err != nil { return nil, err @@ -57,7 +57,7 @@ func NewUDP(p protos.Protocols, id, device string) (*UDP, error) { udp := &UDP{ protocols: p, portMap: portMap, - metrics: newInputMetrics(id, device, portMap), + metrics: newInputMetrics(fmt.Sprintf("%s_%d", id, idx), device, portMap), } logp.Debug("udp", "Port map: %v", portMap) diff --git a/packetbeat/protos/udp/udp_test.go b/packetbeat/protos/udp/udp_test.go index 6eed6aef2436..bc165444b912 100644 --- a/packetbeat/protos/udp/udp_test.go +++ b/packetbeat/protos/udp/udp_test.go @@ -110,7 +110,7 @@ func testSetup(t *testing.T) *TestStruct { plugin := &TestProtocol{Ports: []int{PORT}} protocols.udp[PROTO] = plugin - udp, err := NewUDP(protocols, "test", "test") + udp, err := NewUDP(protocols, "test", "test", 0) if err != nil { t.Error("Error creating UDP handler: ", err) } diff --git a/packetbeat/sniffer/decoders.go b/packetbeat/sniffer/decoders.go index e4d9c7f72d8b..992dd813c4c2 100644 --- a/packetbeat/sniffer/decoders.go +++ b/packetbeat/sniffer/decoders.go @@ -33,13 +33,15 @@ import ( // Decoders functions return a Decoder able to process the provided network // link type for use with a Sniffer. The cleanup closure should be called after -// the decoders are no longer needed to clean up resources. -type Decoders func(_ layers.LinkType, device string) (decoders *decoder.Decoder, cleanup func(), err error) +// the decoders are no longer needed to clean up resources. The idx parameter +// is the index into the list of devices obtained from the interfaces provided +// to New. +type Decoders func(_ layers.LinkType, device string, idx int) (decoders *decoder.Decoder, cleanup func(), err error) // DecodersFor returns a source of Decoders using the provided configuration // components. The id string is expected to be the ID of the beat. func DecodersFor(id string, publisher *publish.TransactionPublisher, protocols *protos.ProtocolsStruct, watcher *procs.ProcessesWatcher, flows *flows.Flows, cfg config.Config) Decoders { - return func(dl layers.LinkType, device string) (*decoder.Decoder, func(), error) { + return func(dl layers.LinkType, device string, idx int) (*decoder.Decoder, func(), error) { var icmp4 icmp.ICMPv4Processor var icmp6 icmp.ICMPv6Processor icmpCfg, err := cfg.ICMP() @@ -61,12 +63,12 @@ func DecodersFor(id string, publisher *publish.TransactionPublisher, protocols * icmp6 = icmp } - tcp, err := tcp.NewTCP(protocols, id, device) + tcp, err := tcp.NewTCP(protocols, id, device, idx) if err != nil { return nil, nil, err } - udp, err := udp.NewUDP(protocols, id, device) + udp, err := udp.NewUDP(protocols, id, device, idx) if err != nil { return nil, nil, err } diff --git a/packetbeat/sniffer/sniffer.go b/packetbeat/sniffer/sniffer.go index 21402ef61d56..3cbff09483b2 100644 --- a/packetbeat/sniffer/sniffer.go +++ b/packetbeat/sniffer/sniffer.go @@ -64,8 +64,9 @@ type sniffer struct { // filter is the bpf filter program used by the sniffer. filter string - // id identifies the sniffer for metric collection. - id string + // id and idx identify the sniffer for metric collection. + id string + idx int decoders Decoders @@ -100,6 +101,7 @@ func New(id string, testMode bool, _ string, decoders Decoders, interfaces []con state: atomic.MakeInt32(snifferInactive), followDefault: iface.PollDefaultRoute > 0 && strings.HasPrefix(iface.Device, "default_route"), id: id, + idx: i, decoders: decoders, log: s.log, } @@ -287,7 +289,7 @@ func (s *sniffer) sniffStatic(ctx context.Context, device string) error { } defer handle.Close() - dec, cleanup, err := s.decoders(handle.LinkType(), device) + dec, cleanup, err := s.decoders(handle.LinkType(), device, s.idx) if err != nil { return err } @@ -330,7 +332,7 @@ func (s *sniffer) sniffOneDynamic(ctx context.Context, device string, last layer if dec == nil || linkType != last { s.log.Infof("changing link type: %d -> %d", last, linkType) var cleanup func() - dec, cleanup, err = s.decoders(linkType, device) + dec, cleanup, err = s.decoders(linkType, device, s.idx) if err != nil { return linkType, dec, err } @@ -464,7 +466,7 @@ func (s *sniffer) open(device string) (snifferHandle, error) { case "pcap": return openPcap(device, s.filter, &s.config) case "af_packet": - return openAFPacket(s.id, device, s.filter, &s.config) + return openAFPacket(fmt.Sprintf("%s_%d", s.id, s.idx), device, s.filter, &s.config) default: return nil, fmt.Errorf("unknown sniffer type for %s: %q", device, s.config.Type) }