diff --git a/packetbeat/beater/packetbeat.go b/packetbeat/beater/packetbeat.go index a7c6899aae95..a0dde1e28a68 100644 --- a/packetbeat/beater/packetbeat.go +++ b/packetbeat/beater/packetbeat.go @@ -53,16 +53,12 @@ type flags struct { dumpfile *string } -var cmdLineArgs flags - -func init() { - cmdLineArgs = flags{ - file: flag.String("I", "", "Read packet data from specified file"), - loop: flag.Int("l", 1, "Loop file. 0 - loop forever"), - oneAtAtime: flag.Bool("O", false, "Read packets one at a time (press Enter)"), - topSpeed: flag.Bool("t", false, "Read packets as fast as possible, without sleeping"), - dumpfile: flag.String("dump", "", "Write all captured packets to this libpcap file"), - } +var cmdLineArgs = flags{ + file: flag.String("I", "", "Read packet data from specified file"), + loop: flag.Int("l", 1, "Loop file. 0 - loop forever"), + oneAtAtime: flag.Bool("O", false, "Read packets one at a time (press Enter)"), + topSpeed: flag.Bool("t", false, "Read packets as fast as possible, without sleeping"), + dumpfile: flag.String("dump", "", "Write all captured packets to this libpcap file"), } func initialConfig() config.Config { @@ -84,6 +80,7 @@ type packetbeat struct { done chan struct{} } +// New returns a new Packetbeat beat.Beater. func New(b *beat.Beat, rawConfig *conf.C) (beat.Beater, error) { configurator := config.NewAgentConfig if !b.Manager.Enabled() { @@ -110,6 +107,10 @@ func New(b *beat.Beat, rawConfig *conf.C) (beat.Beater, error) { }, nil } +// Run starts the packetbeat network capture, decoding and event publication, sending +// events to b.Publisher. If b is mananaged, packetbeat is registered with the +// reload.Registry and handled by fleet. Otherwise it is run until cancelled or a +// fatal error. func (pb *packetbeat) Run(b *beat.Beat) error { defer func() { if service.ProfileEnabled() { @@ -125,6 +126,8 @@ func (pb *packetbeat) Run(b *beat.Beat) error { return pb.runManaged(b, pb.factory) } +// runStatic constructs a packetbeat runner and starts it, returning on cancellation +// or the first fatal error. func (pb *packetbeat) runStatic(b *beat.Beat, factory *processorFactory) error { runner, err := factory.Create(b.Publisher, pb.config) if err != nil { @@ -144,6 +147,8 @@ func (pb *packetbeat) runStatic(b *beat.Beat, factory *processorFactory) error { return nil } +// runManaged registers a packetbeat runner with the reload.Registry and starts +// the runner by starting the beat's manager. It returns on the first fatal error. func (pb *packetbeat) runManaged(b *beat.Beat, factory *processorFactory) error { runner := newReloader(management.DebugK, factory, b.Publisher) reload.Register.MustRegisterList("inputs", runner) diff --git a/packetbeat/beater/processor.go b/packetbeat/beater/processor.go index e63588fd94ac..21adac2eac61 100644 --- a/packetbeat/beater/processor.go +++ b/packetbeat/beater/processor.go @@ -150,7 +150,7 @@ func (p *processorFactory) Create(pipeline beat.PipelineConnector, cfg *conf.C) if err != nil { return nil, err } - sniffer, err := setupSniffer(config, protocols, workerFactory(publisher, protocols, watcher, flows, config)) + sniffer, err := setupSniffer(config, protocols, sniffer.DecodersFor(publisher, protocols, watcher, flows, config)) if err != nil { return nil, err } @@ -189,7 +189,7 @@ func setupFlows(pipeline beat.Pipeline, watcher procs.ProcessesWatcher, cfg conf return flows.NewFlows(client.PublishAll, watcher, cfg.Flows) } -func setupSniffer(cfg config.Config, protocols *protos.ProtocolsStruct, workerFactory sniffer.WorkerFactory) (*sniffer.Sniffer, error) { +func setupSniffer(cfg config.Config, protocols *protos.ProtocolsStruct, decoders sniffer.Decoders) (*sniffer.Sniffer, error) { icmp, err := cfg.ICMP() if err != nil { return nil, err @@ -200,7 +200,7 @@ func setupSniffer(cfg config.Config, protocols *protos.ProtocolsStruct, workerFa filter = protocols.BpfFilter(cfg.Interfaces.WithVlans, icmp.Enabled()) } - return sniffer.New(false, filter, workerFactory, cfg.Interfaces) + return sniffer.New(false, filter, decoders, cfg.Interfaces) } // CheckConfig performs a dry-run creation of a Packetbeat pipeline based diff --git a/packetbeat/beater/worker.go b/packetbeat/sniffer/decoders.go similarity index 79% rename from packetbeat/beater/worker.go rename to packetbeat/sniffer/decoders.go index 2c7f1d7eff60..cb99ae4bfbc8 100644 --- a/packetbeat/beater/worker.go +++ b/packetbeat/sniffer/decoders.go @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -package beater +package sniffer import ( "github.com/google/gopacket/layers" @@ -29,11 +29,16 @@ import ( "github.com/elastic/beats/v7/packetbeat/protos/tcp" "github.com/elastic/beats/v7/packetbeat/protos/udp" "github.com/elastic/beats/v7/packetbeat/publish" - "github.com/elastic/beats/v7/packetbeat/sniffer" ) -func workerFactory(publisher *publish.TransactionPublisher, protocols *protos.ProtocolsStruct, watcher procs.ProcessesWatcher, flows *flows.Flows, cfg config.Config) func(dl layers.LinkType) (sniffer.Worker, error) { - return func(dl layers.LinkType) (sniffer.Worker, error) { +// Decoders functions return a Decoder able to process the provided network +// link type for use with a Sniffer. +type Decoders func(layers.LinkType) (*decoder.Decoder, error) + +// DecodersFor returns a source of Decoders using the provided configuration +// components. +func DecodersFor(publisher *publish.TransactionPublisher, protocols *protos.ProtocolsStruct, watcher procs.ProcessesWatcher, flows *flows.Flows, cfg config.Config) Decoders { + return func(dl layers.LinkType) (*decoder.Decoder, error) { var icmp4 icmp.ICMPv4Processor var icmp6 icmp.ICMPv6Processor config, err := cfg.ICMP() diff --git a/packetbeat/sniffer/sniffer.go b/packetbeat/sniffer/sniffer.go index c1c770b714c8..d1c4e29b0189 100644 --- a/packetbeat/sniffer/sniffer.go +++ b/packetbeat/sniffer/sniffer.go @@ -45,16 +45,7 @@ type Sniffer struct { // filter is the bpf filter program used by the sniffer. filter string - factory WorkerFactory -} - -// WorkerFactory constructs a new worker instance for use with a Sniffer. -type WorkerFactory func(layers.LinkType) (Worker, error) - -// Worker defines the callback interfaces a Sniffer instance will use -// to forward packets. -type Worker interface { - OnPacket(data []byte, ci *gopacket.CaptureInfo) + decoders Decoders } type snifferHandle interface { @@ -73,12 +64,12 @@ const ( // New create a new Sniffer instance. Settings are validated in a best effort // only, but no device is opened yet. Accessing and configuring the actual device // is done by the Run method. -func New(testMode bool, filter string, factory WorkerFactory, interfaces config.InterfacesConfig) (*Sniffer, error) { +func New(testMode bool, filter string, decoders Decoders, interfaces config.InterfacesConfig) (*Sniffer, error) { s := &Sniffer{ - filter: filter, - config: interfaces, - factory: factory, - state: atomic.MakeInt32(snifferInactive), + filter: filter, + config: interfaces, + decoders: decoders, + state: atomic.MakeInt32(snifferInactive), } logp.Debug("sniffer", "BPF filter: '%s'", filter) @@ -155,7 +146,7 @@ func (s *Sniffer) Run() error { } } - worker, err := s.factory(handle.LinkType()) + decoder, err := s.decoders(handle.LinkType()) if err != nil { return err } @@ -206,7 +197,7 @@ func (s *Sniffer) Run() error { } logp.Debug("sniffer", "Packet number: %d", packets) - worker.OnPacket(data, &ci) + decoder.OnPacket(data, &ci) } return nil