From 622de0cadaa3083e96434e8b8db0c95191a34f62 Mon Sep 17 00:00:00 2001 From: Dan Kortschak <90160302+efd6@users.noreply.github.com> Date: Wed, 20 Sep 2023 08:11:37 +0930 Subject: [PATCH] packetbeat/sniffer,packetbeat/protos/{tcp,udp}: prevent identical interfaces from colliding in metric namespace (#36575) Configurations from fleet may contain multiple identical interfaces, when the sniffers for each of these are constructed, they will attempt to register under the same ID in the metrics collection namespace. This results in a panic. So give each interface an internally unique index to decollide them. --- CHANGELOG.next.asciidoc | 1 + packetbeat/protos/tcp/tcp.go | 4 ++-- packetbeat/protos/tcp/tcp_test.go | 4 ++-- packetbeat/protos/udp/udp.go | 4 ++-- packetbeat/protos/udp/udp_test.go | 2 +- packetbeat/sniffer/decoders.go | 12 +++++++----- packetbeat/sniffer/sniffer.go | 12 +++++++----- 7 files changed, 22 insertions(+), 17 deletions(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index c63a2234573..56a4ff583fc 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -134,6 +134,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff] *Packetbeat* +- Prevent panic when more than one interface is configured in fleet. {issue}36574[36574] {pull}36575[36575] *Winlogbeat* diff --git a/packetbeat/protos/tcp/tcp.go b/packetbeat/protos/tcp/tcp.go index 874f7ddba8a..57a3c64c481 100644 --- a/packetbeat/protos/tcp/tcp.go +++ b/packetbeat/protos/tcp/tcp.go @@ -60,7 +60,7 @@ type TCP struct { } // Creates and returns a new Tcp. -func NewTCP(p protos.Protocols, id, device string) (*TCP, error) { +func NewTCP(p protos.Protocols, id, device string, idx int) (*TCP, error) { isDebug = logp.IsDebug("tcp") portMap, err := buildPortsMap(p.GetAllTCP()) @@ -71,7 +71,7 @@ func NewTCP(p protos.Protocols, id, device string) (*TCP, error) { tcp := &TCP{ protocols: p, portMap: portMap, - metrics: newInputMetrics(id, device, portMap), + metrics: newInputMetrics(fmt.Sprintf("%s_%d", id, idx), device, portMap), } tcp.streams = common.NewCacheWithRemovalListener( protos.DefaultTransactionExpiration, diff --git a/packetbeat/protos/tcp/tcp_test.go b/packetbeat/protos/tcp/tcp_test.go index 6cf782cb3ff..3be35b758e5 100644 --- a/packetbeat/protos/tcp/tcp_test.go +++ b/packetbeat/protos/tcp/tcp_test.go @@ -305,7 +305,7 @@ func TestTCSeqPayload(t *testing.T) { parse: makeCollectPayload(&state, true), }, }, - }, "test", "test") + }, "test", "test", 0) if err != nil { t.Fatal(err) } @@ -343,7 +343,7 @@ func BenchmarkParallelProcess(b *testing.B) { p := protocols{} p.tcp = make(map[protos.Protocol]protos.TCPPlugin) p.tcp[1] = &TestProtocol{Ports: []int{ServerPort}} - tcp, _ := NewTCP(p, "", "") + tcp, _ := NewTCP(p, "", "", 0) b.ResetTimer() b.RunParallel(func(pb *testing.PB) { diff --git a/packetbeat/protos/udp/udp.go b/packetbeat/protos/udp/udp.go index 86f56fbd8fb..8ed9ffd1c35 100644 --- a/packetbeat/protos/udp/udp.go +++ b/packetbeat/protos/udp/udp.go @@ -48,7 +48,7 @@ type UDP struct { } // NewUDP creates and returns a new UDP. -func NewUDP(p protos.Protocols, id, device string) (*UDP, error) { +func NewUDP(p protos.Protocols, id, device string, idx int) (*UDP, error) { portMap, err := buildPortsMap(p.GetAllUDP()) if err != nil { return nil, err @@ -57,7 +57,7 @@ func NewUDP(p protos.Protocols, id, device string) (*UDP, error) { udp := &UDP{ protocols: p, portMap: portMap, - metrics: newInputMetrics(id, device, portMap), + metrics: newInputMetrics(fmt.Sprintf("%s_%d", id, idx), device, portMap), } logp.Debug("udp", "Port map: %v", portMap) diff --git a/packetbeat/protos/udp/udp_test.go b/packetbeat/protos/udp/udp_test.go index 6eed6aef243..bc165444b91 100644 --- a/packetbeat/protos/udp/udp_test.go +++ b/packetbeat/protos/udp/udp_test.go @@ -110,7 +110,7 @@ func testSetup(t *testing.T) *TestStruct { plugin := &TestProtocol{Ports: []int{PORT}} protocols.udp[PROTO] = plugin - udp, err := NewUDP(protocols, "test", "test") + udp, err := NewUDP(protocols, "test", "test", 0) if err != nil { t.Error("Error creating UDP handler: ", err) } diff --git a/packetbeat/sniffer/decoders.go b/packetbeat/sniffer/decoders.go index e4d9c7f72d8..992dd813c4c 100644 --- a/packetbeat/sniffer/decoders.go +++ b/packetbeat/sniffer/decoders.go @@ -33,13 +33,15 @@ import ( // Decoders functions return a Decoder able to process the provided network // link type for use with a Sniffer. The cleanup closure should be called after -// the decoders are no longer needed to clean up resources. -type Decoders func(_ layers.LinkType, device string) (decoders *decoder.Decoder, cleanup func(), err error) +// the decoders are no longer needed to clean up resources. The idx parameter +// is the index into the list of devices obtained from the interfaces provided +// to New. +type Decoders func(_ layers.LinkType, device string, idx int) (decoders *decoder.Decoder, cleanup func(), err error) // DecodersFor returns a source of Decoders using the provided configuration // components. The id string is expected to be the ID of the beat. func DecodersFor(id string, publisher *publish.TransactionPublisher, protocols *protos.ProtocolsStruct, watcher *procs.ProcessesWatcher, flows *flows.Flows, cfg config.Config) Decoders { - return func(dl layers.LinkType, device string) (*decoder.Decoder, func(), error) { + return func(dl layers.LinkType, device string, idx int) (*decoder.Decoder, func(), error) { var icmp4 icmp.ICMPv4Processor var icmp6 icmp.ICMPv6Processor icmpCfg, err := cfg.ICMP() @@ -61,12 +63,12 @@ func DecodersFor(id string, publisher *publish.TransactionPublisher, protocols * icmp6 = icmp } - tcp, err := tcp.NewTCP(protocols, id, device) + tcp, err := tcp.NewTCP(protocols, id, device, idx) if err != nil { return nil, nil, err } - udp, err := udp.NewUDP(protocols, id, device) + udp, err := udp.NewUDP(protocols, id, device, idx) if err != nil { return nil, nil, err } diff --git a/packetbeat/sniffer/sniffer.go b/packetbeat/sniffer/sniffer.go index 21402ef61d5..3cbff09483b 100644 --- a/packetbeat/sniffer/sniffer.go +++ b/packetbeat/sniffer/sniffer.go @@ -64,8 +64,9 @@ type sniffer struct { // filter is the bpf filter program used by the sniffer. filter string - // id identifies the sniffer for metric collection. - id string + // id and idx identify the sniffer for metric collection. + id string + idx int decoders Decoders @@ -100,6 +101,7 @@ func New(id string, testMode bool, _ string, decoders Decoders, interfaces []con state: atomic.MakeInt32(snifferInactive), followDefault: iface.PollDefaultRoute > 0 && strings.HasPrefix(iface.Device, "default_route"), id: id, + idx: i, decoders: decoders, log: s.log, } @@ -287,7 +289,7 @@ func (s *sniffer) sniffStatic(ctx context.Context, device string) error { } defer handle.Close() - dec, cleanup, err := s.decoders(handle.LinkType(), device) + dec, cleanup, err := s.decoders(handle.LinkType(), device, s.idx) if err != nil { return err } @@ -330,7 +332,7 @@ func (s *sniffer) sniffOneDynamic(ctx context.Context, device string, last layer if dec == nil || linkType != last { s.log.Infof("changing link type: %d -> %d", last, linkType) var cleanup func() - dec, cleanup, err = s.decoders(linkType, device) + dec, cleanup, err = s.decoders(linkType, device, s.idx) if err != nil { return linkType, dec, err } @@ -464,7 +466,7 @@ func (s *sniffer) open(device string) (snifferHandle, error) { case "pcap": return openPcap(device, s.filter, &s.config) case "af_packet": - return openAFPacket(s.id, device, s.filter, &s.config) + return openAFPacket(fmt.Sprintf("%s_%d", s.id, s.idx), device, s.filter, &s.config) default: return nil, fmt.Errorf("unknown sniffer type for %s: %q", device, s.config.Type) }