Skip to content

Commit

Permalink
packetbeat: reduce levels of indirection, simplify and document (#32721)
Browse files Browse the repository at this point in the history
* packetbeat: simplify source layout and add some docs
* packetbeat/sniffer: remove unnecessary interface type
  • Loading branch information
efd6 authored Aug 23, 2022
1 parent 6c7ad2e commit 7a6a4a8
Show file tree
Hide file tree
Showing 4 changed files with 35 additions and 34 deletions.
25 changes: 15 additions & 10 deletions packetbeat/beater/packetbeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,16 +53,12 @@ type flags struct {
dumpfile *string
}

var cmdLineArgs flags

func init() {
cmdLineArgs = flags{
file: flag.String("I", "", "Read packet data from specified file"),
loop: flag.Int("l", 1, "Loop file. 0 - loop forever"),
oneAtAtime: flag.Bool("O", false, "Read packets one at a time (press Enter)"),
topSpeed: flag.Bool("t", false, "Read packets as fast as possible, without sleeping"),
dumpfile: flag.String("dump", "", "Write all captured packets to this libpcap file"),
}
var cmdLineArgs = flags{
file: flag.String("I", "", "Read packet data from specified file"),
loop: flag.Int("l", 1, "Loop file. 0 - loop forever"),
oneAtAtime: flag.Bool("O", false, "Read packets one at a time (press Enter)"),
topSpeed: flag.Bool("t", false, "Read packets as fast as possible, without sleeping"),
dumpfile: flag.String("dump", "", "Write all captured packets to this libpcap file"),
}

func initialConfig() config.Config {
Expand All @@ -84,6 +80,7 @@ type packetbeat struct {
done chan struct{}
}

// New returns a new Packetbeat beat.Beater.
func New(b *beat.Beat, rawConfig *conf.C) (beat.Beater, error) {
configurator := config.NewAgentConfig
if !b.Manager.Enabled() {
Expand All @@ -110,6 +107,10 @@ func New(b *beat.Beat, rawConfig *conf.C) (beat.Beater, error) {
}, nil
}

// Run starts the packetbeat network capture, decoding and event publication, sending
// events to b.Publisher. If b is mananaged, packetbeat is registered with the
// reload.Registry and handled by fleet. Otherwise it is run until cancelled or a
// fatal error.
func (pb *packetbeat) Run(b *beat.Beat) error {
defer func() {
if service.ProfileEnabled() {
Expand All @@ -125,6 +126,8 @@ func (pb *packetbeat) Run(b *beat.Beat) error {
return pb.runManaged(b, pb.factory)
}

// runStatic constructs a packetbeat runner and starts it, returning on cancellation
// or the first fatal error.
func (pb *packetbeat) runStatic(b *beat.Beat, factory *processorFactory) error {
runner, err := factory.Create(b.Publisher, pb.config)
if err != nil {
Expand All @@ -144,6 +147,8 @@ func (pb *packetbeat) runStatic(b *beat.Beat, factory *processorFactory) error {
return nil
}

// runManaged registers a packetbeat runner with the reload.Registry and starts
// the runner by starting the beat's manager. It returns on the first fatal error.
func (pb *packetbeat) runManaged(b *beat.Beat, factory *processorFactory) error {
runner := newReloader(management.DebugK, factory, b.Publisher)
reload.Register.MustRegisterList("inputs", runner)
Expand Down
6 changes: 3 additions & 3 deletions packetbeat/beater/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ func (p *processorFactory) Create(pipeline beat.PipelineConnector, cfg *conf.C)
if err != nil {
return nil, err
}
sniffer, err := setupSniffer(config, protocols, workerFactory(publisher, protocols, watcher, flows, config))
sniffer, err := setupSniffer(config, protocols, sniffer.DecodersFor(publisher, protocols, watcher, flows, config))
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -189,7 +189,7 @@ func setupFlows(pipeline beat.Pipeline, watcher procs.ProcessesWatcher, cfg conf
return flows.NewFlows(client.PublishAll, watcher, cfg.Flows)
}

func setupSniffer(cfg config.Config, protocols *protos.ProtocolsStruct, workerFactory sniffer.WorkerFactory) (*sniffer.Sniffer, error) {
func setupSniffer(cfg config.Config, protocols *protos.ProtocolsStruct, decoders sniffer.Decoders) (*sniffer.Sniffer, error) {
icmp, err := cfg.ICMP()
if err != nil {
return nil, err
Expand All @@ -200,7 +200,7 @@ func setupSniffer(cfg config.Config, protocols *protos.ProtocolsStruct, workerFa
filter = protocols.BpfFilter(cfg.Interfaces.WithVlans, icmp.Enabled())
}

return sniffer.New(false, filter, workerFactory, cfg.Interfaces)
return sniffer.New(false, filter, decoders, cfg.Interfaces)
}

// CheckConfig performs a dry-run creation of a Packetbeat pipeline based
Expand Down
13 changes: 9 additions & 4 deletions packetbeat/beater/worker.go → packetbeat/sniffer/decoders.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.

package beater
package sniffer

import (
"github.com/google/gopacket/layers"
Expand All @@ -29,11 +29,16 @@ import (
"github.com/elastic/beats/v7/packetbeat/protos/tcp"
"github.com/elastic/beats/v7/packetbeat/protos/udp"
"github.com/elastic/beats/v7/packetbeat/publish"
"github.com/elastic/beats/v7/packetbeat/sniffer"
)

func workerFactory(publisher *publish.TransactionPublisher, protocols *protos.ProtocolsStruct, watcher procs.ProcessesWatcher, flows *flows.Flows, cfg config.Config) func(dl layers.LinkType) (sniffer.Worker, error) {
return func(dl layers.LinkType) (sniffer.Worker, error) {
// Decoders functions return a Decoder able to process the provided network
// link type for use with a Sniffer.
type Decoders func(layers.LinkType) (*decoder.Decoder, error)

// DecodersFor returns a source of Decoders using the provided configuration
// components.
func DecodersFor(publisher *publish.TransactionPublisher, protocols *protos.ProtocolsStruct, watcher procs.ProcessesWatcher, flows *flows.Flows, cfg config.Config) Decoders {
return func(dl layers.LinkType) (*decoder.Decoder, error) {
var icmp4 icmp.ICMPv4Processor
var icmp6 icmp.ICMPv6Processor
config, err := cfg.ICMP()
Expand Down
25 changes: 8 additions & 17 deletions packetbeat/sniffer/sniffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,16 +45,7 @@ type Sniffer struct {
// filter is the bpf filter program used by the sniffer.
filter string

factory WorkerFactory
}

// WorkerFactory constructs a new worker instance for use with a Sniffer.
type WorkerFactory func(layers.LinkType) (Worker, error)

// Worker defines the callback interfaces a Sniffer instance will use
// to forward packets.
type Worker interface {
OnPacket(data []byte, ci *gopacket.CaptureInfo)
decoders Decoders
}

type snifferHandle interface {
Expand All @@ -73,12 +64,12 @@ const (
// New create a new Sniffer instance. Settings are validated in a best effort
// only, but no device is opened yet. Accessing and configuring the actual device
// is done by the Run method.
func New(testMode bool, filter string, factory WorkerFactory, interfaces config.InterfacesConfig) (*Sniffer, error) {
func New(testMode bool, filter string, decoders Decoders, interfaces config.InterfacesConfig) (*Sniffer, error) {
s := &Sniffer{
filter: filter,
config: interfaces,
factory: factory,
state: atomic.MakeInt32(snifferInactive),
filter: filter,
config: interfaces,
decoders: decoders,
state: atomic.MakeInt32(snifferInactive),
}

logp.Debug("sniffer", "BPF filter: '%s'", filter)
Expand Down Expand Up @@ -155,7 +146,7 @@ func (s *Sniffer) Run() error {
}
}

worker, err := s.factory(handle.LinkType())
decoder, err := s.decoders(handle.LinkType())
if err != nil {
return err
}
Expand Down Expand Up @@ -206,7 +197,7 @@ func (s *Sniffer) Run() error {
}

logp.Debug("sniffer", "Packet number: %d", packets)
worker.OnPacket(data, &ci)
decoder.OnPacket(data, &ci)
}

return nil
Expand Down

0 comments on commit 7a6a4a8

Please sign in to comment.